1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value, -EINVAL is
58  * returned without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
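/*
 * Illustrative use of the two helpers above (a sketch, not verbatim from
 * this file): a count that must neither be resurrected from zero nor
 * overflow, such as rbd_dev->parent_ref, is taken with
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		... use the parent ...
 *
 * and dropped again with atomic_dec_return_safe().
 */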
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
119 #define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
120 #define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
121 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
122 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
123
124 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
125                                  RBD_FEATURE_STRIPINGV2 |       \
126                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
127                                  RBD_FEATURE_OBJECT_MAP |       \
128                                  RBD_FEATURE_FAST_DIFF |        \
129                                  RBD_FEATURE_DEEP_FLATTEN |     \
130                                  RBD_FEATURE_DATA_POOL |        \
131                                  RBD_FEATURE_OPERATIONS)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
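/*
 * An image advertising a feature bit outside RBD_FEATURES_SUPPORTED
 * cannot be mapped.  Simplified sketch of the check applied when
 * features are fetched from the OSD (illustrative only):
 *
 *	u64 unsup = features & ~RBD_FEATURES_SUPPORTED;
 *	if (unsup)
 *		return -ENXIO;
 */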
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the ids in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the ids associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190         const char      *pool_ns;       /* NULL if default, never "" */
191
192         const char      *image_id;
193         const char      *image_name;
194
195         u64             snap_id;
196         const char      *snap_name;
197
198         struct kref     kref;
199 };
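/*
 * Illustrative example of a fully populated spec for an image mapped
 * at its head (all values made up): pool_id 2 / pool_name "rbd",
 * image_id "10074b0dc51" / image_name "foo", snap_id CEPH_NOSNAP /
 * snap_name "-" (RBD_SNAP_HEAD_NAME).
 */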
200
201 /*
202  * An instance of a client.  Multiple devices may share an rbd client.
203  */
204 struct rbd_client {
205         struct ceph_client      *client;
206         struct kref             kref;
207         struct list_head        node;
208 };
209
210 struct pending_result {
211         int                     result;         /* first nonzero result */
212         int                     num_pending;
213 };
214
215 struct rbd_img_request;
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA = 1,
219         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
220         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
221         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
222 };
223
224 enum obj_operation_type {
225         OBJ_OP_READ = 1,
226         OBJ_OP_WRITE,
227         OBJ_OP_DISCARD,
228         OBJ_OP_ZEROOUT,
229 };
230
231 #define RBD_OBJ_FLAG_DELETION                   (1U << 0)
232 #define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
233 #define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
234 #define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
236
237 enum rbd_obj_read_state {
238         RBD_OBJ_READ_START = 1,
239         RBD_OBJ_READ_OBJECT,
240         RBD_OBJ_READ_PARENT,
241 };
242
243 /*
244  * Writes go through the following state machine to deal with
245  * layering:
246  *
247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248  *            .                 |                                    .
249  *            .                 v                                    .
250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251  *            .                 |                    .               .
252  *            .                 v                    v (deep-copyup  .
253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254  * flattened) v                 |                    .               .
255  *            .                 v                    .               .
256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257  *                              |                        not needed) v
258  *                              v                                    .
259  *                            done . . . . . . . . . . . . . . . . . .
260  *                              ^
261  *                              |
262  *                     RBD_OBJ_WRITE_FLAT
263  *
264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
265  * assert_exists guard is needed or not (in some cases it's not needed
266  * even if there is a parent).
267  */
268 enum rbd_obj_write_state {
269         RBD_OBJ_WRITE_START = 1,
270         RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271         RBD_OBJ_WRITE_OBJECT,
272         __RBD_OBJ_WRITE_COPYUP,
273         RBD_OBJ_WRITE_COPYUP,
274         RBD_OBJ_WRITE_POST_OBJECT_MAP,
275 };
276
277 enum rbd_obj_copyup_state {
278         RBD_OBJ_COPYUP_START = 1,
279         RBD_OBJ_COPYUP_READ_PARENT,
280         __RBD_OBJ_COPYUP_OBJECT_MAPS,
281         RBD_OBJ_COPYUP_OBJECT_MAPS,
282         __RBD_OBJ_COPYUP_WRITE_OBJECT,
283         RBD_OBJ_COPYUP_WRITE_OBJECT,
284 };
285
286 struct rbd_obj_request {
287         struct ceph_object_extent ex;
288         unsigned int            flags;  /* RBD_OBJ_FLAG_* */
289         union {
290                 enum rbd_obj_read_state  read_state;    /* for reads */
291                 enum rbd_obj_write_state write_state;   /* for writes */
292         };
293
294         struct rbd_img_request  *img_request;
295         struct ceph_file_extent *img_extents;
296         u32                     num_img_extents;
297
298         union {
299                 struct ceph_bio_iter    bio_pos;
300                 struct {
301                         struct ceph_bvec_iter   bvec_pos;
302                         u32                     bvec_count;
303                         u32                     bvec_idx;
304                 };
305         };
306
307         enum rbd_obj_copyup_state copyup_state;
308         struct bio_vec          *copyup_bvecs;
309         u32                     copyup_bvec_count;
310
311         struct list_head        osd_reqs;       /* w/ r_private_item */
312
313         struct mutex            state_mutex;
314         struct pending_result   pending;
315         struct kref             kref;
316 };
317
318 enum img_req_flags {
319         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
320         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
321 };
322
323 enum rbd_img_state {
324         RBD_IMG_START = 1,
325         RBD_IMG_EXCLUSIVE_LOCK,
326         __RBD_IMG_OBJECT_REQUESTS,
327         RBD_IMG_OBJECT_REQUESTS,
328 };
329
330 struct rbd_img_request {
331         struct rbd_device       *rbd_dev;
332         enum obj_operation_type op_type;
333         enum obj_request_type   data_type;
334         unsigned long           flags;
335         enum rbd_img_state      state;
336         union {
337                 u64                     snap_id;        /* for reads */
338                 struct ceph_snap_context *snapc;        /* for writes */
339         };
340         union {
341                 struct request          *rq;            /* block request */
342                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
343         };
344
345         struct list_head        lock_item;
346         struct list_head        object_extents; /* obj_req.ex structs */
347
348         struct mutex            state_mutex;
349         struct pending_result   pending;
350         struct work_struct      work;
351         int                     work_result;
352         struct kref             kref;
353 };
354
355 #define for_each_obj_request(ireq, oreq) \
356         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
357 #define for_each_obj_request_safe(ireq, oreq, n) \
358         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
359
360 enum rbd_watch_state {
361         RBD_WATCH_STATE_UNREGISTERED,
362         RBD_WATCH_STATE_REGISTERED,
363         RBD_WATCH_STATE_ERROR,
364 };
365
366 enum rbd_lock_state {
367         RBD_LOCK_STATE_UNLOCKED,
368         RBD_LOCK_STATE_LOCKED,
369         RBD_LOCK_STATE_RELEASING,
370 };
371
372 /* WatchNotify::ClientId */
373 struct rbd_client_id {
374         u64 gid;
375         u64 handle;
376 };
377
378 struct rbd_mapping {
379         u64                     size;
380         u64                     features;
381 };
382
383 /*
384  * a single device
385  */
386 struct rbd_device {
387         int                     dev_id;         /* blkdev unique id */
388
389         int                     major;          /* blkdev assigned major */
390         int                     minor;
391         struct gendisk          *disk;          /* blkdev's gendisk and rq */
392
393         u32                     image_format;   /* Either 1 or 2 */
394         struct rbd_client       *rbd_client;
395
396         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
397
398         spinlock_t              lock;           /* queue, flags, open_count */
399
400         struct rbd_image_header header;
401         unsigned long           flags;          /* possibly lock protected */
402         struct rbd_spec         *spec;
403         struct rbd_options      *opts;
404         char                    *config_info;   /* add{,_single_major} string */
405
406         struct ceph_object_id   header_oid;
407         struct ceph_object_locator header_oloc;
408
409         struct ceph_file_layout layout;         /* used for all rbd requests */
410
411         struct mutex            watch_mutex;
412         enum rbd_watch_state    watch_state;
413         struct ceph_osd_linger_request *watch_handle;
414         u64                     watch_cookie;
415         struct delayed_work     watch_dwork;
416
417         struct rw_semaphore     lock_rwsem;
418         enum rbd_lock_state     lock_state;
419         char                    lock_cookie[32];
420         struct rbd_client_id    owner_cid;
421         struct work_struct      acquired_lock_work;
422         struct work_struct      released_lock_work;
423         struct delayed_work     lock_dwork;
424         struct work_struct      unlock_work;
425         spinlock_t              lock_lists_lock;
426         struct list_head        acquiring_list;
427         struct list_head        running_list;
428         struct completion       acquire_wait;
429         int                     acquire_err;
430         struct completion       releasing_wait;
431
432         spinlock_t              object_map_lock;
433         u8                      *object_map;
434         u64                     object_map_size;        /* in objects */
435         u64                     object_map_flags;
436
437         struct workqueue_struct *task_wq;
438
439         struct rbd_spec         *parent_spec;
440         u64                     parent_overlap;
441         atomic_t                parent_ref;
442         struct rbd_device       *parent;
443
444         /* Block layer tags. */
445         struct blk_mq_tag_set   tag_set;
446
447         /* protects updating the header */
448         struct rw_semaphore     header_rwsem;
449
450         struct rbd_mapping      mapping;
451
452         struct list_head        node;
453
454         /* sysfs related */
455         struct device           dev;
456         unsigned long           open_count;     /* protected by lock */
457 };
458
459 /*
460  * Flag bits for rbd_dev->flags:
461  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
462  *   by rbd_dev->lock
463  */
464 enum rbd_dev_flags {
465         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
466         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
467 };
468
469 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
470
471 static LIST_HEAD(rbd_dev_list);    /* devices */
472 static DEFINE_SPINLOCK(rbd_dev_list_lock);
473
474 static LIST_HEAD(rbd_client_list);              /* clients */
475 static DEFINE_SPINLOCK(rbd_client_list_lock);
476
477 /* Slab caches for frequently-allocated structures */
478
479 static struct kmem_cache        *rbd_img_request_cache;
480 static struct kmem_cache        *rbd_obj_request_cache;
481
482 static int rbd_major;
483 static DEFINE_IDA(rbd_dev_id_ida);
484
485 static struct workqueue_struct *rbd_wq;
486
487 static struct ceph_snap_context rbd_empty_snapc = {
488         .nref = REFCOUNT_INIT(1),
489 };
490
491 /*
492  * single-major requires version 0.75 or later of the userspace rbd utility.
493  */
494 static bool single_major = true;
495 module_param(single_major, bool, 0444);
496 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
497
498 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
499 static ssize_t remove_store(struct bus_type *bus, const char *buf,
500                             size_t count);
501 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
502                                       size_t count);
503 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
504                                          size_t count);
505 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
506
507 static int rbd_dev_id_to_minor(int dev_id)
508 {
509         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
510 }
511
512 static int minor_to_rbd_dev_id(int minor)
513 {
514         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
515 }
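/*
 * With RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device owns 16 minors for
 * partitions: dev_id 3 maps to minors 48..63, and minor 50 maps back
 * to dev_id 3.
 */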
516
517 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
518 {
519         lockdep_assert_held(&rbd_dev->lock_rwsem);
520
521         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
522                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
523 }
524
525 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
526 {
527         bool is_lock_owner;
528
529         down_read(&rbd_dev->lock_rwsem);
530         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
531         up_read(&rbd_dev->lock_rwsem);
532         return is_lock_owner;
533 }
534
535 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
536 {
537         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
538 }
539
540 static BUS_ATTR_WO(add);
541 static BUS_ATTR_WO(remove);
542 static BUS_ATTR_WO(add_single_major);
543 static BUS_ATTR_WO(remove_single_major);
544 static BUS_ATTR_RO(supported_features);
545
546 static struct attribute *rbd_bus_attrs[] = {
547         &bus_attr_add.attr,
548         &bus_attr_remove.attr,
549         &bus_attr_add_single_major.attr,
550         &bus_attr_remove_single_major.attr,
551         &bus_attr_supported_features.attr,
552         NULL,
553 };
554
555 static umode_t rbd_bus_is_visible(struct kobject *kobj,
556                                   struct attribute *attr, int index)
557 {
558         if (!single_major &&
559             (attr == &bus_attr_add_single_major.attr ||
560              attr == &bus_attr_remove_single_major.attr))
561                 return 0;
562
563         return attr->mode;
564 }
565
566 static const struct attribute_group rbd_bus_group = {
567         .attrs = rbd_bus_attrs,
568         .is_visible = rbd_bus_is_visible,
569 };
570 __ATTRIBUTE_GROUPS(rbd_bus);
571
572 static struct bus_type rbd_bus_type = {
573         .name           = "rbd",
574         .bus_groups     = rbd_bus_groups,
575 };
576
577 static void rbd_root_dev_release(struct device *dev)
578 {
579 }
580
581 static struct device rbd_root_dev = {
582         .init_name =    "rbd",
583         .release =      rbd_root_dev_release,
584 };
585
586 static __printf(2, 3)
587 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
588 {
589         struct va_format vaf;
590         va_list args;
591
592         va_start(args, fmt);
593         vaf.fmt = fmt;
594         vaf.va = &args;
595
596         if (!rbd_dev)
597                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
598         else if (rbd_dev->disk)
599                 printk(KERN_WARNING "%s: %s: %pV\n",
600                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
601         else if (rbd_dev->spec && rbd_dev->spec->image_name)
602                 printk(KERN_WARNING "%s: image %s: %pV\n",
603                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
604         else if (rbd_dev->spec && rbd_dev->spec->image_id)
605                 printk(KERN_WARNING "%s: id %s: %pV\n",
606                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
607         else    /* punt */
608                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
609                         RBD_DRV_NAME, rbd_dev, &vaf);
610         va_end(args);
611 }
612
613 #ifdef RBD_DEBUG
614 #define rbd_assert(expr)                                                \
615                 if (unlikely(!(expr))) {                                \
616                         printk(KERN_ERR "\nAssertion failure in %s() "  \
617                                                 "at line %d:\n\n"       \
618                                         "\trbd_assert(%s);\n\n",        \
619                                         __func__, __LINE__, #expr);     \
620                         BUG();                                          \
621                 }
622 #else /* !RBD_DEBUG */
623 #  define rbd_assert(expr)      ((void) 0)
624 #endif /* !RBD_DEBUG */
625
626 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
627
628 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
629 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
630 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
631 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
632 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
633                                         u64 snap_id);
634 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
635                                 u8 *order, u64 *snap_size);
636 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
637                 u64 *snap_features);
638 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
639
640 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
641 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
642
643 /*
644  * Return true if nothing else is pending.
645  */
646 static bool pending_result_dec(struct pending_result *pending, int *result)
647 {
648         rbd_assert(pending->num_pending > 0);
649
650         if (*result && !pending->result)
651                 pending->result = *result;
652         if (--pending->num_pending)
653                 return false;
654
655         *result = pending->result;
656         return true;
657 }
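/*
 * For example, with num_pending == 2 and completions arriving with
 * results 0 and -EIO (in either order), the first call returns false
 * and the second returns true with *result set to -EIO, the first
 * nonzero result recorded.
 */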
658
659 static int rbd_open(struct block_device *bdev, fmode_t mode)
660 {
661         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
662         bool removing = false;
663
664         spin_lock_irq(&rbd_dev->lock);
665         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
666                 removing = true;
667         else
668                 rbd_dev->open_count++;
669         spin_unlock_irq(&rbd_dev->lock);
670         if (removing)
671                 return -ENOENT;
672
673         (void) get_device(&rbd_dev->dev);
674
675         return 0;
676 }
677
678 static void rbd_release(struct gendisk *disk, fmode_t mode)
679 {
680         struct rbd_device *rbd_dev = disk->private_data;
681         unsigned long open_count_before;
682
683         spin_lock_irq(&rbd_dev->lock);
684         open_count_before = rbd_dev->open_count--;
685         spin_unlock_irq(&rbd_dev->lock);
686         rbd_assert(open_count_before > 0);
687
688         put_device(&rbd_dev->dev);
689 }
690
691 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
692 {
693         int ro;
694
695         if (get_user(ro, (int __user *)arg))
696                 return -EFAULT;
697
698         /* Snapshots can't be marked read-write */
699         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
700                 return -EROFS;
701
702         /* Let blkdev_roset() handle it */
703         return -ENOTTY;
704 }
705
706 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
707                         unsigned int cmd, unsigned long arg)
708 {
709         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
710         int ret;
711
712         switch (cmd) {
713         case BLKROSET:
714                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
715                 break;
716         default:
717                 ret = -ENOTTY;
718         }
719
720         return ret;
721 }
722
723 #ifdef CONFIG_COMPAT
724 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
725                                 unsigned int cmd, unsigned long arg)
726 {
727         return rbd_ioctl(bdev, mode, cmd, arg);
728 }
729 #endif /* CONFIG_COMPAT */
730
731 static const struct block_device_operations rbd_bd_ops = {
732         .owner                  = THIS_MODULE,
733         .open                   = rbd_open,
734         .release                = rbd_release,
735         .ioctl                  = rbd_ioctl,
736 #ifdef CONFIG_COMPAT
737         .compat_ioctl           = rbd_compat_ioctl,
738 #endif
739 };
740
741 /*
742  * Initialize an rbd client instance.  Success or not, this function
743  * consumes ceph_opts.  Caller holds client_mutex.
744  */
745 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
746 {
747         struct rbd_client *rbdc;
748         int ret = -ENOMEM;
749
750         dout("%s:\n", __func__);
751         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
752         if (!rbdc)
753                 goto out_opt;
754
755         kref_init(&rbdc->kref);
756         INIT_LIST_HEAD(&rbdc->node);
757
758         rbdc->client = ceph_create_client(ceph_opts, rbdc);
759         if (IS_ERR(rbdc->client))
760                 goto out_rbdc;
761         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
762
763         ret = ceph_open_session(rbdc->client);
764         if (ret < 0)
765                 goto out_client;
766
767         spin_lock(&rbd_client_list_lock);
768         list_add_tail(&rbdc->node, &rbd_client_list);
769         spin_unlock(&rbd_client_list_lock);
770
771         dout("%s: rbdc %p\n", __func__, rbdc);
772
773         return rbdc;
774 out_client:
775         ceph_destroy_client(rbdc->client);
776 out_rbdc:
777         kfree(rbdc);
778 out_opt:
779         if (ceph_opts)
780                 ceph_destroy_options(ceph_opts);
781         dout("%s: error %d\n", __func__, ret);
782
783         return ERR_PTR(ret);
784 }
785
786 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
787 {
788         kref_get(&rbdc->kref);
789
790         return rbdc;
791 }
792
793 /*
794  * Find a ceph client with a specific address and configuration.  If
795  * found, bump its reference count.
796  */
797 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
798 {
799         struct rbd_client *client_node;
800         bool found = false;
801
802         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
803                 return NULL;
804
805         spin_lock(&rbd_client_list_lock);
806         list_for_each_entry(client_node, &rbd_client_list, node) {
807                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
808                         __rbd_get_client(client_node);
809
810                         found = true;
811                         break;
812                 }
813         }
814         spin_unlock(&rbd_client_list_lock);
815
816         return found ? client_node : NULL;
817 }
818
819 /*
820  * (Per device) rbd map options
821  */
822 enum {
823         Opt_queue_depth,
824         Opt_alloc_size,
825         Opt_lock_timeout,
826         Opt_last_int,
827         /* int args above */
828         Opt_pool_ns,
829         Opt_last_string,
830         /* string args above */
831         Opt_read_only,
832         Opt_read_write,
833         Opt_lock_on_read,
834         Opt_exclusive,
835         Opt_notrim,
836         Opt_err
837 };
838
839 static match_table_t rbd_opts_tokens = {
840         {Opt_queue_depth, "queue_depth=%d"},
841         {Opt_alloc_size, "alloc_size=%d"},
842         {Opt_lock_timeout, "lock_timeout=%d"},
843         /* int args above */
844         {Opt_pool_ns, "_pool_ns=%s"},
845         /* string args above */
846         {Opt_read_only, "read_only"},
847         {Opt_read_only, "ro"},          /* Alternate spelling */
848         {Opt_read_write, "read_write"},
849         {Opt_read_write, "rw"},         /* Alternate spelling */
850         {Opt_lock_on_read, "lock_on_read"},
851         {Opt_exclusive, "exclusive"},
852         {Opt_notrim, "notrim"},
853         {Opt_err, NULL}
854 };
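/*
 * These come from the comma-separated options portion of the sysfs
 * "add" string, e.g. (illustrative values only):
 *
 *	queue_depth=128,alloc_size=65536,lock_on_read,_pool_ns=myns
 *
 * Tokens that libceph does not recognize as its own are passed to
 * parse_rbd_opts_token() below, one token at a time.
 */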
855
856 struct rbd_options {
857         int     queue_depth;
858         int     alloc_size;
859         unsigned long   lock_timeout;
860         bool    read_only;
861         bool    lock_on_read;
862         bool    exclusive;
863         bool    trim;
864 };
865
866 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
867 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
868 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
869 #define RBD_READ_ONLY_DEFAULT   false
870 #define RBD_LOCK_ON_READ_DEFAULT false
871 #define RBD_EXCLUSIVE_DEFAULT   false
872 #define RBD_TRIM_DEFAULT        true
873
874 struct parse_rbd_opts_ctx {
875         struct rbd_spec         *spec;
876         struct rbd_options      *opts;
877 };
878
879 static int parse_rbd_opts_token(char *c, void *private)
880 {
881         struct parse_rbd_opts_ctx *pctx = private;
882         substring_t argstr[MAX_OPT_ARGS];
883         int token, intval, ret;
884
885         token = match_token(c, rbd_opts_tokens, argstr);
886         if (token < Opt_last_int) {
887                 ret = match_int(&argstr[0], &intval);
888                 if (ret < 0) {
889                         pr_err("bad option arg (not int) at '%s'\n", c);
890                         return ret;
891                 }
892                 dout("got int token %d val %d\n", token, intval);
893         } else if (token > Opt_last_int && token < Opt_last_string) {
894                 dout("got string token %d val %s\n", token, argstr[0].from);
895         } else {
896                 dout("got token %d\n", token);
897         }
898
899         switch (token) {
900         case Opt_queue_depth:
901                 if (intval < 1) {
902                         pr_err("queue_depth out of range\n");
903                         return -EINVAL;
904                 }
905                 pctx->opts->queue_depth = intval;
906                 break;
907         case Opt_alloc_size:
908                 if (intval < SECTOR_SIZE) {
909                         pr_err("alloc_size out of range\n");
910                         return -EINVAL;
911                 }
912                 if (!is_power_of_2(intval)) {
913                         pr_err("alloc_size must be a power of 2\n");
914                         return -EINVAL;
915                 }
916                 pctx->opts->alloc_size = intval;
917                 break;
918         case Opt_lock_timeout:
919                 /* 0 is "wait forever" (i.e. infinite timeout) */
920                 if (intval < 0 || intval > INT_MAX / 1000) {
921                         pr_err("lock_timeout out of range\n");
922                         return -EINVAL;
923                 }
924                 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
925                 break;
926         case Opt_pool_ns:
927                 kfree(pctx->spec->pool_ns);
928                 pctx->spec->pool_ns = match_strdup(argstr);
929                 if (!pctx->spec->pool_ns)
930                         return -ENOMEM;
931                 break;
932         case Opt_read_only:
933                 pctx->opts->read_only = true;
934                 break;
935         case Opt_read_write:
936                 pctx->opts->read_only = false;
937                 break;
938         case Opt_lock_on_read:
939                 pctx->opts->lock_on_read = true;
940                 break;
941         case Opt_exclusive:
942                 pctx->opts->exclusive = true;
943                 break;
944         case Opt_notrim:
945                 pctx->opts->trim = false;
946                 break;
947         default:
948                 /* libceph prints "bad option" msg */
949                 return -EINVAL;
950         }
951
952         return 0;
953 }
954
955 static char *obj_op_name(enum obj_operation_type op_type)
956 {
957         switch (op_type) {
958         case OBJ_OP_READ:
959                 return "read";
960         case OBJ_OP_WRITE:
961                 return "write";
962         case OBJ_OP_DISCARD:
963                 return "discard";
964         case OBJ_OP_ZEROOUT:
965                 return "zeroout";
966         default:
967                 return "???";
968         }
969 }
970
971 /*
972  * Destroy ceph client
973  *
974  * Takes rbd_client_list_lock to unlink the client from rbd_client_list.
975  */
976 static void rbd_client_release(struct kref *kref)
977 {
978         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
979
980         dout("%s: rbdc %p\n", __func__, rbdc);
981         spin_lock(&rbd_client_list_lock);
982         list_del(&rbdc->node);
983         spin_unlock(&rbd_client_list_lock);
984
985         ceph_destroy_client(rbdc->client);
986         kfree(rbdc);
987 }
988
989 /*
990  * Drop reference to ceph client node. If it's not referenced anymore, release
991  * it.
992  */
993 static void rbd_put_client(struct rbd_client *rbdc)
994 {
995         if (rbdc)
996                 kref_put(&rbdc->kref, rbd_client_release);
997 }
998
999 /*
1000  * Get a ceph client with a specific address and configuration; create
1001  * one if it does not already exist.  Either way, ceph_opts is consumed by this
1002  * function.
1003  */
1004 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
1005 {
1006         struct rbd_client *rbdc;
1007         int ret;
1008
1009         mutex_lock(&client_mutex);
1010         rbdc = rbd_client_find(ceph_opts);
1011         if (rbdc) {
1012                 ceph_destroy_options(ceph_opts);
1013
1014                 /*
1015                  * Using an existing client.  Make sure ->pg_pools is up to
1016                  * date before we look up the pool id in do_rbd_add().
1017                  */
1018                 ret = ceph_wait_for_latest_osdmap(rbdc->client,
1019                                         rbdc->client->options->mount_timeout);
1020                 if (ret) {
1021                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
1022                         rbd_put_client(rbdc);
1023                         rbdc = ERR_PTR(ret);
1024                 }
1025         } else {
1026                 rbdc = rbd_client_create(ceph_opts);
1027         }
1028         mutex_unlock(&client_mutex);
1029
1030         return rbdc;
1031 }
1032
1033 static bool rbd_image_format_valid(u32 image_format)
1034 {
1035         return image_format == 1 || image_format == 2;
1036 }
1037
1038 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
1039 {
1040         size_t size;
1041         u32 snap_count;
1042
1043         /* The header has to start with the magic rbd header text */
1044         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
1045                 return false;
1046
1047         /* The bio layer requires at least sector-sized I/O */
1048
1049         if (ondisk->options.order < SECTOR_SHIFT)
1050                 return false;
1051
1052         /* If we use u64 in a few spots we may be able to loosen this */
1053
1054         if (ondisk->options.order > 8 * sizeof (int) - 1)
1055                 return false;
1056
1057         /*
1058          * The size of a snapshot header has to fit in a size_t, and
1059          * that limits the number of snapshots.
1060          */
1061         snap_count = le32_to_cpu(ondisk->snap_count);
1062         size = SIZE_MAX - sizeof (struct ceph_snap_context);
1063         if (snap_count > size / sizeof (__le64))
1064                 return false;
1065
1066         /*
1067          * Not only that, but the size of the entire snapshot
1068          * header must also be representable in a size_t.
1069          */
1070         size -= snap_count * sizeof (__le64);
1071         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
1072                 return false;
1073
1074         return true;
1075 }
1076
1077 /*
1078  * returns the size of an object in the image
1079  */
1080 static u32 rbd_obj_bytes(struct rbd_image_header *header)
1081 {
1082         return 1U << header->obj_order;
1083 }
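/*
 * For example, the common default obj_order of 22 gives 4 MiB objects
 * (1U << 22 == 4194304 bytes).
 */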
1084
1085 static void rbd_init_layout(struct rbd_device *rbd_dev)
1086 {
1087         if (rbd_dev->header.stripe_unit == 0 ||
1088             rbd_dev->header.stripe_count == 0) {
1089                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1090                 rbd_dev->header.stripe_count = 1;
1091         }
1092
1093         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1094         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1095         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1096         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1097                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1098         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1099 }
1100
1101 /*
1102  * Fill an rbd image header with information from the given format 1
1103  * on-disk header.
1104  */
1105 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1106                                  struct rbd_image_header_ondisk *ondisk)
1107 {
1108         struct rbd_image_header *header = &rbd_dev->header;
1109         bool first_time = header->object_prefix == NULL;
1110         struct ceph_snap_context *snapc;
1111         char *object_prefix = NULL;
1112         char *snap_names = NULL;
1113         u64 *snap_sizes = NULL;
1114         u32 snap_count;
1115         int ret = -ENOMEM;
1116         u32 i;
1117
1118         /* Allocate this now to avoid having to handle failure below */
1119
1120         if (first_time) {
1121                 object_prefix = kstrndup(ondisk->object_prefix,
1122                                          sizeof(ondisk->object_prefix),
1123                                          GFP_KERNEL);
1124                 if (!object_prefix)
1125                         return -ENOMEM;
1126         }
1127
1128         /* Allocate the snapshot context and fill it in */
1129
1130         snap_count = le32_to_cpu(ondisk->snap_count);
1131         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1132         if (!snapc)
1133                 goto out_err;
1134         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1135         if (snap_count) {
1136                 struct rbd_image_snap_ondisk *snaps;
1137                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1138
1139                 /* We'll keep a copy of the snapshot names... */
1140
1141                 if (snap_names_len > (u64)SIZE_MAX)
1142                         goto out_2big;
1143                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1144                 if (!snap_names)
1145                         goto out_err;
1146
1147                 /* ...as well as the array of their sizes. */
1148                 snap_sizes = kmalloc_array(snap_count,
1149                                            sizeof(*header->snap_sizes),
1150                                            GFP_KERNEL);
1151                 if (!snap_sizes)
1152                         goto out_err;
1153
1154                 /*
1155                  * Copy the names, and fill in each snapshot's id
1156                  * and size.
1157                  *
1158                  * Note that rbd_dev_v1_header_info() guarantees the
1159                  * ondisk buffer we're working with has
1160                  * snap_names_len bytes beyond the end of the
1161                  * snapshot id array, so this memcpy() is safe.
1162                  */
1163                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1164                 snaps = ondisk->snaps;
1165                 for (i = 0; i < snap_count; i++) {
1166                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1167                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1168                 }
1169         }
1170
1171         /* We won't fail any more, fill in the header */
1172
1173         if (first_time) {
1174                 header->object_prefix = object_prefix;
1175                 header->obj_order = ondisk->options.order;
1176                 rbd_init_layout(rbd_dev);
1177         } else {
1178                 ceph_put_snap_context(header->snapc);
1179                 kfree(header->snap_names);
1180                 kfree(header->snap_sizes);
1181         }
1182
1183         /* The remaining fields always get updated (when we refresh) */
1184
1185         header->image_size = le64_to_cpu(ondisk->image_size);
1186         header->snapc = snapc;
1187         header->snap_names = snap_names;
1188         header->snap_sizes = snap_sizes;
1189
1190         return 0;
1191 out_2big:
1192         ret = -EIO;
1193 out_err:
1194         kfree(snap_sizes);
1195         kfree(snap_names);
1196         ceph_put_snap_context(snapc);
1197         kfree(object_prefix);
1198
1199         return ret;
1200 }
1201
1202 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1203 {
1204         const char *snap_name;
1205
1206         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1207
1208         /* Skip over names until we find the one we are looking for */
1209
1210         snap_name = rbd_dev->header.snap_names;
1211         while (which--)
1212                 snap_name += strlen(snap_name) + 1;
1213
1214         return kstrdup(snap_name, GFP_KERNEL);
1215 }
1216
1217 /*
1218  * Snapshot id comparison function for use with qsort()/bsearch().
1219  * Note that result is for snapshots in *descending* order.
1220  */
1221 static int snapid_compare_reverse(const void *s1, const void *s2)
1222 {
1223         u64 snap_id1 = *(u64 *)s1;
1224         u64 snap_id2 = *(u64 *)s2;
1225
1226         if (snap_id1 < snap_id2)
1227                 return 1;
1228         return snap_id1 == snap_id2 ? 0 : -1;
1229 }
1230
1231 /*
1232  * Search a snapshot context to see if the given snapshot id is
1233  * present.
1234  *
1235  * Returns the position of the snapshot id in the array if it's found,
1236  * or BAD_SNAP_INDEX otherwise.
1237  *
1238  * Note: The snapshot array is in kept sorted (by the osd) in
1239  * reverse order, highest snapshot id first.
1240  */
1241 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1242 {
1243         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1244         u64 *found;
1245
1246         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1247                                 sizeof (snap_id), snapid_compare_reverse);
1248
1249         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1250 }
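/*
 * For example, with snapc->snaps == { 40, 25, 10 } (descending, as the
 * osd maintains it), looking up snap_id 25 yields index 1, while
 * looking up 30 yields BAD_SNAP_INDEX.
 */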
1251
1252 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1253                                         u64 snap_id)
1254 {
1255         u32 which;
1256         const char *snap_name;
1257
1258         which = rbd_dev_snap_index(rbd_dev, snap_id);
1259         if (which == BAD_SNAP_INDEX)
1260                 return ERR_PTR(-ENOENT);
1261
1262         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1263         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1264 }
1265
1266 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1267 {
1268         if (snap_id == CEPH_NOSNAP)
1269                 return RBD_SNAP_HEAD_NAME;
1270
1271         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1272         if (rbd_dev->image_format == 1)
1273                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1274
1275         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1276 }
1277
1278 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1279                                 u64 *snap_size)
1280 {
1281         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1282         if (snap_id == CEPH_NOSNAP) {
1283                 *snap_size = rbd_dev->header.image_size;
1284         } else if (rbd_dev->image_format == 1) {
1285                 u32 which;
1286
1287                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1288                 if (which == BAD_SNAP_INDEX)
1289                         return -ENOENT;
1290
1291                 *snap_size = rbd_dev->header.snap_sizes[which];
1292         } else {
1293                 u64 size = 0;
1294                 int ret;
1295
1296                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1297                 if (ret)
1298                         return ret;
1299
1300                 *snap_size = size;
1301         }
1302         return 0;
1303 }
1304
1305 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1306                         u64 *snap_features)
1307 {
1308         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1309         if (snap_id == CEPH_NOSNAP) {
1310                 *snap_features = rbd_dev->header.features;
1311         } else if (rbd_dev->image_format == 1) {
1312                 *snap_features = 0;     /* No features for format 1 */
1313         } else {
1314                 u64 features = 0;
1315                 int ret;
1316
1317                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1318                 if (ret)
1319                         return ret;
1320
1321                 *snap_features = features;
1322         }
1323         return 0;
1324 }
1325
1326 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1327 {
1328         u64 snap_id = rbd_dev->spec->snap_id;
1329         u64 size = 0;
1330         u64 features = 0;
1331         int ret;
1332
1333         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1334         if (ret)
1335                 return ret;
1336         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1337         if (ret)
1338                 return ret;
1339
1340         rbd_dev->mapping.size = size;
1341         rbd_dev->mapping.features = features;
1342
1343         return 0;
1344 }
1345
1346 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1347 {
1348         rbd_dev->mapping.size = 0;
1349         rbd_dev->mapping.features = 0;
1350 }
1351
1352 static void zero_bvec(struct bio_vec *bv)
1353 {
1354         void *buf;
1355         unsigned long flags;
1356
1357         buf = bvec_kmap_irq(bv, &flags);
1358         memset(buf, 0, bv->bv_len);
1359         flush_dcache_page(bv->bv_page);
1360         bvec_kunmap_irq(buf, &flags);
1361 }
1362
1363 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1364 {
1365         struct ceph_bio_iter it = *bio_pos;
1366
1367         ceph_bio_iter_advance(&it, off);
1368         ceph_bio_iter_advance_step(&it, bytes, ({
1369                 zero_bvec(&bv);
1370         }));
1371 }
1372
1373 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1374 {
1375         struct ceph_bvec_iter it = *bvec_pos;
1376
1377         ceph_bvec_iter_advance(&it, off);
1378         ceph_bvec_iter_advance_step(&it, bytes, ({
1379                 zero_bvec(&bv);
1380         }));
1381 }
1382
1383 /*
1384  * Zero a range in @obj_req data buffer defined by a bio (list) or
1385  * (private) bio_vec array.
1386  *
1387  * @off is relative to the start of the data buffer.
1388  */
1389 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1390                                u32 bytes)
1391 {
1392         dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1393
1394         switch (obj_req->img_request->data_type) {
1395         case OBJ_REQUEST_BIO:
1396                 zero_bios(&obj_req->bio_pos, off, bytes);
1397                 break;
1398         case OBJ_REQUEST_BVECS:
1399         case OBJ_REQUEST_OWN_BVECS:
1400                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1401                 break;
1402         default:
1403                 BUG();
1404         }
1405 }
1406
1407 static void rbd_obj_request_destroy(struct kref *kref);
1408 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1409 {
1410         rbd_assert(obj_request != NULL);
1411         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1412                 kref_read(&obj_request->kref));
1413         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1414 }
1415
1416 static void rbd_img_request_destroy(struct kref *kref);
1417 static void rbd_img_request_put(struct rbd_img_request *img_request)
1418 {
1419         rbd_assert(img_request != NULL);
1420         dout("%s: img %p (was %d)\n", __func__, img_request,
1421                 kref_read(&img_request->kref));
1422         kref_put(&img_request->kref, rbd_img_request_destroy);
1423 }
1424
1425 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1426                                         struct rbd_obj_request *obj_request)
1427 {
1428         rbd_assert(obj_request->img_request == NULL);
1429
1430         /* Image request now owns object's original reference */
1431         obj_request->img_request = img_request;
1432         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1433 }
1434
1435 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1436                                         struct rbd_obj_request *obj_request)
1437 {
1438         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1439         list_del(&obj_request->ex.oe_item);
1440         rbd_assert(obj_request->img_request == img_request);
1441         rbd_obj_request_put(obj_request);
1442 }
1443
1444 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1445 {
1446         struct rbd_obj_request *obj_req = osd_req->r_priv;
1447
1448         dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1449              __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1450              obj_req->ex.oe_off, obj_req->ex.oe_len);
1451         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1452 }
1453
1454 /*
1455  * The default/initial value for all image request flags is 0.  Each
1456  * is conditionally set to 1 at image request initialization time
1457  * and currently never changes thereafter.
1458  */
1459 static void img_request_layered_set(struct rbd_img_request *img_request)
1460 {
1461         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1462         smp_mb();
1463 }
1464
1465 static void img_request_layered_clear(struct rbd_img_request *img_request)
1466 {
1467         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1468         smp_mb();
1469 }
1470
1471 static bool img_request_layered_test(struct rbd_img_request *img_request)
1472 {
1473         smp_mb();
1474         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1475 }
1476
1477 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1478 {
1479         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1480
1481         return !obj_req->ex.oe_off &&
1482                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1483 }
1484
1485 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1486 {
1487         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1488
1489         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1490                                         rbd_dev->layout.object_size;
1491 }
1492
1493 /*
1494  * Must be called after rbd_obj_calc_img_extents().
1495  */
1496 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1497 {
1498         if (!obj_req->num_img_extents ||
1499             (rbd_obj_is_entire(obj_req) &&
1500              !obj_req->img_request->snapc->num_snaps))
1501                 return false;
1502
1503         return true;
1504 }
1505
1506 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1507 {
1508         return ceph_file_extents_bytes(obj_req->img_extents,
1509                                        obj_req->num_img_extents);
1510 }
1511
1512 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1513 {
1514         switch (img_req->op_type) {
1515         case OBJ_OP_READ:
1516                 return false;
1517         case OBJ_OP_WRITE:
1518         case OBJ_OP_DISCARD:
1519         case OBJ_OP_ZEROOUT:
1520                 return true;
1521         default:
1522                 BUG();
1523         }
1524 }
1525
1526 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1527 {
1528         struct rbd_obj_request *obj_req = osd_req->r_priv;
1529         int result;
1530
1531         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1532              osd_req->r_result, obj_req);
1533
1534         /*
1535          * Writes aren't allowed to return a data payload.  In some
1536          * guarded write cases (e.g. stat + zero on an empty object)
1537          * a stat response makes it through, but we don't care.
1538          */
1539         if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1540                 result = 0;
1541         else
1542                 result = osd_req->r_result;
1543
1544         rbd_obj_handle_request(obj_req, result);
1545 }
1546
1547 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1548 {
1549         struct rbd_obj_request *obj_request = osd_req->r_priv;
1550
1551         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1552         osd_req->r_snapid = obj_request->img_request->snap_id;
1553 }
1554
1555 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1556 {
1557         struct rbd_obj_request *obj_request = osd_req->r_priv;
1558
1559         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1560         ktime_get_real_ts64(&osd_req->r_mtime);
1561         osd_req->r_data_offset = obj_request->ex.oe_off;
1562 }
1563
1564 static struct ceph_osd_request *
1565 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1566                           struct ceph_snap_context *snapc, int num_ops)
1567 {
1568         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1569         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1570         struct ceph_osd_request *req;
1571         const char *name_format = rbd_dev->image_format == 1 ?
1572                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1573         int ret;
1574
1575         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1576         if (!req)
1577                 return ERR_PTR(-ENOMEM);
1578
1579         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1580         req->r_callback = rbd_osd_req_callback;
1581         req->r_priv = obj_req;
1582
1583         /*
1584          * Data objects may live in a separate (data) pool, but they are
1585          * always stored in the same namespace as the header object.
1586          */
1587         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1588         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1589
1590         ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1591                                rbd_dev->header.object_prefix,
1592                                obj_req->ex.oe_objno);
1593         if (ret)
1594                 return ERR_PTR(ret);
1595
1596         return req;
1597 }
1598
1599 static struct ceph_osd_request *
1600 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1601 {
1602         return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1603                                          num_ops);
1604 }
1605
1606 static struct rbd_obj_request *rbd_obj_request_create(void)
1607 {
1608         struct rbd_obj_request *obj_request;
1609
1610         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1611         if (!obj_request)
1612                 return NULL;
1613
1614         ceph_object_extent_init(&obj_request->ex);
1615         INIT_LIST_HEAD(&obj_request->osd_reqs);
1616         mutex_init(&obj_request->state_mutex);
1617         kref_init(&obj_request->kref);
1618
1619         dout("%s %p\n", __func__, obj_request);
1620         return obj_request;
1621 }
1622
1623 static void rbd_obj_request_destroy(struct kref *kref)
1624 {
1625         struct rbd_obj_request *obj_request;
1626         struct ceph_osd_request *osd_req;
1627         u32 i;
1628
1629         obj_request = container_of(kref, struct rbd_obj_request, kref);
1630
1631         dout("%s: obj %p\n", __func__, obj_request);
1632
1633         while (!list_empty(&obj_request->osd_reqs)) {
1634                 osd_req = list_first_entry(&obj_request->osd_reqs,
1635                                     struct ceph_osd_request, r_private_item);
1636                 list_del_init(&osd_req->r_private_item);
1637                 ceph_osdc_put_request(osd_req);
1638         }
1639
1640         switch (obj_request->img_request->data_type) {
1641         case OBJ_REQUEST_NODATA:
1642         case OBJ_REQUEST_BIO:
1643         case OBJ_REQUEST_BVECS:
1644                 break;          /* Nothing to do */
1645         case OBJ_REQUEST_OWN_BVECS:
1646                 kfree(obj_request->bvec_pos.bvecs);
1647                 break;
1648         default:
1649                 BUG();
1650         }
1651
1652         kfree(obj_request->img_extents);
1653         if (obj_request->copyup_bvecs) {
1654                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1655                         if (obj_request->copyup_bvecs[i].bv_page)
1656                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1657                 }
1658                 kfree(obj_request->copyup_bvecs);
1659         }
1660
1661         kmem_cache_free(rbd_obj_request_cache, obj_request);
1662 }
1663
1664 /* It's OK to call this for a device with no parent */
1665
1666 static void rbd_spec_put(struct rbd_spec *spec);
1667 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1668 {
1669         rbd_dev_remove_parent(rbd_dev);
1670         rbd_spec_put(rbd_dev->parent_spec);
1671         rbd_dev->parent_spec = NULL;
1672         rbd_dev->parent_overlap = 0;
1673 }
1674
1675 /*
1676  * Parent image reference counting is used to determine when an
1677  * image's parent fields can be safely torn down--after there are no
1678  * more in-flight requests to the parent image.  When the last
1679  * reference is dropped, cleaning them up is safe.
1680  */
1681 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1682 {
1683         int counter;
1684
1685         if (!rbd_dev->parent_spec)
1686                 return;
1687
1688         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1689         if (counter > 0)
1690                 return;
1691
1692         /* Last reference; clean up parent data structures */
1693
1694         if (!counter)
1695                 rbd_dev_unparent(rbd_dev);
1696         else
1697                 rbd_warn(rbd_dev, "parent reference underflow");
1698 }
1699
1700 /*
1701  * If an image has a non-zero parent overlap, get a reference to its
1702  * parent.
1703  *
1704  * Returns true if the rbd device has a parent with a non-zero
1705  * overlap and a reference for it was successfully taken, or
1706  * false otherwise.
1707  */
1708 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1709 {
1710         int counter = 0;
1711
1712         if (!rbd_dev->parent_spec)
1713                 return false;
1714
1715         down_read(&rbd_dev->header_rwsem);
1716         if (rbd_dev->parent_overlap)
1717                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1718         up_read(&rbd_dev->header_rwsem);
1719
1720         if (counter < 0)
1721                 rbd_warn(rbd_dev, "parent reference overflow");
1722
1723         return counter > 0;
1724 }
1725
1726 /*
1727  * Caller is responsible for filling in the list of object requests
1728  * that comprises the image request, and the Linux request pointer
1729  * (if there is one).
1730  */
1731 static struct rbd_img_request *rbd_img_request_create(
1732                                         struct rbd_device *rbd_dev,
1733                                         enum obj_operation_type op_type,
1734                                         struct ceph_snap_context *snapc)
1735 {
1736         struct rbd_img_request *img_request;
1737
1738         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1739         if (!img_request)
1740                 return NULL;
1741
1742         img_request->rbd_dev = rbd_dev;
1743         img_request->op_type = op_type;
1744         if (!rbd_img_is_write(img_request))
1745                 img_request->snap_id = rbd_dev->spec->snap_id;
1746         else
1747                 img_request->snapc = snapc;
1748
1749         if (rbd_dev_parent_get(rbd_dev))
1750                 img_request_layered_set(img_request);
1751
1752         INIT_LIST_HEAD(&img_request->lock_item);
1753         INIT_LIST_HEAD(&img_request->object_extents);
1754         mutex_init(&img_request->state_mutex);
1755         kref_init(&img_request->kref);
1756
1757         dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1758              obj_op_name(op_type), img_request);
1759         return img_request;
1760 }
1761
1762 static void rbd_img_request_destroy(struct kref *kref)
1763 {
1764         struct rbd_img_request *img_request;
1765         struct rbd_obj_request *obj_request;
1766         struct rbd_obj_request *next_obj_request;
1767
1768         img_request = container_of(kref, struct rbd_img_request, kref);
1769
1770         dout("%s: img %p\n", __func__, img_request);
1771
1772         WARN_ON(!list_empty(&img_request->lock_item));
1773         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1774                 rbd_img_obj_request_del(img_request, obj_request);
1775
1776         if (img_request_layered_test(img_request)) {
1777                 img_request_layered_clear(img_request);
1778                 rbd_dev_parent_put(img_request->rbd_dev);
1779         }
1780
1781         if (rbd_img_is_write(img_request))
1782                 ceph_put_snap_context(img_request->snapc);
1783
1784         kmem_cache_free(rbd_img_request_cache, img_request);
1785 }
1786
1787 #define BITS_PER_OBJ    2
1788 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1789 #define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1790
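/*
 * The in-memory object map packs four 2-bit object states into each
 * byte, most significant bits first.  For example, objno 5 lives in
 * byte 1 (5 / OBJS_PER_BYTE) at in-byte offset 1, so its state sits at
 * shift (4 - 1 - 1) * BITS_PER_OBJ = 4.
 */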
1791 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1792                                    u64 *index, u8 *shift)
1793 {
1794         u32 off;
1795
1796         rbd_assert(objno < rbd_dev->object_map_size);
1797         *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1798         *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1799 }
1800
1801 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1802 {
1803         u64 index;
1804         u8 shift;
1805
1806         lockdep_assert_held(&rbd_dev->object_map_lock);
1807         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1808         return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1809 }
1810
1811 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1812 {
1813         u64 index;
1814         u8 shift;
1815         u8 *p;
1816
1817         lockdep_assert_held(&rbd_dev->object_map_lock);
1818         rbd_assert(!(val & ~OBJ_MASK));
1819
1820         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1821         p = &rbd_dev->object_map[index];
1822         *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1823 }
1824
1825 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1826 {
1827         u8 state;
1828
1829         spin_lock(&rbd_dev->object_map_lock);
1830         state = __rbd_object_map_get(rbd_dev, objno);
1831         spin_unlock(&rbd_dev->object_map_lock);
1832         return state;
1833 }
1834
1835 static bool use_object_map(struct rbd_device *rbd_dev)
1836 {
1837         return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1838                 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1839 }
1840
1841 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1842 {
1843         u8 state;
1844
1845         /* fall back to default logic if object map is disabled or invalid */
1846         if (!use_object_map(rbd_dev))
1847                 return true;
1848
1849         state = rbd_object_map_get(rbd_dev, objno);
1850         return state != OBJECT_NONEXISTENT;
1851 }
1852
1853 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1854                                 struct ceph_object_id *oid)
1855 {
1856         if (snap_id == CEPH_NOSNAP)
1857                 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1858                                 rbd_dev->spec->image_id);
1859         else
1860                 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1861                                 rbd_dev->spec->image_id, snap_id);
1862 }
1863
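/*
 * Take the exclusive cls lock on the HEAD object map object.  -EEXIST
 * means we already own the lock and is treated as success.  If another
 * client holds the lock, break it once and retry; if the lock is still
 * busy after that, return the error to the caller.
 */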
1864 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1865 {
1866         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1867         CEPH_DEFINE_OID_ONSTACK(oid);
1868         u8 lock_type;
1869         char *lock_tag;
1870         struct ceph_locker *lockers;
1871         u32 num_lockers;
1872         bool broke_lock = false;
1873         int ret;
1874
1875         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1876
1877 again:
1878         ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1879                             CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1880         if (ret != -EBUSY || broke_lock) {
1881                 if (ret == -EEXIST)
1882                         ret = 0; /* already locked by myself */
1883                 if (ret)
1884                         rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1885                 return ret;
1886         }
1887
1888         ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1889                                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1890                                  &lockers, &num_lockers);
1891         if (ret) {
1892                 if (ret == -ENOENT)
1893                         goto again;
1894
1895                 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1896                 return ret;
1897         }
1898
1899         kfree(lock_tag);
1900         if (num_lockers == 0)
1901                 goto again;
1902
1903         rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1904                  ENTITY_NAME(lockers[0].id.name));
1905
1906         ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1907                                   RBD_LOCK_NAME, lockers[0].id.cookie,
1908                                   &lockers[0].id.name);
1909         ceph_free_lockers(lockers, num_lockers);
1910         if (ret) {
1911                 if (ret == -ENOENT)
1912                         goto again;
1913
1914                 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1915                 return ret;
1916         }
1917
1918         broke_lock = true;
1919         goto again;
1920 }
1921
1922 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1923 {
1924         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1925         CEPH_DEFINE_OID_ONSTACK(oid);
1926         int ret;
1927
1928         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1929
1930         ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1931                               "");
1932         if (ret && ret != -ENOENT)
1933                 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1934 }
1935
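/*
 * Decode the length-prefixed BitVector header that precedes the object
 * map data and extract the number of objects it covers.
 */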
1936 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1937 {
1938         u8 struct_v;
1939         u32 struct_len;
1940         u32 header_len;
1941         void *header_end;
1942         int ret;
1943
1944         ceph_decode_32_safe(p, end, header_len, e_inval);
1945         header_end = *p + header_len;
1946
1947         ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1948                                   &struct_len);
1949         if (ret)
1950                 return ret;
1951
1952         ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1953
1954         *p = header_end;
1955         return 0;
1956
1957 e_inval:
1958         return -EINVAL;
1959 }
1960
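/*
 * Fetch the object map for the mapped snapshot (or HEAD) with the
 * "object_map_load" class method, verify that its size matches the
 * number of objects in the mapping and copy it into a freshly
 * allocated rbd_dev->object_map buffer.
 */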
1961 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1962 {
1963         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1964         CEPH_DEFINE_OID_ONSTACK(oid);
1965         struct page **pages;
1966         void *p, *end;
1967         size_t reply_len;
1968         u64 num_objects;
1969         u64 object_map_bytes;
1970         u64 object_map_size;
1971         int num_pages;
1972         int ret;
1973
1974         rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1975
1976         num_objects = ceph_get_num_objects(&rbd_dev->layout,
1977                                            rbd_dev->mapping.size);
1978         object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1979                                             BITS_PER_BYTE);
1980         num_pages = calc_pages_for(0, object_map_bytes) + 1;
1981         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1982         if (IS_ERR(pages))
1983                 return PTR_ERR(pages);
1984
1985         reply_len = num_pages * PAGE_SIZE;
1986         rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1987         ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1988                              "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1989                              NULL, 0, pages, &reply_len);
1990         if (ret)
1991                 goto out;
1992
1993         p = page_address(pages[0]);
1994         end = p + min(reply_len, (size_t)PAGE_SIZE);
1995         ret = decode_object_map_header(&p, end, &object_map_size);
1996         if (ret)
1997                 goto out;
1998
1999         if (object_map_size != num_objects) {
2000                 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
2001                          object_map_size, num_objects);
2002                 ret = -EINVAL;
2003                 goto out;
2004         }
2005
2006         if (offset_in_page(p) + object_map_bytes > reply_len) {
2007                 ret = -EINVAL;
2008                 goto out;
2009         }
2010
2011         rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
2012         if (!rbd_dev->object_map) {
2013                 ret = -ENOMEM;
2014                 goto out;
2015         }
2016
2017         rbd_dev->object_map_size = object_map_size;
2018         ceph_copy_from_page_vector(pages, rbd_dev->object_map,
2019                                    offset_in_page(p), object_map_bytes);
2020
2021 out:
2022         ceph_release_page_vector(pages, num_pages);
2023         return ret;
2024 }
2025
2026 static void rbd_object_map_free(struct rbd_device *rbd_dev)
2027 {
2028         kvfree(rbd_dev->object_map);
2029         rbd_dev->object_map = NULL;
2030         rbd_dev->object_map_size = 0;
2031 }
2032
2033 static int rbd_object_map_load(struct rbd_device *rbd_dev)
2034 {
2035         int ret;
2036
2037         ret = __rbd_object_map_load(rbd_dev);
2038         if (ret)
2039                 return ret;
2040
2041         ret = rbd_dev_v2_get_flags(rbd_dev);
2042         if (ret) {
2043                 rbd_object_map_free(rbd_dev);
2044                 return ret;
2045         }
2046
2047         if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
2048                 rbd_warn(rbd_dev, "object map is invalid");
2049
2050         return 0;
2051 }
2052
2053 static int rbd_object_map_open(struct rbd_device *rbd_dev)
2054 {
2055         int ret;
2056
2057         ret = rbd_object_map_lock(rbd_dev);
2058         if (ret)
2059                 return ret;
2060
2061         ret = rbd_object_map_load(rbd_dev);
2062         if (ret) {
2063                 rbd_object_map_unlock(rbd_dev);
2064                 return ret;
2065         }
2066
2067         return 0;
2068 }
2069
2070 static void rbd_object_map_close(struct rbd_device *rbd_dev)
2071 {
2072         rbd_object_map_free(rbd_dev);
2073         rbd_object_map_unlock(rbd_dev);
2074 }
2075
2076 /*
2077  * This function needs snap_id (or more precisely just something to
2078  * distinguish between HEAD and snapshot object maps), new_state and
2079  * current_state that were passed to rbd_object_map_update().
2080  *
2081  * To avoid allocating and stashing a context we piggyback on the OSD
2082  * request: op count tells snap_id apart (a HEAD update carries an extra
2083  * assert_locked op), and new_state/current_state are decoded from our
2084  * own object_map_update op, encoded in rbd_cls_object_map_update().
2085  */
2086 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
2087                                         struct ceph_osd_request *osd_req)
2088 {
2089         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2090         struct ceph_osd_data *osd_data;
2091         u64 objno;
2092         u8 state, new_state, current_state;
2093         bool has_current_state;
2094         void *p;
2095
2096         if (osd_req->r_result)
2097                 return osd_req->r_result;
2098
2099         /*
2100          * Nothing to do for a snapshot object map.
2101          */
2102         if (osd_req->r_num_ops == 1)
2103                 return 0;
2104
2105         /*
2106          * Update in-memory HEAD object map.
2107          */
2108         rbd_assert(osd_req->r_num_ops == 2);
2109         osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2110         rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2111
2112         p = page_address(osd_data->pages[0]);
2113         objno = ceph_decode_64(&p);
2114         rbd_assert(objno == obj_req->ex.oe_objno);
2115         rbd_assert(ceph_decode_64(&p) == objno + 1);
2116         new_state = ceph_decode_8(&p);
2117         has_current_state = ceph_decode_8(&p);
2118         if (has_current_state)
2119                 current_state = ceph_decode_8(&p);
2120
2121         spin_lock(&rbd_dev->object_map_lock);
2122         state = __rbd_object_map_get(rbd_dev, objno);
2123         if (!has_current_state || current_state == state ||
2124             (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2125                 __rbd_object_map_set(rbd_dev, objno, new_state);
2126         spin_unlock(&rbd_dev->object_map_lock);
2127
2128         return 0;
2129 }
2130
2131 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2132 {
2133         struct rbd_obj_request *obj_req = osd_req->r_priv;
2134         int result;
2135
2136         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2137              osd_req->r_result, obj_req);
2138
2139         result = rbd_object_map_update_finish(obj_req, osd_req);
2140         rbd_obj_handle_request(obj_req, result);
2141 }
2142
2143 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2144 {
2145         u8 state = rbd_object_map_get(rbd_dev, objno);
2146
2147         if (state == new_state ||
2148             (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2149             (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2150                 return false;
2151
2152         return true;
2153 }
2154
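/*
 * Encode a single-object "object_map_update" call: the object range
 * [objno, objno + 1), the new state and an optional current state to
 * assert, marshalled into one page of request data.
 */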
2155 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2156                                      int which, u64 objno, u8 new_state,
2157                                      const u8 *current_state)
2158 {
2159         struct page **pages;
2160         void *p, *start;
2161         int ret;
2162
2163         ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2164         if (ret)
2165                 return ret;
2166
2167         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2168         if (IS_ERR(pages))
2169                 return PTR_ERR(pages);
2170
2171         p = start = page_address(pages[0]);
2172         ceph_encode_64(&p, objno);
2173         ceph_encode_64(&p, objno + 1);
2174         ceph_encode_8(&p, new_state);
2175         if (current_state) {
2176                 ceph_encode_8(&p, 1);
2177                 ceph_encode_8(&p, *current_state);
2178         } else {
2179                 ceph_encode_8(&p, 0);
2180         }
2181
2182         osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2183                                           false, true);
2184         return 0;
2185 }
2186
2187 /*
2188  * Return:
2189  *   0 - object map update sent
2190  *   1 - object map update isn't needed
2191  *  <0 - error
2192  */
2193 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2194                                  u8 new_state, const u8 *current_state)
2195 {
2196         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2197         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2198         struct ceph_osd_request *req;
2199         int num_ops = 1;
2200         int which = 0;
2201         int ret;
2202
2203         if (snap_id == CEPH_NOSNAP) {
2204                 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2205                         return 1;
2206
2207                 num_ops++; /* assert_locked */
2208         }
2209
2210         req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2211         if (!req)
2212                 return -ENOMEM;
2213
2214         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2215         req->r_callback = rbd_object_map_callback;
2216         req->r_priv = obj_req;
2217
2218         rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2219         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2220         req->r_flags = CEPH_OSD_FLAG_WRITE;
2221         ktime_get_real_ts64(&req->r_mtime);
2222
2223         if (snap_id == CEPH_NOSNAP) {
2224                 /*
2225                  * Protect against possible race conditions during lock
2226                  * ownership transitions.
2227                  */
2228                 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2229                                              CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2230                 if (ret)
2231                         return ret;
2232         }
2233
2234         ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2235                                         new_state, current_state);
2236         if (ret)
2237                 return ret;
2238
2239         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2240         if (ret)
2241                 return ret;
2242
2243         ceph_osdc_start_request(osdc, req, false);
2244         return 0;
2245 }
2246
2247 static void prune_extents(struct ceph_file_extent *img_extents,
2248                           u32 *num_img_extents, u64 overlap)
2249 {
2250         u32 cnt = *num_img_extents;
2251
2252         /* drop extents completely beyond the overlap */
2253         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2254                 cnt--;
2255
2256         if (cnt) {
2257                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2258
2259                 /* trim final overlapping extent */
2260                 if (ex->fe_off + ex->fe_len > overlap)
2261                         ex->fe_len = overlap - ex->fe_off;
2262         }
2263
2264         *num_img_extents = cnt;
2265 }
2266
2267 /*
2268  * Determine the byte range(s) covered by either just the object extent
2269  * or the entire object in the parent image.
2270  */
2271 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2272                                     bool entire)
2273 {
2274         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2275         int ret;
2276
2277         if (!rbd_dev->parent_overlap)
2278                 return 0;
2279
2280         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2281                                   entire ? 0 : obj_req->ex.oe_off,
2282                                   entire ? rbd_dev->layout.object_size :
2283                                                         obj_req->ex.oe_len,
2284                                   &obj_req->img_extents,
2285                                   &obj_req->num_img_extents);
2286         if (ret)
2287                 return ret;
2288
2289         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2290                       rbd_dev->parent_overlap);
2291         return 0;
2292 }
2293
2294 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2295 {
2296         struct rbd_obj_request *obj_req = osd_req->r_priv;
2297
2298         switch (obj_req->img_request->data_type) {
2299         case OBJ_REQUEST_BIO:
2300                 osd_req_op_extent_osd_data_bio(osd_req, which,
2301                                                &obj_req->bio_pos,
2302                                                obj_req->ex.oe_len);
2303                 break;
2304         case OBJ_REQUEST_BVECS:
2305         case OBJ_REQUEST_OWN_BVECS:
2306                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2307                                                         obj_req->ex.oe_len);
2308                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2309                 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2310                                                     &obj_req->bvec_pos);
2311                 break;
2312         default:
2313                 BUG();
2314         }
2315 }
2316
2317 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2318 {
2319         struct page **pages;
2320
2321         /*
2322          * The response data for a STAT call consists of:
2323          *     le64 length;
2324          *     struct {
2325          *         le32 tv_sec;
2326          *         le32 tv_nsec;
2327          *     } mtime;
2328          */
2329         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2330         if (IS_ERR(pages))
2331                 return PTR_ERR(pages);
2332
2333         osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2334         osd_req_op_raw_data_in_pages(osd_req, which, pages,
2335                                      8 + sizeof(struct ceph_timespec),
2336                                      0, false, true);
2337         return 0;
2338 }
2339
2340 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2341                                 u32 bytes)
2342 {
2343         struct rbd_obj_request *obj_req = osd_req->r_priv;
2344         int ret;
2345
2346         ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2347         if (ret)
2348                 return ret;
2349
2350         osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2351                                           obj_req->copyup_bvec_count, bytes);
2352         return 0;
2353 }
2354
2355 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2356 {
2357         obj_req->read_state = RBD_OBJ_READ_START;
2358         return 0;
2359 }
2360
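/*
 * An allocation hint is only added for objects that may not exist yet;
 * it is followed by a write (partial object) or writefull (entire
 * object) op.  Keep the op count in sync with count_write_ops().
 */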
2361 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2362                                       int which)
2363 {
2364         struct rbd_obj_request *obj_req = osd_req->r_priv;
2365         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2366         u16 opcode;
2367
2368         if (!use_object_map(rbd_dev) ||
2369             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2370                 osd_req_op_alloc_hint_init(osd_req, which++,
2371                                            rbd_dev->layout.object_size,
2372                                            rbd_dev->layout.object_size);
2373         }
2374
2375         if (rbd_obj_is_entire(obj_req))
2376                 opcode = CEPH_OSD_OP_WRITEFULL;
2377         else
2378                 opcode = CEPH_OSD_OP_WRITE;
2379
2380         osd_req_op_extent_init(osd_req, which, opcode,
2381                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2382         rbd_osd_setup_data(osd_req, which);
2383 }
2384
2385 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2386 {
2387         int ret;
2388
2389         /* reverse map the entire object onto the parent */
2390         ret = rbd_obj_calc_img_extents(obj_req, true);
2391         if (ret)
2392                 return ret;
2393
2394         if (rbd_obj_copyup_enabled(obj_req))
2395                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2396
2397         obj_req->write_state = RBD_OBJ_WRITE_START;
2398         return 0;
2399 }
2400
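/*
 * A tail extent (one that runs to the end of the object) can simply be
 * truncated off; an extent in the middle of the object can only be
 * zeroed.
 */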
2401 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2402 {
2403         return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2404                                           CEPH_OSD_OP_ZERO;
2405 }
2406
2407 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2408                                         int which)
2409 {
2410         struct rbd_obj_request *obj_req = osd_req->r_priv;
2411
2412         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2413                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2414                 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2415         } else {
2416                 osd_req_op_extent_init(osd_req, which,
2417                                        truncate_or_zero_opcode(obj_req),
2418                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2419                                        0, 0);
2420         }
2421 }
2422
2423 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2424 {
2425         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2426         u64 off, next_off;
2427         int ret;
2428
2429         /*
2430          * Align the range to alloc_size boundary and punt on discards
2431          * that are too small to free up any space.
2432          *
2433          * alloc_size == object_size && is_tail() is a special case for
2434          * filestore with filestore_punch_hole = false, needed to allow
2435          * truncate (in addition to delete).
2436          */
2437         if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2438             !rbd_obj_is_tail(obj_req)) {
2439                 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2440                 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2441                                       rbd_dev->opts->alloc_size);
2442                 if (off >= next_off)
2443                         return 1;
2444
2445                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2446                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2447                      off, next_off - off);
2448                 obj_req->ex.oe_off = off;
2449                 obj_req->ex.oe_len = next_off - off;
2450         }
2451
2452         /* reverse map the entire object onto the parent */
2453         ret = rbd_obj_calc_img_extents(obj_req, true);
2454         if (ret)
2455                 return ret;
2456
2457         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2458         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2459                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2460
2461         obj_req->write_state = RBD_OBJ_WRITE_START;
2462         return 0;
2463 }
2464
2465 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2466                                         int which)
2467 {
2468         struct rbd_obj_request *obj_req = osd_req->r_priv;
2469         u16 opcode;
2470
2471         if (rbd_obj_is_entire(obj_req)) {
2472                 if (obj_req->num_img_extents) {
2473                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2474                                 osd_req_op_init(osd_req, which++,
2475                                                 CEPH_OSD_OP_CREATE, 0);
2476                         opcode = CEPH_OSD_OP_TRUNCATE;
2477                 } else {
2478                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2479                         osd_req_op_init(osd_req, which++,
2480                                         CEPH_OSD_OP_DELETE, 0);
2481                         opcode = 0;
2482                 }
2483         } else {
2484                 opcode = truncate_or_zero_opcode(obj_req);
2485         }
2486
2487         if (opcode)
2488                 osd_req_op_extent_init(osd_req, which, opcode,
2489                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2490                                        0, 0);
2491 }
2492
2493 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2494 {
2495         int ret;
2496
2497         /* reverse map the entire object onto the parent */
2498         ret = rbd_obj_calc_img_extents(obj_req, true);
2499         if (ret)
2500                 return ret;
2501
2502         if (rbd_obj_copyup_enabled(obj_req))
2503                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2504         if (!obj_req->num_img_extents) {
2505                 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2506                 if (rbd_obj_is_entire(obj_req))
2507                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2508         }
2509
2510         obj_req->write_state = RBD_OBJ_WRITE_START;
2511         return 0;
2512 }
2513
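/*
 * Number of OSD ops that rbd_osd_setup_write_ops() will add for this
 * object request.  The stat op used for guarded (copyup) writes is
 * accounted for separately in rbd_obj_write_object().
 */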
2514 static int count_write_ops(struct rbd_obj_request *obj_req)
2515 {
2516         struct rbd_img_request *img_req = obj_req->img_request;
2517
2518         switch (img_req->op_type) {
2519         case OBJ_OP_WRITE:
2520                 if (!use_object_map(img_req->rbd_dev) ||
2521                     !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2522                         return 2; /* setallochint + write/writefull */
2523
2524                 return 1; /* write/writefull */
2525         case OBJ_OP_DISCARD:
2526                 return 1; /* delete/truncate/zero */
2527         case OBJ_OP_ZEROOUT:
2528                 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2529                     !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2530                         return 2; /* create + truncate */
2531
2532                 return 1; /* delete/truncate/zero */
2533         default:
2534                 BUG();
2535         }
2536 }
2537
2538 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2539                                     int which)
2540 {
2541         struct rbd_obj_request *obj_req = osd_req->r_priv;
2542
2543         switch (obj_req->img_request->op_type) {
2544         case OBJ_OP_WRITE:
2545                 __rbd_osd_setup_write_ops(osd_req, which);
2546                 break;
2547         case OBJ_OP_DISCARD:
2548                 __rbd_osd_setup_discard_ops(osd_req, which);
2549                 break;
2550         case OBJ_OP_ZEROOUT:
2551                 __rbd_osd_setup_zeroout_ops(osd_req, which);
2552                 break;
2553         default:
2554                 BUG();
2555         }
2556 }
2557
2558 /*
2559  * Prune the list of object requests (adjust offset and/or length, drop
2560  * redundant requests).  Prepare object request state machines and image
2561  * request state machine for execution.
2562  */
2563 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2564 {
2565         struct rbd_obj_request *obj_req, *next_obj_req;
2566         int ret;
2567
2568         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2569                 switch (img_req->op_type) {
2570                 case OBJ_OP_READ:
2571                         ret = rbd_obj_init_read(obj_req);
2572                         break;
2573                 case OBJ_OP_WRITE:
2574                         ret = rbd_obj_init_write(obj_req);
2575                         break;
2576                 case OBJ_OP_DISCARD:
2577                         ret = rbd_obj_init_discard(obj_req);
2578                         break;
2579                 case OBJ_OP_ZEROOUT:
2580                         ret = rbd_obj_init_zeroout(obj_req);
2581                         break;
2582                 default:
2583                         BUG();
2584                 }
2585                 if (ret < 0)
2586                         return ret;
2587                 if (ret > 0) {
2588                         rbd_img_obj_request_del(img_req, obj_req);
2589                         continue;
2590                 }
2591         }
2592
2593         img_req->state = RBD_IMG_START;
2594         return 0;
2595 }
2596
2597 union rbd_img_fill_iter {
2598         struct ceph_bio_iter    bio_iter;
2599         struct ceph_bvec_iter   bvec_iter;
2600 };
2601
2602 struct rbd_img_fill_ctx {
2603         enum obj_request_type   pos_type;
2604         union rbd_img_fill_iter *pos;
2605         union rbd_img_fill_iter iter;
2606         ceph_object_extent_fn_t set_pos_fn;
2607         ceph_object_extent_fn_t count_fn;
2608         ceph_object_extent_fn_t copy_fn;
2609 };
2610
2611 static struct ceph_object_extent *alloc_object_extent(void *arg)
2612 {
2613         struct rbd_img_request *img_req = arg;
2614         struct rbd_obj_request *obj_req;
2615
2616         obj_req = rbd_obj_request_create();
2617         if (!obj_req)
2618                 return NULL;
2619
2620         rbd_img_obj_request_add(img_req, obj_req);
2621         return &obj_req->ex;
2622 }
2623
2624 /*
2625  * While su != os && sc == 1 is technically not fancy (it's the same
2626  * layout as su == os && sc == 1), we can't use the nocopy path for it
2627  * because ->set_pos_fn() should be called only once per object.
2628  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2629  * treat su != os && sc == 1 as fancy.
2630  */
2631 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2632 {
2633         return l->stripe_unit != l->object_size;
2634 }
2635
2636 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2637                                        struct ceph_file_extent *img_extents,
2638                                        u32 num_img_extents,
2639                                        struct rbd_img_fill_ctx *fctx)
2640 {
2641         u32 i;
2642         int ret;
2643
2644         img_req->data_type = fctx->pos_type;
2645
2646         /*
2647          * Create object requests and set each object request's starting
2648          * position in the provided bio (list) or bio_vec array.
2649          */
2650         fctx->iter = *fctx->pos;
2651         for (i = 0; i < num_img_extents; i++) {
2652                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2653                                            img_extents[i].fe_off,
2654                                            img_extents[i].fe_len,
2655                                            &img_req->object_extents,
2656                                            alloc_object_extent, img_req,
2657                                            fctx->set_pos_fn, &fctx->iter);
2658                 if (ret)
2659                         return ret;
2660         }
2661
2662         return __rbd_img_fill_request(img_req);
2663 }
2664
2665 /*
2666  * Map a list of image extents to a list of object extents, create the
2667  * corresponding object requests (normally each to a different object,
2668  * but not always) and add them to @img_req.  For each object request,
2669  * set up its data descriptor to point to the corresponding chunk(s) of
2670  * @fctx->pos data buffer.
2671  *
2672  * Because ceph_file_to_extents() will merge adjacent object extents
2673  * together, each object request's data descriptor may point to multiple
2674  * different chunks of @fctx->pos data buffer.
2675  *
2676  * @fctx->pos data buffer is assumed to be large enough.
2677  */
2678 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2679                                 struct ceph_file_extent *img_extents,
2680                                 u32 num_img_extents,
2681                                 struct rbd_img_fill_ctx *fctx)
2682 {
2683         struct rbd_device *rbd_dev = img_req->rbd_dev;
2684         struct rbd_obj_request *obj_req;
2685         u32 i;
2686         int ret;
2687
2688         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2689             !rbd_layout_is_fancy(&rbd_dev->layout))
2690                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2691                                                    num_img_extents, fctx);
2692
2693         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2694
2695         /*
2696          * Create object requests and determine ->bvec_count for each object
2697          * request.  Note that ->bvec_count sum over all object requests may
2698          * be greater than the number of bio_vecs in the provided bio (list)
2699          * or bio_vec array because when mapped, those bio_vecs can straddle
2700          * stripe unit boundaries.
2701          */
2702         fctx->iter = *fctx->pos;
2703         for (i = 0; i < num_img_extents; i++) {
2704                 ret = ceph_file_to_extents(&rbd_dev->layout,
2705                                            img_extents[i].fe_off,
2706                                            img_extents[i].fe_len,
2707                                            &img_req->object_extents,
2708                                            alloc_object_extent, img_req,
2709                                            fctx->count_fn, &fctx->iter);
2710                 if (ret)
2711                         return ret;
2712         }
2713
2714         for_each_obj_request(img_req, obj_req) {
2715                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2716                                               sizeof(*obj_req->bvec_pos.bvecs),
2717                                               GFP_NOIO);
2718                 if (!obj_req->bvec_pos.bvecs)
2719                         return -ENOMEM;
2720         }
2721
2722         /*
2723          * Fill in each object request's private bio_vec array, splitting and
2724          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2725          */
2726         fctx->iter = *fctx->pos;
2727         for (i = 0; i < num_img_extents; i++) {
2728                 ret = ceph_iterate_extents(&rbd_dev->layout,
2729                                            img_extents[i].fe_off,
2730                                            img_extents[i].fe_len,
2731                                            &img_req->object_extents,
2732                                            fctx->copy_fn, &fctx->iter);
2733                 if (ret)
2734                         return ret;
2735         }
2736
2737         return __rbd_img_fill_request(img_req);
2738 }
2739
2740 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2741                                u64 off, u64 len)
2742 {
2743         struct ceph_file_extent ex = { off, len };
2744         union rbd_img_fill_iter dummy;
2745         struct rbd_img_fill_ctx fctx = {
2746                 .pos_type = OBJ_REQUEST_NODATA,
2747                 .pos = &dummy,
2748         };
2749
2750         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2751 }
2752
2753 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2754 {
2755         struct rbd_obj_request *obj_req =
2756             container_of(ex, struct rbd_obj_request, ex);
2757         struct ceph_bio_iter *it = arg;
2758
2759         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2760         obj_req->bio_pos = *it;
2761         ceph_bio_iter_advance(it, bytes);
2762 }
2763
2764 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2765 {
2766         struct rbd_obj_request *obj_req =
2767             container_of(ex, struct rbd_obj_request, ex);
2768         struct ceph_bio_iter *it = arg;
2769
2770         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2771         ceph_bio_iter_advance_step(it, bytes, ({
2772                 obj_req->bvec_count++;
2773         }));
2775 }
2776
2777 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2778 {
2779         struct rbd_obj_request *obj_req =
2780             container_of(ex, struct rbd_obj_request, ex);
2781         struct ceph_bio_iter *it = arg;
2782
2783         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2784         ceph_bio_iter_advance_step(it, bytes, ({
2785                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2786                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2787         }));
2788 }
2789
2790 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2791                                    struct ceph_file_extent *img_extents,
2792                                    u32 num_img_extents,
2793                                    struct ceph_bio_iter *bio_pos)
2794 {
2795         struct rbd_img_fill_ctx fctx = {
2796                 .pos_type = OBJ_REQUEST_BIO,
2797                 .pos = (union rbd_img_fill_iter *)bio_pos,
2798                 .set_pos_fn = set_bio_pos,
2799                 .count_fn = count_bio_bvecs,
2800                 .copy_fn = copy_bio_bvecs,
2801         };
2802
2803         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2804                                     &fctx);
2805 }
2806
2807 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2808                                  u64 off, u64 len, struct bio *bio)
2809 {
2810         struct ceph_file_extent ex = { off, len };
2811         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2812
2813         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2814 }
2815
2816 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2817 {
2818         struct rbd_obj_request *obj_req =
2819             container_of(ex, struct rbd_obj_request, ex);
2820         struct ceph_bvec_iter *it = arg;
2821
2822         obj_req->bvec_pos = *it;
2823         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2824         ceph_bvec_iter_advance(it, bytes);
2825 }
2826
2827 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2828 {
2829         struct rbd_obj_request *obj_req =
2830             container_of(ex, struct rbd_obj_request, ex);
2831         struct ceph_bvec_iter *it = arg;
2832
2833         ceph_bvec_iter_advance_step(it, bytes, ({
2834                 obj_req->bvec_count++;
2835         }));
2836 }
2837
2838 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2839 {
2840         struct rbd_obj_request *obj_req =
2841             container_of(ex, struct rbd_obj_request, ex);
2842         struct ceph_bvec_iter *it = arg;
2843
2844         ceph_bvec_iter_advance_step(it, bytes, ({
2845                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2846                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2847         }));
2848 }
2849
2850 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2851                                      struct ceph_file_extent *img_extents,
2852                                      u32 num_img_extents,
2853                                      struct ceph_bvec_iter *bvec_pos)
2854 {
2855         struct rbd_img_fill_ctx fctx = {
2856                 .pos_type = OBJ_REQUEST_BVECS,
2857                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2858                 .set_pos_fn = set_bvec_pos,
2859                 .count_fn = count_bvecs,
2860                 .copy_fn = copy_bvecs,
2861         };
2862
2863         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2864                                     &fctx);
2865 }
2866
2867 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2868                                    struct ceph_file_extent *img_extents,
2869                                    u32 num_img_extents,
2870                                    struct bio_vec *bvecs)
2871 {
2872         struct ceph_bvec_iter it = {
2873                 .bvecs = bvecs,
2874                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2875                                                              num_img_extents) },
2876         };
2877
2878         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2879                                          &it);
2880 }
2881
2882 static void rbd_img_handle_request_work(struct work_struct *work)
2883 {
2884         struct rbd_img_request *img_req =
2885             container_of(work, struct rbd_img_request, work);
2886
2887         rbd_img_handle_request(img_req, img_req->work_result);
2888 }
2889
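/*
 * Defer image request (re)processing to the rbd workqueue, e.g. to
 * avoid recursing down a long parent chain in the caller's context
 * (see rbd_obj_read_from_parent()).
 */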
2890 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2891 {
2892         INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2893         img_req->work_result = result;
2894         queue_work(rbd_wq, &img_req->work);
2895 }
2896
2897 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2898 {
2899         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2900
2901         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2902                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2903                 return true;
2904         }
2905
2906         dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2907              obj_req->ex.oe_objno);
2908         return false;
2909 }
2910
2911 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2912 {
2913         struct ceph_osd_request *osd_req;
2914         int ret;
2915
2916         osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2917         if (IS_ERR(osd_req))
2918                 return PTR_ERR(osd_req);
2919
2920         osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2921                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2922         rbd_osd_setup_data(osd_req, 0);
2923         rbd_osd_format_read(osd_req);
2924
2925         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2926         if (ret)
2927                 return ret;
2928
2929         rbd_osd_submit(osd_req);
2930         return 0;
2931 }
2932
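/*
 * Issue a child image request against the parent image for the
 * reverse-mapped image extents.  For reads the parent data lands in
 * the original data buffer; for writes (the copyup path) it is read
 * into obj_req->copyup_bvecs.
 */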
2933 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2934 {
2935         struct rbd_img_request *img_req = obj_req->img_request;
2936         struct rbd_img_request *child_img_req;
2937         int ret;
2938
2939         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2940                                                OBJ_OP_READ, NULL);
2941         if (!child_img_req)
2942                 return -ENOMEM;
2943
2944         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2945         child_img_req->obj_request = obj_req;
2946
2947         if (!rbd_img_is_write(img_req)) {
2948                 switch (img_req->data_type) {
2949                 case OBJ_REQUEST_BIO:
2950                         ret = __rbd_img_fill_from_bio(child_img_req,
2951                                                       obj_req->img_extents,
2952                                                       obj_req->num_img_extents,
2953                                                       &obj_req->bio_pos);
2954                         break;
2955                 case OBJ_REQUEST_BVECS:
2956                 case OBJ_REQUEST_OWN_BVECS:
2957                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2958                                                       obj_req->img_extents,
2959                                                       obj_req->num_img_extents,
2960                                                       &obj_req->bvec_pos);
2961                         break;
2962                 default:
2963                         BUG();
2964                 }
2965         } else {
2966                 ret = rbd_img_fill_from_bvecs(child_img_req,
2967                                               obj_req->img_extents,
2968                                               obj_req->num_img_extents,
2969                                               obj_req->copyup_bvecs);
2970         }
2971         if (ret) {
2972                 rbd_img_request_put(child_img_req);
2973                 return ret;
2974         }
2975
2976         /* avoid parent chain recursion */
2977         rbd_img_schedule(child_img_req, 0);
2978         return 0;
2979 }
2980
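/*
 * Advance the object read state machine.  Returns true if the object
 * request is done (*result holds its final status) and false if an OSD
 * or parent image request was submitted and processing will continue
 * from its callback.
 */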
2981 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2982 {
2983         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2984         int ret;
2985
2986 again:
2987         switch (obj_req->read_state) {
2988         case RBD_OBJ_READ_START:
2989                 rbd_assert(!*result);
2990
2991                 if (!rbd_obj_may_exist(obj_req)) {
2992                         *result = -ENOENT;
2993                         obj_req->read_state = RBD_OBJ_READ_OBJECT;
2994                         goto again;
2995                 }
2996
2997                 ret = rbd_obj_read_object(obj_req);
2998                 if (ret) {
2999                         *result = ret;
3000                         return true;
3001                 }
3002                 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3003                 return false;
3004         case RBD_OBJ_READ_OBJECT:
3005                 if (*result == -ENOENT && rbd_dev->parent_overlap) {
3006                         /* reverse map this object extent onto the parent */
3007                         ret = rbd_obj_calc_img_extents(obj_req, false);
3008                         if (ret) {
3009                                 *result = ret;
3010                                 return true;
3011                         }
3012                         if (obj_req->num_img_extents) {
3013                                 ret = rbd_obj_read_from_parent(obj_req);
3014                                 if (ret) {
3015                                         *result = ret;
3016                                         return true;
3017                                 }
3018                                 obj_req->read_state = RBD_OBJ_READ_PARENT;
3019                                 return false;
3020                         }
3021                 }
3022
3023                 /*
3024                  * -ENOENT means a hole in the image -- zero-fill the entire
3025                  * length of the request.  A short read also implies zero-fill
3026                  * to the end of the request.
3027                  */
3028                 if (*result == -ENOENT) {
3029                         rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
3030                         *result = 0;
3031                 } else if (*result >= 0) {
3032                         if (*result < obj_req->ex.oe_len)
3033                                 rbd_obj_zero_range(obj_req, *result,
3034                                                 obj_req->ex.oe_len - *result);
3035                         else
3036                                 rbd_assert(*result == obj_req->ex.oe_len);
3037                         *result = 0;
3038                 }
3039                 return true;
3040         case RBD_OBJ_READ_PARENT:
3041                 return true;
3042         default:
3043                 BUG();
3044         }
3045 }
3046
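/*
 * Returns true if this write can complete without going to the OSDs:
 * the object map says the object can't exist and the request was
 * flagged RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT.  Also latches
 * RBD_OBJ_FLAG_MAY_EXIST for use by later stages.
 */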
3047 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3048 {
3049         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3050
3051         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3052                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3053
3054         if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3055             (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3056                 dout("%s %p noop for nonexistent\n", __func__, obj_req);
3057                 return true;
3058         }
3059
3060         return false;
3061 }
3062
3063 /*
3064  * Return:
3065  *   0 - object map update sent
3066  *   1 - object map update isn't needed
3067  *  <0 - error
3068  */
3069 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3070 {
3071         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3072         u8 new_state;
3073
3074         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3075                 return 1;
3076
3077         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3078                 new_state = OBJECT_PENDING;
3079         else
3080                 new_state = OBJECT_EXISTS;
3081
3082         return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3083 }
3084
3085 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3086 {
3087         struct ceph_osd_request *osd_req;
3088         int num_ops = count_write_ops(obj_req);
3089         int which = 0;
3090         int ret;
3091
3092         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3093                 num_ops++; /* stat */
3094
3095         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3096         if (IS_ERR(osd_req))
3097                 return PTR_ERR(osd_req);
3098
3099         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3100                 ret = rbd_osd_setup_stat(osd_req, which++);
3101                 if (ret)
3102                         return ret;
3103         }
3104
3105         rbd_osd_setup_write_ops(osd_req, which);
3106         rbd_osd_format_write(osd_req);
3107
3108         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3109         if (ret)
3110                 return ret;
3111
3112         rbd_osd_submit(osd_req);
3113         return 0;
3114 }
3115
3116 /*
3117  * copyup_bvecs pages are never highmem pages
3118  */
3119 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3120 {
3121         struct ceph_bvec_iter it = {
3122                 .bvecs = bvecs,
3123                 .iter = { .bi_size = bytes },
3124         };
3125
3126         ceph_bvec_iter_advance_step(&it, bytes, ({
3127                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3128                                bv.bv_len))
3129                         return false;
3130         }));
3131         return true;
3132 }
3133
3134 #define MODS_ONLY       U32_MAX
3135
3136 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3137                                       u32 bytes)
3138 {
3139         struct ceph_osd_request *osd_req;
3140         int ret;
3141
3142         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3143         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3144
3145         osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3146         if (IS_ERR(osd_req))
3147                 return PTR_ERR(osd_req);
3148
3149         ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3150         if (ret)
3151                 return ret;
3152
3153         rbd_osd_format_write(osd_req);
3154
3155         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3156         if (ret)
3157                 return ret;
3158
3159         rbd_osd_submit(osd_req);
3160         return 0;
3161 }
3162
3163 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3164                                         u32 bytes)
3165 {
3166         struct ceph_osd_request *osd_req;
3167         int num_ops = count_write_ops(obj_req);
3168         int which = 0;
3169         int ret;
3170
3171         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3172
3173         if (bytes != MODS_ONLY)
3174                 num_ops++; /* copyup */
3175
3176         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3177         if (IS_ERR(osd_req))
3178                 return PTR_ERR(osd_req);
3179
3180         if (bytes != MODS_ONLY) {
3181                 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3182                 if (ret)
3183                         return ret;
3184         }
3185
3186         rbd_osd_setup_write_ops(osd_req, which);
3187         rbd_osd_format_write(osd_req);
3188
3189         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3190         if (ret)
3191                 return ret;
3192
3193         rbd_osd_submit(osd_req);
3194         return 0;
3195 }
3196
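/*
 * Allocate obj_req->copyup_bvecs: a bio_vec array backed by freshly
 * allocated pages, sized to hold @obj_overlap bytes of parent data for
 * the copyup.
 */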
3197 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3198 {
3199         u32 i;
3200
3201         rbd_assert(!obj_req->copyup_bvecs);
3202         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3203         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3204                                         sizeof(*obj_req->copyup_bvecs),
3205                                         GFP_NOIO);
3206         if (!obj_req->copyup_bvecs)
3207                 return -ENOMEM;
3208
3209         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3210                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3211
3212                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3213                 if (!obj_req->copyup_bvecs[i].bv_page)
3214                         return -ENOMEM;
3215
3216                 obj_req->copyup_bvecs[i].bv_offset = 0;
3217                 obj_req->copyup_bvecs[i].bv_len = len;
3218                 obj_overlap -= len;
3219         }
3220
3221         rbd_assert(!obj_overlap);
3222         return 0;
3223 }
3224
3225 /*
3226  * The target object doesn't exist.  Read the data for the entire
3227  * target object up to the overlap point (if any) from the parent,
3228  * so we can use it for a copyup.
3229  */
3230 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3231 {
3232         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3233         int ret;
3234
3235         rbd_assert(obj_req->num_img_extents);
3236         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3237                       rbd_dev->parent_overlap);
3238         if (!obj_req->num_img_extents) {
3239                 /*
3240                  * The overlap has become 0 (most likely because the
3241                  * image has been flattened).  Re-submit the original write
3242                  * request -- pass MODS_ONLY since the copyup isn't needed
3243                  * anymore.
3244                  */
3245                 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3246         }
3247
3248         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3249         if (ret)
3250                 return ret;
3251
3252         return rbd_obj_read_from_parent(obj_req);
3253 }
3254
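/*
 * Queue object map updates for every snapshot in the write's snapshot
 * context -- the copyup is about to instantiate the object in all of
 * them.  With fast-diff enabled, all but the last entry in the context
 * are marked EXISTS_CLEAN instead of EXISTS.  Skipped if the copyup
 * data turned out to be all zeros.  Errors and the number of updates
 * in flight are accumulated in obj_req->pending.
 */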
3255 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3256 {
3257         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3258         struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3259         u8 new_state;
3260         u32 i;
3261         int ret;
3262
3263         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3264
3265         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3266                 return;
3267
3268         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3269                 return;
3270
3271         for (i = 0; i < snapc->num_snaps; i++) {
3272                 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3273                     i + 1 < snapc->num_snaps)
3274                         new_state = OBJECT_EXISTS_CLEAN;
3275                 else
3276                         new_state = OBJECT_EXISTS;
3277
3278                 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3279                                             new_state, NULL);
3280                 if (ret < 0) {
3281                         obj_req->pending.result = ret;
3282                         return;
3283                 }
3284
3285                 rbd_assert(!ret);
3286                 obj_req->pending.num_pending++;
3287         }
3288 }
3289
3290 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3291 {
3292         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3293         int ret;
3294
3295         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3296
3297         /*
3298          * Only send non-zero copyup data to save some I/O and network
3299          * bandwidth -- zero copyup data is equivalent to the object not
3300          * existing.
3301          */
3302         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3303                 bytes = 0;
3304
3305         if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3306                 /*
3307                  * Send a copyup request with an empty snapshot context to
3308                  * deep-copyup the object through all existing snapshots.
3309                  * A second request with the current snapshot context will be
3310                  * sent for the actual modification.
3311                  */
3312                 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3313                 if (ret) {
3314                         obj_req->pending.result = ret;
3315                         return;
3316                 }
3317
3318                 obj_req->pending.num_pending++;
3319                 bytes = MODS_ONLY;
3320         }
3321
3322         ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3323         if (ret) {
3324                 obj_req->pending.result = ret;
3325                 return;
3326         }
3327
3328         obj_req->pending.num_pending++;
3329 }
3330
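/*
 * Copyup state machine:
 *
 *   RBD_OBJ_COPYUP_START -> RBD_OBJ_COPYUP_READ_PARENT ->
 *   (__)RBD_OBJ_COPYUP_OBJECT_MAPS -> (__)RBD_OBJ_COPYUP_WRITE_OBJECT
 *
 * The __ states wait for the requests counted in obj_req->pending to
 * complete; the corresponding non-__ states then act on the combined
 * result.  If the parent overlap has vanished (e.g. the image was
 * flattened), READ_PARENT is skipped and only the original modification
 * is resubmitted.
 */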
3331 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3332 {
3333         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3334         int ret;
3335
3336 again:
3337         switch (obj_req->copyup_state) {
3338         case RBD_OBJ_COPYUP_START:
3339                 rbd_assert(!*result);
3340
3341                 ret = rbd_obj_copyup_read_parent(obj_req);
3342                 if (ret) {
3343                         *result = ret;
3344                         return true;
3345                 }
3346                 if (obj_req->num_img_extents)
3347                         obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3348                 else
3349                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3350                 return false;
3351         case RBD_OBJ_COPYUP_READ_PARENT:
3352                 if (*result)
3353                         return true;
3354
3355                 if (is_zero_bvecs(obj_req->copyup_bvecs,
3356                                   rbd_obj_img_extents_bytes(obj_req))) {
3357                         dout("%s %p detected zeros\n", __func__, obj_req);
3358                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3359                 }
3360
3361                 rbd_obj_copyup_object_maps(obj_req);
3362                 if (!obj_req->pending.num_pending) {
3363                         *result = obj_req->pending.result;
3364                         obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3365                         goto again;
3366                 }
3367                 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3368                 return false;
3369         case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3370                 if (!pending_result_dec(&obj_req->pending, result))
3371                         return false;
3372                 /* fall through */
3373         case RBD_OBJ_COPYUP_OBJECT_MAPS:
3374                 if (*result) {
3375                         rbd_warn(rbd_dev, "snap object map update failed: %d",
3376                                  *result);
3377                         return true;
3378                 }
3379
3380                 rbd_obj_copyup_write_object(obj_req);
3381                 if (!obj_req->pending.num_pending) {
3382                         *result = obj_req->pending.result;
3383                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3384                         goto again;
3385                 }
3386                 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3387                 return false;
3388         case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3389                 if (!pending_result_dec(&obj_req->pending, result))
3390                         return false;
3391                 /* fall through */
3392         case RBD_OBJ_COPYUP_WRITE_OBJECT:
3393                 return true;
3394         default:
3395                 BUG();
3396         }
3397 }
3398
3399 /*
3400  * Return:
3401  *   0 - object map update sent
3402  *   1 - object map update isn't needed
3403  *  <0 - error
3404  */
3405 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3406 {
3407         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3408         u8 current_state = OBJECT_PENDING;
3409
3410         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3411                 return 1;
3412
3413         if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3414                 return 1;
3415
3416         return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3417                                      &current_state);
3418 }
3419
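/*
 * Object write state machine:
 *
 *   RBD_OBJ_WRITE_START -> RBD_OBJ_WRITE_PRE_OBJECT_MAP ->
 *   RBD_OBJ_WRITE_OBJECT [-> __RBD_OBJ_WRITE_COPYUP] ->
 *   RBD_OBJ_WRITE_COPYUP -> RBD_OBJ_WRITE_POST_OBJECT_MAP
 *
 * Pre/post object map updates are skipped when the object map feature
 * is disabled or the update isn't needed.  -ENOENT from the OSD kicks
 * off the copyup path when copyup is enabled.  Returns true when the
 * object request is done, with the final status in *result.
 */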
3420 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3421 {
3422         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3423         int ret;
3424
3425 again:
3426         switch (obj_req->write_state) {
3427         case RBD_OBJ_WRITE_START:
3428                 rbd_assert(!*result);
3429
3430                 if (rbd_obj_write_is_noop(obj_req))
3431                         return true;
3432
3433                 ret = rbd_obj_write_pre_object_map(obj_req);
3434                 if (ret < 0) {
3435                         *result = ret;
3436                         return true;
3437                 }
3438                 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3439                 if (ret > 0)
3440                         goto again;
3441                 return false;
3442         case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3443                 if (*result) {
3444                         rbd_warn(rbd_dev, "pre object map update failed: %d",
3445                                  *result);
3446                         return true;
3447                 }
3448                 ret = rbd_obj_write_object(obj_req);
3449                 if (ret) {
3450                         *result = ret;
3451                         return true;
3452                 }
3453                 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3454                 return false;
3455         case RBD_OBJ_WRITE_OBJECT:
3456                 if (*result == -ENOENT) {
3457                         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3458                                 *result = 0;
3459                                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3460                                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3461                                 goto again;
3462                         }
3463                         /*
3464                          * On a non-existent object:
3465                          *   delete - -ENOENT, truncate/zero - 0
3466                          */
3467                         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3468                                 *result = 0;
3469                 }
3470                 if (*result)
3471                         return true;
3472
3473                 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3474                 goto again;
3475         case __RBD_OBJ_WRITE_COPYUP:
3476                 if (!rbd_obj_advance_copyup(obj_req, result))
3477                         return false;
3478                 /* fall through */
3479         case RBD_OBJ_WRITE_COPYUP:
3480                 if (*result) {
3481                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
3482                         return true;
3483                 }
3484                 ret = rbd_obj_write_post_object_map(obj_req);
3485                 if (ret < 0) {
3486                         *result = ret;
3487                         return true;
3488                 }
3489                 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3490                 if (ret > 0)
3491                         goto again;
3492                 return false;
3493         case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3494                 if (*result)
3495                         rbd_warn(rbd_dev, "post object map update failed: %d",
3496                                  *result);
3497                 return true;
3498         default:
3499                 BUG();
3500         }
3501 }
3502
3503 /*
3504  * Return true if @obj_req is completed.
3505  */
3506 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3507                                      int *result)
3508 {
3509         struct rbd_img_request *img_req = obj_req->img_request;
3510         struct rbd_device *rbd_dev = img_req->rbd_dev;
3511         bool done;
3512
3513         mutex_lock(&obj_req->state_mutex);
3514         if (!rbd_img_is_write(img_req))
3515                 done = rbd_obj_advance_read(obj_req, result);
3516         else
3517                 done = rbd_obj_advance_write(obj_req, result);
3518         mutex_unlock(&obj_req->state_mutex);
3519
3520         if (done && *result) {
3521                 rbd_assert(*result < 0);
3522                 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3523                          obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3524                          obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3525         }
3526         return done;
3527 }
3528
3529 /*
3530  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3531  * recursion.
3532  */
3533 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3534 {
3535         if (__rbd_obj_handle_request(obj_req, &result))
3536                 rbd_img_handle_request(obj_req->img_request, result);
3537 }
3538
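/*
 * Returns true if the image request must run under the exclusive lock:
 * the feature is enabled, the head (not a snapshot) is mapped, and the
 * request is a write, lock_on_read was specified, or the object map is
 * in use.
 */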
3539 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3540 {
3541         struct rbd_device *rbd_dev = img_req->rbd_dev;
3542
3543         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3544                 return false;
3545
3546         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3547                 return false;
3548
3549         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3550         if (rbd_dev->opts->lock_on_read ||
3551             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3552                 return true;
3553
3554         return rbd_img_is_write(img_req);
3555 }
3556
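/*
 * Put the image request on acquiring_list (lock not held yet; the
 * request will be restarted from wake_lock_waiters()) or running_list
 * (lock held).  Returns true if the lock is already held and the
 * request may proceed immediately.
 */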
3557 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3558 {
3559         struct rbd_device *rbd_dev = img_req->rbd_dev;
3560         bool locked;
3561
3562         lockdep_assert_held(&rbd_dev->lock_rwsem);
3563         locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3564         spin_lock(&rbd_dev->lock_lists_lock);
3565         rbd_assert(list_empty(&img_req->lock_item));
3566         if (!locked)
3567                 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3568         else
3569                 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3570         spin_unlock(&rbd_dev->lock_lists_lock);
3571         return locked;
3572 }
3573
3574 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3575 {
3576         struct rbd_device *rbd_dev = img_req->rbd_dev;
3577         bool need_wakeup;
3578
3579         lockdep_assert_held(&rbd_dev->lock_rwsem);
3580         spin_lock(&rbd_dev->lock_lists_lock);
3581         rbd_assert(!list_empty(&img_req->lock_item));
3582         list_del_init(&img_req->lock_item);
3583         need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3584                        list_empty(&rbd_dev->running_list));
3585         spin_unlock(&rbd_dev->lock_lists_lock);
3586         if (need_wakeup)
3587                 complete(&rbd_dev->releasing_wait);
3588 }
3589
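/*
 * Return:
 *   0 - lock acquisition scheduled, the request waits on acquiring_list
 *   1 - exclusive lock isn't needed or is already held
 *  <0 - error
 */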
3590 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3591 {
3592         struct rbd_device *rbd_dev = img_req->rbd_dev;
3593
3594         if (!need_exclusive_lock(img_req))
3595                 return 1;
3596
3597         if (rbd_lock_add_request(img_req))
3598                 return 1;
3599
3600         if (rbd_dev->opts->exclusive) {
3601                 WARN_ON(1); /* lock got released? */
3602                 return -EROFS;
3603         }
3604
3605         /*
3606          * Note the use of mod_delayed_work() in rbd_acquire_lock()
3607          * and cancel_delayed_work() in wake_lock_waiters().
3608          */
3609         dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3610         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3611         return 0;
3612 }
3613
3614 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3615 {
3616         struct rbd_obj_request *obj_req;
3617
3618         rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3619
3620         for_each_obj_request(img_req, obj_req) {
3621                 int result = 0;
3622
3623                 if (__rbd_obj_handle_request(obj_req, &result)) {
3624                         if (result) {
3625                                 img_req->pending.result = result;
3626                                 return;
3627                         }
3628                 } else {
3629                         img_req->pending.num_pending++;
3630                 }
3631         }
3632 }
3633
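/*
 * Image request state machine:
 *
 *   RBD_IMG_START -> RBD_IMG_EXCLUSIVE_LOCK ->
 *   (__)RBD_IMG_OBJECT_REQUESTS
 *
 * EXCLUSIVE_LOCK is a pass-through when the lock isn't needed or is
 * already held; otherwise the request is parked on acquiring_list until
 * wake_lock_waiters() restarts it.  __RBD_IMG_OBJECT_REQUESTS waits for
 * the object requests counted in img_req->pending to complete.
 */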
3634 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3635 {
3636         struct rbd_device *rbd_dev = img_req->rbd_dev;
3637         int ret;
3638
3639 again:
3640         switch (img_req->state) {
3641         case RBD_IMG_START:
3642                 rbd_assert(!*result);
3643
3644                 ret = rbd_img_exclusive_lock(img_req);
3645                 if (ret < 0) {
3646                         *result = ret;
3647                         return true;
3648                 }
3649                 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3650                 if (ret > 0)
3651                         goto again;
3652                 return false;
3653         case RBD_IMG_EXCLUSIVE_LOCK:
3654                 if (*result)
3655                         return true;
3656
3657                 rbd_assert(!need_exclusive_lock(img_req) ||
3658                            __rbd_is_lock_owner(rbd_dev));
3659
3660                 rbd_img_object_requests(img_req);
3661                 if (!img_req->pending.num_pending) {
3662                         *result = img_req->pending.result;
3663                         img_req->state = RBD_IMG_OBJECT_REQUESTS;
3664                         goto again;
3665                 }
3666                 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3667                 return false;
3668         case __RBD_IMG_OBJECT_REQUESTS:
3669                 if (!pending_result_dec(&img_req->pending, result))
3670                         return false;
3671                 /* fall through */
3672         case RBD_IMG_OBJECT_REQUESTS:
3673                 return true;
3674         default:
3675                 BUG();
3676         }
3677 }
3678
3679 /*
3680  * Return true if @img_req is completed.
3681  */
3682 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3683                                      int *result)
3684 {
3685         struct rbd_device *rbd_dev = img_req->rbd_dev;
3686         bool done;
3687
3688         if (need_exclusive_lock(img_req)) {
3689                 down_read(&rbd_dev->lock_rwsem);
3690                 mutex_lock(&img_req->state_mutex);
3691                 done = rbd_img_advance(img_req, result);
3692                 if (done)
3693                         rbd_lock_del_request(img_req);
3694                 mutex_unlock(&img_req->state_mutex);
3695                 up_read(&rbd_dev->lock_rwsem);
3696         } else {
3697                 mutex_lock(&img_req->state_mutex);
3698                 done = rbd_img_advance(img_req, result);
3699                 mutex_unlock(&img_req->state_mutex);
3700         }
3701
3702         if (done && *result) {
3703                 rbd_assert(*result < 0);
3704                 rbd_warn(rbd_dev, "%s%s result %d",
3705                       test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3706                       obj_op_name(img_req->op_type), *result);
3707         }
3708         return done;
3709 }
3710
3711 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3712 {
3713 again:
3714         if (!__rbd_img_handle_request(img_req, &result))
3715                 return;
3716
3717         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3718                 struct rbd_obj_request *obj_req = img_req->obj_request;
3719
3720                 rbd_img_request_put(img_req);
3721                 if (__rbd_obj_handle_request(obj_req, &result)) {
3722                         img_req = obj_req->img_request;
3723                         goto again;
3724                 }
3725         } else {
3726                 struct request *rq = img_req->rq;
3727
3728                 rbd_img_request_put(img_req);
3729                 blk_mq_end_request(rq, errno_to_blk_status(result));
3730         }
3731 }
3732
3733 static const struct rbd_client_id rbd_empty_cid;
3734
3735 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3736                           const struct rbd_client_id *rhs)
3737 {
3738         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3739 }
3740
3741 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3742 {
3743         struct rbd_client_id cid;
3744
3745         mutex_lock(&rbd_dev->watch_mutex);
3746         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3747         cid.handle = rbd_dev->watch_cookie;
3748         mutex_unlock(&rbd_dev->watch_mutex);
3749         return cid;
3750 }
3751
3752 /*
3753  * lock_rwsem must be held for write
3754  */
3755 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3756                               const struct rbd_client_id *cid)
3757 {
3758         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3759              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3760              cid->gid, cid->handle);
3761         rbd_dev->owner_cid = *cid; /* struct */
3762 }
3763
3764 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3765 {
3766         mutex_lock(&rbd_dev->watch_mutex);
3767         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3768         mutex_unlock(&rbd_dev->watch_mutex);
3769 }
3770
3771 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3772 {
3773         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3774
3775         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3776         strcpy(rbd_dev->lock_cookie, cookie);
3777         rbd_set_owner_cid(rbd_dev, &cid);
3778         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3779 }
3780
3781 /*
3782  * lock_rwsem must be held for write
3783  */
3784 static int rbd_lock(struct rbd_device *rbd_dev)
3785 {
3786         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3787         char cookie[32];
3788         int ret;
3789
3790         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3791                 rbd_dev->lock_cookie[0] != '\0');
3792
3793         format_lock_cookie(rbd_dev, cookie);
3794         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3795                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3796                             RBD_LOCK_TAG, "", 0);
3797         if (ret)
3798                 return ret;
3799
3800         __rbd_lock(rbd_dev, cookie);
3801         return 0;
3802 }
3803
3804 /*
3805  * lock_rwsem must be held for write
3806  */
3807 static void rbd_unlock(struct rbd_device *rbd_dev)
3808 {
3809         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3810         int ret;
3811
3812         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3813                 rbd_dev->lock_cookie[0] == '\0');
3814
3815         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3816                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3817         if (ret && ret != -ENOENT)
3818                 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3819
3820         /* even if unlock failed, treat the image as unlocked from now on */
3821         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3822         rbd_dev->lock_cookie[0] = '\0';
3823         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3824         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3825 }
3826
3827 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3828                                 enum rbd_notify_op notify_op,
3829                                 struct page ***preply_pages,
3830                                 size_t *preply_len)
3831 {
3832         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3833         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3834         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3835         int buf_size = sizeof(buf);
3836         void *p = buf;
3837
3838         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3839
3840         /* encode *LockPayload NotifyMessage (op + ClientId) */
3841         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3842         ceph_encode_32(&p, notify_op);
3843         ceph_encode_64(&p, cid.gid);
3844         ceph_encode_64(&p, cid.handle);
3845
3846         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3847                                 &rbd_dev->header_oloc, buf, buf_size,
3848                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3849 }
3850
3851 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3852                                enum rbd_notify_op notify_op)
3853 {
3854         struct page **reply_pages;
3855         size_t reply_len;
3856
3857         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3858         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3859 }
3860
3861 static void rbd_notify_acquired_lock(struct work_struct *work)
3862 {
3863         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3864                                                   acquired_lock_work);
3865
3866         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3867 }
3868
3869 static void rbd_notify_released_lock(struct work_struct *work)
3870 {
3871         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3872                                                   released_lock_work);
3873
3874         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3875 }
3876
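/*
 * Ask the current lock owner to release the lock: send a REQUEST_LOCK
 * notification and scan the acks for a ResponseMessage payload.  Only
 * the owner is expected to include one; its value becomes the return
 * value (e.g. -EROFS if the owner refuses to release).  No response at
 * all is reported as -ETIMEDOUT.
 */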
3877 static int rbd_request_lock(struct rbd_device *rbd_dev)
3878 {
3879         struct page **reply_pages;
3880         size_t reply_len;
3881         bool lock_owner_responded = false;
3882         int ret;
3883
3884         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3885
3886         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3887                                    &reply_pages, &reply_len);
3888         if (ret && ret != -ETIMEDOUT) {
3889                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3890                 goto out;
3891         }
3892
3893         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3894                 void *p = page_address(reply_pages[0]);
3895                 void *const end = p + reply_len;
3896                 u32 n;
3897
3898                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3899                 while (n--) {
3900                         u8 struct_v;
3901                         u32 len;
3902
3903                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3904                         p += 8 + 8; /* skip gid and cookie */
3905
3906                         ceph_decode_32_safe(&p, end, len, e_inval);
3907                         if (!len)
3908                                 continue;
3909
3910                         if (lock_owner_responded) {
3911                                 rbd_warn(rbd_dev,
3912                                          "duplicate lock owners detected");
3913                                 ret = -EIO;
3914                                 goto out;
3915                         }
3916
3917                         lock_owner_responded = true;
3918                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3919                                                   &struct_v, &len);
3920                         if (ret) {
3921                                 rbd_warn(rbd_dev,
3922                                          "failed to decode ResponseMessage: %d",
3923                                          ret);
3924                                 goto e_inval;
3925                         }
3926
3927                         ret = ceph_decode_32(&p);
3928                 }
3929         }
3930
3931         if (!lock_owner_responded) {
3932                 rbd_warn(rbd_dev, "no lock owners detected");
3933                 ret = -ETIMEDOUT;
3934         }
3935
3936 out:
3937         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3938         return ret;
3939
3940 e_inval:
3941         ret = -EINVAL;
3942         goto out;
3943 }
3944
3945 /*
3946  * Wake whoever is waiting for the lock: image request state machine(s)
3947  * parked on acquiring_list, or rbd_add_acquire_lock() (i.e. "rbd map").
3948  */
3949 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3950 {
3951         struct rbd_img_request *img_req;
3952
3953         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3954         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3955
3956         cancel_delayed_work(&rbd_dev->lock_dwork);
3957         if (!completion_done(&rbd_dev->acquire_wait)) {
3958                 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3959                            list_empty(&rbd_dev->running_list));
3960                 rbd_dev->acquire_err = result;
3961                 complete_all(&rbd_dev->acquire_wait);
3962                 return;
3963         }
3964
3965         list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3966                 mutex_lock(&img_req->state_mutex);
3967                 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3968                 rbd_img_schedule(img_req, result);
3969                 mutex_unlock(&img_req->state_mutex);
3970         }
3971
3972         list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3973 }
3974
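/*
 * Fetch the current locker(s) of the header object.  Returns -EBUSY if
 * the image is locked by an external mechanism (unexpected lock tag or
 * cookie) or with a shared lock.
 */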
3975 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3976                                struct ceph_locker **lockers, u32 *num_lockers)
3977 {
3978         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3979         u8 lock_type;
3980         char *lock_tag;
3981         int ret;
3982
3983         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3984
3985         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3986                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3987                                  &lock_type, &lock_tag, lockers, num_lockers);
3988         if (ret)
3989                 return ret;
3990
3991         if (*num_lockers == 0) {
3992                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3993                 goto out;
3994         }
3995
3996         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3997                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3998                          lock_tag);
3999                 ret = -EBUSY;
4000                 goto out;
4001         }
4002
4003         if (lock_type == CEPH_CLS_LOCK_SHARED) {
4004                 rbd_warn(rbd_dev, "shared lock type detected");
4005                 ret = -EBUSY;
4006                 goto out;
4007         }
4008
4009         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
4010                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
4011                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
4012                          (*lockers)[0].id.cookie);
4013                 ret = -EBUSY;
4014                 goto out;
4015         }
4016
4017 out:
4018         kfree(lock_tag);
4019         return ret;
4020 }
4021
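/*
 * Returns 1 if the locker still has a watch established on the header
 * object (matching address and cookie), i.e. is presumably alive, 0 if
 * it doesn't, <0 on error.  On a match the owner cid is recorded.
 */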
4022 static int find_watcher(struct rbd_device *rbd_dev,
4023                         const struct ceph_locker *locker)
4024 {
4025         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4026         struct ceph_watch_item *watchers;
4027         u32 num_watchers;
4028         u64 cookie;
4029         int i;
4030         int ret;
4031
4032         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4033                                       &rbd_dev->header_oloc, &watchers,
4034                                       &num_watchers);
4035         if (ret)
4036                 return ret;
4037
4038         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4039         for (i = 0; i < num_watchers; i++) {
4040                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
4041                             sizeof(locker->info.addr)) &&
4042                     watchers[i].cookie == cookie) {
4043                         struct rbd_client_id cid = {
4044                                 .gid = le64_to_cpu(watchers[i].name.num),
4045                                 .handle = cookie,
4046                         };
4047
4048                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4049                              rbd_dev, cid.gid, cid.handle);
4050                         rbd_set_owner_cid(rbd_dev, &cid);
4051                         ret = 1;
4052                         goto out;
4053                 }
4054         }
4055
4056         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
4057         ret = 0;
4058 out:
4059         kfree(watchers);
4060         return ret;
4061 }
4062
4063 /*
4064  * lock_rwsem must be held for write
4065  */
4066 static int rbd_try_lock(struct rbd_device *rbd_dev)
4067 {
4068         struct ceph_client *client = rbd_dev->rbd_client->client;
4069         struct ceph_locker *lockers;
4070         u32 num_lockers;
4071         int ret;
4072
4073         for (;;) {
4074                 ret = rbd_lock(rbd_dev);
4075                 if (ret != -EBUSY)
4076                         return ret;
4077
4078                 /* determine if the current lock holder is still alive */
4079                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4080                 if (ret)
4081                         return ret;
4082
4083                 if (num_lockers == 0)
4084                         goto again;
4085
4086                 ret = find_watcher(rbd_dev, lockers);
4087                 if (ret)
4088                         goto out; /* request lock or error */
4089
4090                 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4091                          ENTITY_NAME(lockers[0].id.name));
4092
4093                 ret = ceph_monc_blacklist_add(&client->monc,
4094                                               &lockers[0].info.addr);
4095                 if (ret) {
4096                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4097                                  ENTITY_NAME(lockers[0].id.name), ret);
4098                         goto out;
4099                 }
4100
4101                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4102                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
4103                                           lockers[0].id.cookie,
4104                                           &lockers[0].id.name);
4105                 if (ret && ret != -ENOENT)
4106                         goto out;
4107
4108 again:
4109                 ceph_free_lockers(lockers, num_lockers);
4110         }
4111
4112 out:
4113         ceph_free_lockers(lockers, num_lockers);
4114         return ret;
4115 }
4116
4117 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4118 {
4119         int ret;
4120
4121         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4122                 ret = rbd_object_map_open(rbd_dev);
4123                 if (ret)
4124                         return ret;
4125         }
4126
4127         return 0;
4128 }
4129
4130 /*
4131  * Return:
4132  *   0 - lock acquired
4133  *   1 - caller should call rbd_request_lock()
4134  *  <0 - error
4135  */
4136 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4137 {
4138         int ret;
4139
4140         down_read(&rbd_dev->lock_rwsem);
4141         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4142              rbd_dev->lock_state);
4143         if (__rbd_is_lock_owner(rbd_dev)) {
4144                 up_read(&rbd_dev->lock_rwsem);
4145                 return 0;
4146         }
4147
4148         up_read(&rbd_dev->lock_rwsem);
4149         down_write(&rbd_dev->lock_rwsem);
4150         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4151              rbd_dev->lock_state);
4152         if (__rbd_is_lock_owner(rbd_dev)) {
4153                 up_write(&rbd_dev->lock_rwsem);
4154                 return 0;
4155         }
4156
4157         ret = rbd_try_lock(rbd_dev);
4158         if (ret < 0) {
4159                 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4160                 if (ret == -EBLACKLISTED)
4161                         goto out;
4162
4163                 ret = 1; /* request lock anyway */
4164         }
4165         if (ret > 0) {
4166                 up_write(&rbd_dev->lock_rwsem);
4167                 return ret;
4168         }
4169
4170         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4171         rbd_assert(list_empty(&rbd_dev->running_list));
4172
4173         ret = rbd_post_acquire_action(rbd_dev);
4174         if (ret) {
4175                 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4176                 /*
4177                  * Can't stay in RBD_LOCK_STATE_LOCKED because
4178                  * rbd_lock_add_request() would let the request through,
4179                  * assuming that e.g. object map is locked and loaded.
4180                  */
4181                 rbd_unlock(rbd_dev);
4182         }
4183
4184 out:
4185         wake_lock_waiters(rbd_dev, ret);
4186         up_write(&rbd_dev->lock_rwsem);
4187         return ret;
4188 }
4189
4190 static void rbd_acquire_lock(struct work_struct *work)
4191 {
4192         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4193                                             struct rbd_device, lock_dwork);
4194         int ret;
4195
4196         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4197 again:
4198         ret = rbd_try_acquire_lock(rbd_dev);
4199         if (ret <= 0) {
4200                 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4201                 return;
4202         }
4203
4204         ret = rbd_request_lock(rbd_dev);
4205         if (ret == -ETIMEDOUT) {
4206                 goto again; /* treat this as a dead client */
4207         } else if (ret == -EROFS) {
4208                 rbd_warn(rbd_dev, "peer will not release lock");
4209                 down_write(&rbd_dev->lock_rwsem);
4210                 wake_lock_waiters(rbd_dev, ret);
4211                 up_write(&rbd_dev->lock_rwsem);
4212         } else if (ret < 0) {
4213                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4214                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4215                                  RBD_RETRY_DELAY);
4216         } else {
4217                 /*
4218                  * lock owner acked, but resend if we don't see them
4219                  * release the lock
4220                  */
4221                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
4222                      rbd_dev);
4223                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4224                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4225         }
4226 }
4227
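/*
 * Switch to RBD_LOCK_STATE_RELEASING and wait for in-flight image
 * requests on running_list to finish -- rbd_lock_del_request()
 * completes releasing_wait when the list drains.  lock_rwsem is
 * temporarily dropped while waiting and re-taken for write.  Returns
 * true if the lock is quiesced and can now be released.
 */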
4228 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4229 {
4230         bool need_wait;
4231
4232         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4233         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4234
4235         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4236                 return false;
4237
4238         /*
4239          * Ensure that all in-flight IO is flushed.
4240          */
4241         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4242         rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4243         need_wait = !list_empty(&rbd_dev->running_list);
4244         downgrade_write(&rbd_dev->lock_rwsem);
4245         if (need_wait)
4246                 wait_for_completion(&rbd_dev->releasing_wait);
4247         up_read(&rbd_dev->lock_rwsem);
4248
4249         down_write(&rbd_dev->lock_rwsem);
4250         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4251                 return false;
4252
4253         rbd_assert(list_empty(&rbd_dev->running_list));
4254         return true;
4255 }
4256
4257 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4258 {
4259         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4260                 rbd_object_map_close(rbd_dev);
4261 }
4262
4263 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4264 {
4265         rbd_assert(list_empty(&rbd_dev->running_list));
4266
4267         rbd_pre_release_action(rbd_dev);
4268         rbd_unlock(rbd_dev);
4269 }
4270
4271 /*
4272  * lock_rwsem must be held for write
4273  */
4274 static void rbd_release_lock(struct rbd_device *rbd_dev)
4275 {
4276         if (!rbd_quiesce_lock(rbd_dev))
4277                 return;
4278
4279         __rbd_release_lock(rbd_dev);
4280
4281         /*
4282          * Give others a chance to grab the lock - we would re-acquire
4283          * almost immediately if we got new IO while draining the running
4284          * list otherwise.  We need to ack our own notifications, so this
4285          * lock_dwork will be requeued from rbd_handle_released_lock() by
4286          * way of maybe_kick_acquire().
4287          */
4288         cancel_delayed_work(&rbd_dev->lock_dwork);
4289 }
4290
4291 static void rbd_release_lock_work(struct work_struct *work)
4292 {
4293         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4294                                                   unlock_work);
4295
4296         down_write(&rbd_dev->lock_rwsem);
4297         rbd_release_lock(rbd_dev);
4298         up_write(&rbd_dev->lock_rwsem);
4299 }
4300
4301 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4302 {
4303         bool have_requests;
4304
4305         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4306         if (__rbd_is_lock_owner(rbd_dev))
4307                 return;
4308
4309         spin_lock(&rbd_dev->lock_lists_lock);
4310         have_requests = !list_empty(&rbd_dev->acquiring_list);
4311         spin_unlock(&rbd_dev->lock_lists_lock);
4312         if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4313                 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4314                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4315         }
4316 }
4317
4318 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4319                                      void **p)
4320 {
4321         struct rbd_client_id cid = { 0 };
4322
4323         if (struct_v >= 2) {
4324                 cid.gid = ceph_decode_64(p);
4325                 cid.handle = ceph_decode_64(p);
4326         }
4327
4328         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4329              cid.handle);
4330         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4331                 down_write(&rbd_dev->lock_rwsem);
4332                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4333                         /*
4334                          * we already know that the remote client is
4335                          * the owner
4336                          */
4337                         up_write(&rbd_dev->lock_rwsem);
4338                         return;
4339                 }
4340
4341                 rbd_set_owner_cid(rbd_dev, &cid);
4342                 downgrade_write(&rbd_dev->lock_rwsem);
4343         } else {
4344                 down_read(&rbd_dev->lock_rwsem);
4345         }
4346
4347         maybe_kick_acquire(rbd_dev);
4348         up_read(&rbd_dev->lock_rwsem);
4349 }
4350
4351 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4352                                      void **p)
4353 {
4354         struct rbd_client_id cid = { 0 };
4355
4356         if (struct_v >= 2) {
4357                 cid.gid = ceph_decode_64(p);
4358                 cid.handle = ceph_decode_64(p);
4359         }
4360
4361         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4362              cid.handle);
4363         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4364                 down_write(&rbd_dev->lock_rwsem);
4365                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4366                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4367                              __func__, rbd_dev, cid.gid, cid.handle,
4368                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4369                         up_write(&rbd_dev->lock_rwsem);
4370                         return;
4371                 }
4372
4373                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4374                 downgrade_write(&rbd_dev->lock_rwsem);
4375         } else {
4376                 down_read(&rbd_dev->lock_rwsem);
4377         }
4378
4379         maybe_kick_acquire(rbd_dev);
4380         up_read(&rbd_dev->lock_rwsem);
4381 }
4382
4383 /*
4384  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4385  * ResponseMessage is needed.
4386  */
4387 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4388                                    void **p)
4389 {
4390         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4391         struct rbd_client_id cid = { 0 };
4392         int result = 1;
4393
4394         if (struct_v >= 2) {
4395                 cid.gid = ceph_decode_64(p);
4396                 cid.handle = ceph_decode_64(p);
4397         }
4398
4399         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4400              cid.handle);
4401         if (rbd_cid_equal(&cid, &my_cid))
4402                 return result;
4403
4404         down_read(&rbd_dev->lock_rwsem);
4405         if (__rbd_is_lock_owner(rbd_dev)) {
4406                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4407                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4408                         goto out_unlock;
4409
4410                 /*
4411                  * encode ResponseMessage(0) so the peer can detect
4412                  * a missing owner
4413                  */
4414                 result = 0;
4415
4416                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4417                         if (!rbd_dev->opts->exclusive) {
4418                                 dout("%s rbd_dev %p queueing unlock_work\n",
4419                                      __func__, rbd_dev);
4420                                 queue_work(rbd_dev->task_wq,
4421                                            &rbd_dev->unlock_work);
4422                         } else {
4423                                 /* refuse to release the lock */
4424                                 result = -EROFS;
4425                         }
4426                 }
4427         }
4428
4429 out_unlock:
4430         up_read(&rbd_dev->lock_rwsem);
4431         return result;
4432 }
4433
4434 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4435                                      u64 notify_id, u64 cookie, s32 *result)
4436 {
4437         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4438         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4439         int buf_size = sizeof(buf);
4440         int ret;
4441
4442         if (result) {
4443                 void *p = buf;
4444
4445                 /* encode ResponseMessage */
4446                 ceph_start_encoding(&p, 1, 1,
4447                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4448                 ceph_encode_32(&p, *result);
4449         } else {
4450                 buf_size = 0;
4451         }
4452
4453         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4454                                    &rbd_dev->header_oloc, notify_id, cookie,
4455                                    buf, buf_size);
4456         if (ret)
4457                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4458 }
4459
4460 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4461                                    u64 cookie)
4462 {
4463         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4464         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4465 }
4466
4467 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4468                                           u64 notify_id, u64 cookie, s32 result)
4469 {
4470         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4471         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4472 }
4473
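/*
 * Watch/notify callback for the header object.  An empty payload is a
 * legacy header update notification; otherwise the NotifyMessage is
 * decoded and dispatched on notify_op.  Every notification is acked,
 * with a ResponseMessage payload where one is called for.
 */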
4474 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4475                          u64 notifier_id, void *data, size_t data_len)
4476 {
4477         struct rbd_device *rbd_dev = arg;
4478         void *p = data;
4479         void *const end = p + data_len;
4480         u8 struct_v = 0;
4481         u32 len;
4482         u32 notify_op;
4483         int ret;
4484
4485         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4486              __func__, rbd_dev, cookie, notify_id, data_len);
4487         if (data_len) {
4488                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4489                                           &struct_v, &len);
4490                 if (ret) {
4491                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4492                                  ret);
4493                         return;
4494                 }
4495
4496                 notify_op = ceph_decode_32(&p);
4497         } else {
4498                 /* legacy notification for header updates */
4499                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4500                 len = 0;
4501         }
4502
4503         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4504         switch (notify_op) {
4505         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4506                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4507                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4508                 break;
4509         case RBD_NOTIFY_OP_RELEASED_LOCK:
4510                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4511                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4512                 break;
4513         case RBD_NOTIFY_OP_REQUEST_LOCK:
4514                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4515                 if (ret <= 0)
4516                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4517                                                       cookie, ret);
4518                 else
4519                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4520                 break;
4521         case RBD_NOTIFY_OP_HEADER_UPDATE:
4522                 ret = rbd_dev_refresh(rbd_dev);
4523                 if (ret)
4524                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
4525
4526                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4527                 break;
4528         default:
4529                 if (rbd_is_lock_owner(rbd_dev))
4530                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4531                                                       cookie, -EOPNOTSUPP);
4532                 else
4533                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4534                 break;
4535         }
4536 }
4537
4538 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4539
4540 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4541 {
4542         struct rbd_device *rbd_dev = arg;
4543
4544         rbd_warn(rbd_dev, "encountered watch error: %d", err);
4545
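             /* the watch has failed, so the recorded lock owner may be stale; clear it */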
4546         down_write(&rbd_dev->lock_rwsem);
4547         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4548         up_write(&rbd_dev->lock_rwsem);
4549
4550         mutex_lock(&rbd_dev->watch_mutex);
4551         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4552                 __rbd_unregister_watch(rbd_dev);
4553                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4554
4555                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4556         }
4557         mutex_unlock(&rbd_dev->watch_mutex);
4558 }
4559
4560 /*
4561  * watch_mutex must be locked
4562  */
4563 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4564 {
4565         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4566         struct ceph_osd_linger_request *handle;
4567
4568         rbd_assert(!rbd_dev->watch_handle);
4569         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4570
4571         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4572                                  &rbd_dev->header_oloc, rbd_watch_cb,
4573                                  rbd_watch_errcb, rbd_dev);
4574         if (IS_ERR(handle))
4575                 return PTR_ERR(handle);
4576
4577         rbd_dev->watch_handle = handle;
4578         return 0;
4579 }
4580
4581 /*
4582  * watch_mutex must be locked
4583  */
4584 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4585 {
4586         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4587         int ret;
4588
4589         rbd_assert(rbd_dev->watch_handle);
4590         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4591
4592         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4593         if (ret)
4594                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4595
4596         rbd_dev->watch_handle = NULL;
4597 }
4598
4599 static int rbd_register_watch(struct rbd_device *rbd_dev)
4600 {
4601         int ret;
4602
4603         mutex_lock(&rbd_dev->watch_mutex);
4604         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4605         ret = __rbd_register_watch(rbd_dev);
4606         if (ret)
4607                 goto out;
4608
4609         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4610         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4611
4612 out:
4613         mutex_unlock(&rbd_dev->watch_mutex);
4614         return ret;
4615 }
4616
4617 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4618 {
4619         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4620
4621         cancel_work_sync(&rbd_dev->acquired_lock_work);
4622         cancel_work_sync(&rbd_dev->released_lock_work);
4623         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4624         cancel_work_sync(&rbd_dev->unlock_work);
4625 }
4626
4627 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4628 {
4629         cancel_tasks_sync(rbd_dev);
4630
4631         mutex_lock(&rbd_dev->watch_mutex);
4632         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4633                 __rbd_unregister_watch(rbd_dev);
4634         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4635         mutex_unlock(&rbd_dev->watch_mutex);
4636
4637         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4638         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4639 }
4640
4641 /*
4642  * lock_rwsem must be held for write
4643  */
4644 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4645 {
4646         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4647         char cookie[32];
4648         int ret;
4649
4650         if (!rbd_quiesce_lock(rbd_dev))
4651                 return;
4652
4653         format_lock_cookie(rbd_dev, cookie);
4654         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4655                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4656                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4657                                   RBD_LOCK_TAG, cookie);
4658         if (ret) {
4659                 if (ret != -EOPNOTSUPP)
4660                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4661                                  ret);
4662
4663                 /*
4664                  * Lock cookie cannot be updated on older OSDs, so do
4665                  * a manual release and queue an acquire.
4666                  */
4667                 __rbd_release_lock(rbd_dev);
4668                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4669         } else {
4670                 __rbd_lock(rbd_dev, cookie);
4671                 wake_lock_waiters(rbd_dev, 0);
4672         }
4673 }
4674
4675 static void rbd_reregister_watch(struct work_struct *work)
4676 {
4677         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4678                                             struct rbd_device, watch_dwork);
4679         int ret;
4680
4681         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4682
4683         mutex_lock(&rbd_dev->watch_mutex);
4684         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4685                 mutex_unlock(&rbd_dev->watch_mutex);
4686                 return;
4687         }
4688
4689         ret = __rbd_register_watch(rbd_dev);
4690         if (ret) {
4691                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
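                     /* keep retrying unless the client was blacklisted or the image is gone */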
4692                 if (ret != -EBLACKLISTED && ret != -ENOENT) {
4693                         queue_delayed_work(rbd_dev->task_wq,
4694                                            &rbd_dev->watch_dwork,
4695                                            RBD_RETRY_DELAY);
4696                         mutex_unlock(&rbd_dev->watch_mutex);
4697                         return;
4698                 }
4699
4700                 mutex_unlock(&rbd_dev->watch_mutex);
4701                 down_write(&rbd_dev->lock_rwsem);
4702                 wake_lock_waiters(rbd_dev, ret);
4703                 up_write(&rbd_dev->lock_rwsem);
4704                 return;
4705         }
4706
4707         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4708         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4709         mutex_unlock(&rbd_dev->watch_mutex);
4710
4711         down_write(&rbd_dev->lock_rwsem);
4712         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4713                 rbd_reacquire_lock(rbd_dev);
4714         up_write(&rbd_dev->lock_rwsem);
4715
4716         ret = rbd_dev_refresh(rbd_dev);
4717         if (ret)
4718                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4719 }
4720
4721 /*
4722  * Synchronous osd object method call.  Returns the number of bytes
4723  * returned in the outbound buffer, or a negative error code.
4724  */
4725 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4726                              struct ceph_object_id *oid,
4727                              struct ceph_object_locator *oloc,
4728                              const char *method_name,
4729                              const void *outbound,
4730                              size_t outbound_size,
4731                              void *inbound,
4732                              size_t inbound_size)
4733 {
4734         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4735         struct page *req_page = NULL;
4736         struct page *reply_page;
4737         int ret;
4738
4739         /*
4740          * Method calls are ultimately read operations.  The result
4741          * should be placed into the inbound buffer provided.  They
4742          * also supply outbound data--parameters for the object
4743          * method.  Currently if this is present it will be a
4744          * snapshot id.
4745          */
4746         if (outbound) {
4747                 if (outbound_size > PAGE_SIZE)
4748                         return -E2BIG;
4749
4750                 req_page = alloc_page(GFP_KERNEL);
4751                 if (!req_page)
4752                         return -ENOMEM;
4753
4754                 memcpy(page_address(req_page), outbound, outbound_size);
4755         }
4756
4757         reply_page = alloc_page(GFP_KERNEL);
4758         if (!reply_page) {
4759                 if (req_page)
4760                         __free_page(req_page);
4761                 return -ENOMEM;
4762         }
4763
4764         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4765                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
4766                              &reply_page, &inbound_size);
4767         if (!ret) {
4768                 memcpy(inbound, page_address(reply_page), inbound_size);
4769                 ret = inbound_size;
4770         }
4771
4772         if (req_page)
4773                 __free_page(req_page);
4774         __free_page(reply_page);
4775         return ret;
4776 }
4777
4778 static void rbd_queue_workfn(struct work_struct *work)
4779 {
4780         struct request *rq = blk_mq_rq_from_pdu(work);
4781         struct rbd_device *rbd_dev = rq->q->queuedata;
4782         struct rbd_img_request *img_request;
4783         struct ceph_snap_context *snapc = NULL;
4784         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4785         u64 length = blk_rq_bytes(rq);
4786         enum obj_operation_type op_type;
4787         u64 mapping_size;
4788         int result;
4789
4790         switch (req_op(rq)) {
4791         case REQ_OP_DISCARD:
4792                 op_type = OBJ_OP_DISCARD;
4793                 break;
4794         case REQ_OP_WRITE_ZEROES:
4795                 op_type = OBJ_OP_ZEROOUT;
4796                 break;
4797         case REQ_OP_WRITE:
4798                 op_type = OBJ_OP_WRITE;
4799                 break;
4800         case REQ_OP_READ:
4801                 op_type = OBJ_OP_READ;
4802                 break;
4803         default:
4804                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4805                 result = -EIO;
4806                 goto err;
4807         }
4808
4809         /* Ignore/skip any zero-length requests */
4810
4811         if (!length) {
4812                 dout("%s: zero-length request\n", __func__);
4813                 result = 0;
4814                 goto err_rq;
4815         }
4816
4817         if (op_type != OBJ_OP_READ && rbd_dev->spec->snap_id != CEPH_NOSNAP) {
4818                 rbd_warn(rbd_dev, "%s on read-only snapshot",
4819                          obj_op_name(op_type));
4820                 result = -EIO;
4821                 goto err;
4822         }
4823
4824         /*
4825          * Quit early if the mapped snapshot no longer exists.  It's
4826          * still possible the snapshot will have disappeared by the
4827          * time our request arrives at the osd, but there's no sense in
4828          * sending it if we already know.
4829          */
4830         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4831                 dout("request for non-existent snapshot\n");
4832                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4833                 result = -ENXIO;
4834                 goto err_rq;
4835         }
4836
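             /* reject ranges whose last byte would wrap around the 64-bit offset space */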
4837         if (offset && length > U64_MAX - offset + 1) {
4838                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4839                          length);
4840                 result = -EINVAL;
4841                 goto err_rq;    /* Shouldn't happen */
4842         }
4843
4844         blk_mq_start_request(rq);
4845
4846         down_read(&rbd_dev->header_rwsem);
4847         mapping_size = rbd_dev->mapping.size;
4848         if (op_type != OBJ_OP_READ) {
4849                 snapc = rbd_dev->header.snapc;
4850                 ceph_get_snap_context(snapc);
4851         }
4852         up_read(&rbd_dev->header_rwsem);
4853
4854         if (offset + length > mapping_size) {
4855                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4856                          length, mapping_size);
4857                 result = -EIO;
4858                 goto err_rq;
4859         }
4860
4861         img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4862         if (!img_request) {
4863                 result = -ENOMEM;
4864                 goto err_rq;
4865         }
4866         img_request->rq = rq;
4867         snapc = NULL; /* img_request consumes a ref */
4868
4869         if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4870                 result = rbd_img_fill_nodata(img_request, offset, length);
4871         else
4872                 result = rbd_img_fill_from_bio(img_request, offset, length,
4873                                                rq->bio);
4874         if (result)
4875                 goto err_img_request;
4876
4877         rbd_img_handle_request(img_request, 0);
4878         return;
4879
4880 err_img_request:
4881         rbd_img_request_put(img_request);
4882 err_rq:
4883         if (result)
4884                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4885                          obj_op_name(op_type), length, offset, result);
4886         ceph_put_snap_context(snapc);
4887 err:
4888         blk_mq_end_request(rq, errno_to_blk_status(result));
4889 }
4890
4891 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4892                 const struct blk_mq_queue_data *bd)
4893 {
4894         struct request *rq = bd->rq;
4895         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4896
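             /* per-request work was set up in rbd_init_request(); run rbd_queue_workfn() on the rbd workqueue */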
4897         queue_work(rbd_wq, work);
4898         return BLK_STS_OK;
4899 }
4900
4901 static void rbd_free_disk(struct rbd_device *rbd_dev)
4902 {
4903         blk_cleanup_queue(rbd_dev->disk->queue);
4904         blk_mq_free_tag_set(&rbd_dev->tag_set);
4905         put_disk(rbd_dev->disk);
4906         rbd_dev->disk = NULL;
4907 }
4908
4909 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4910                              struct ceph_object_id *oid,
4911                              struct ceph_object_locator *oloc,
4912                              void *buf, int buf_len)
4913
4914 {
4915         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4916         struct ceph_osd_request *req;
4917         struct page **pages;
4918         int num_pages = calc_pages_for(0, buf_len);
4919         int ret;
4920
4921         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4922         if (!req)
4923                 return -ENOMEM;
4924
4925         ceph_oid_copy(&req->r_base_oid, oid);
4926         ceph_oloc_copy(&req->r_base_oloc, oloc);
4927         req->r_flags = CEPH_OSD_FLAG_READ;
4928
4929         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4930         if (IS_ERR(pages)) {
4931                 ret = PTR_ERR(pages);
4932                 goto out_req;
4933         }
4934
4935         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4936         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4937                                          true);
4938
4939         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4940         if (ret)
4941                 goto out_req;
4942
4943         ceph_osdc_start_request(osdc, req, false);
4944         ret = ceph_osdc_wait_request(osdc, req);
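             /* a non-negative return from the wait is the number of bytes actually read */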
4945         if (ret >= 0)
4946                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4947
4948 out_req:
4949         ceph_osdc_put_request(req);
4950         return ret;
4951 }
4952
4953 /*
4954  * Read the complete header for the given rbd device.  On successful
4955  * return, the rbd_dev->header field will contain up-to-date
4956  * information about the image.
4957  */
4958 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4959 {
4960         struct rbd_image_header_ondisk *ondisk = NULL;
4961         u32 snap_count = 0;
4962         u64 names_size = 0;
4963         u32 want_count;
4964         int ret;
4965
4966         /*
4967          * The complete header will include an array of its 64-bit
4968          * snapshot ids, followed by the names of those snapshots as
4969          * a contiguous block of NUL-terminated strings.  Note that
4970          * the number of snapshots could change by the time we read
4971          * it in, in which case we re-read it.
4972          */
4973         do {
4974                 size_t size;
4975
4976                 kfree(ondisk);
4977
4978                 size = sizeof (*ondisk);
4979                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4980                 size += names_size;
4981                 ondisk = kmalloc(size, GFP_KERNEL);
4982                 if (!ondisk)
4983                         return -ENOMEM;
4984
4985                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4986                                         &rbd_dev->header_oloc, ondisk, size);
4987                 if (ret < 0)
4988                         goto out;
4989                 if ((size_t)ret < size) {
4990                         ret = -ENXIO;
4991                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4992                                 size, ret);
4993                         goto out;
4994                 }
4995                 if (!rbd_dev_ondisk_valid(ondisk)) {
4996                         ret = -ENXIO;
4997                         rbd_warn(rbd_dev, "invalid header");
4998                         goto out;
4999                 }
5000
5001                 names_size = le64_to_cpu(ondisk->snap_names_len);
5002                 want_count = snap_count;
5003                 snap_count = le32_to_cpu(ondisk->snap_count);
5004         } while (snap_count != want_count);
5005
5006         ret = rbd_header_from_disk(rbd_dev, ondisk);
5007 out:
5008         kfree(ondisk);
5009
5010         return ret;
5011 }
5012
5013 /*
5014  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
5015  * has disappeared from the (just updated) snapshot context.
5016  */
5017 static void rbd_exists_validate(struct rbd_device *rbd_dev)
5018 {
5019         u64 snap_id;
5020
5021         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
5022                 return;
5023
5024         snap_id = rbd_dev->spec->snap_id;
5025         if (snap_id == CEPH_NOSNAP)
5026                 return;
5027
5028         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
5029                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5030 }
5031
5032 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
5033 {
5034         sector_t size;
5035
5036         /*
5037          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
5038          * try to update its size.  If REMOVING is set, updating size
5039          * is just useless work since the device can't be opened.
5040          */
5041         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
5042             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
5043                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
5044                 dout("setting size to %llu sectors\n", (unsigned long long)size);
5045                 set_capacity(rbd_dev->disk, size);
5046                 revalidate_disk(rbd_dev->disk);
5047         }
5048 }
5049
5050 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
5051 {
5052         u64 mapping_size;
5053         int ret;
5054
5055         down_write(&rbd_dev->header_rwsem);
5056         mapping_size = rbd_dev->mapping.size;
5057
5058         ret = rbd_dev_header_info(rbd_dev);
5059         if (ret)
5060                 goto out;
5061
5062         /*
5063          * If there is a parent, see if it has disappeared due to the
5064          * mapped image getting flattened.
5065          */
5066         if (rbd_dev->parent) {
5067                 ret = rbd_dev_v2_parent_info(rbd_dev);
5068                 if (ret)
5069                         goto out;
5070         }
5071
5072         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
5073                 rbd_dev->mapping.size = rbd_dev->header.image_size;
5074         } else {
5075                 /* validate mapped snapshot's EXISTS flag */
5076                 rbd_exists_validate(rbd_dev);
5077         }
5078
5079 out:
5080         up_write(&rbd_dev->header_rwsem);
5081         if (!ret && mapping_size != rbd_dev->mapping.size)
5082                 rbd_dev_update_size(rbd_dev);
5083
5084         return ret;
5085 }
5086
5087 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
5088                 unsigned int hctx_idx, unsigned int numa_node)
5089 {
5090         struct work_struct *work = blk_mq_rq_to_pdu(rq);
5091
5092         INIT_WORK(work, rbd_queue_workfn);
5093         return 0;
5094 }
5095
5096 static const struct blk_mq_ops rbd_mq_ops = {
5097         .queue_rq       = rbd_queue_rq,
5098         .init_request   = rbd_init_request,
5099 };
5100
5101 static int rbd_init_disk(struct rbd_device *rbd_dev)
5102 {
5103         struct gendisk *disk;
5104         struct request_queue *q;
5105         unsigned int objset_bytes =
5106             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5107         int err;
5108
5109         /* create gendisk info */
5110         disk = alloc_disk(single_major ?
5111                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5112                           RBD_MINORS_PER_MAJOR);
5113         if (!disk)
5114                 return -ENOMEM;
5115
5116         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5117                  rbd_dev->dev_id);
5118         disk->major = rbd_dev->major;
5119         disk->first_minor = rbd_dev->minor;
5120         if (single_major)
5121                 disk->flags |= GENHD_FL_EXT_DEVT;
5122         disk->fops = &rbd_bd_ops;
5123         disk->private_data = rbd_dev;
5124
5125         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5126         rbd_dev->tag_set.ops = &rbd_mq_ops;
5127         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5128         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5129         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5130         rbd_dev->tag_set.nr_hw_queues = 1;
5131         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5132
5133         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5134         if (err)
5135                 goto out_disk;
5136
5137         q = blk_mq_init_queue(&rbd_dev->tag_set);
5138         if (IS_ERR(q)) {
5139                 err = PTR_ERR(q);
5140                 goto out_tag_set;
5141         }
5142
5143         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5144         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5145
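             /* cap a single request at one complete object set (object_size * stripe_count bytes) */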
5146         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5147         q->limits.max_sectors = queue_max_hw_sectors(q);
5148         blk_queue_max_segments(q, USHRT_MAX);
5149         blk_queue_max_segment_size(q, UINT_MAX);
5150         blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5151         blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5152
5153         if (rbd_dev->opts->trim) {
5154                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5155                 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5156                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5157                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5158         }
5159
5160         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5161                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5162
5163         /*
5164          * disk_release() expects a queue ref from add_disk() and will
5165          * put it.  Hold an extra ref until add_disk() is called.
5166          */
5167         WARN_ON(!blk_get_queue(q));
5168         disk->queue = q;
5169         q->queuedata = rbd_dev;
5170
5171         rbd_dev->disk = disk;
5172
5173         return 0;
5174 out_tag_set:
5175         blk_mq_free_tag_set(&rbd_dev->tag_set);
5176 out_disk:
5177         put_disk(disk);
5178         return err;
5179 }
5180
5181 /*
5182   sysfs
5183 */
5184
5185 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5186 {
5187         return container_of(dev, struct rbd_device, dev);
5188 }
5189
5190 static ssize_t rbd_size_show(struct device *dev,
5191                              struct device_attribute *attr, char *buf)
5192 {
5193         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5194
5195         return sprintf(buf, "%llu\n",
5196                 (unsigned long long)rbd_dev->mapping.size);
5197 }
5198
5199 /*
5200  * Note this shows the features for whatever's mapped, which is not
5201  * necessarily the base image.
5202  */
5203 static ssize_t rbd_features_show(struct device *dev,
5204                              struct device_attribute *attr, char *buf)
5205 {
5206         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5207
5208         return sprintf(buf, "0x%016llx\n",
5209                         (unsigned long long)rbd_dev->mapping.features);
5210 }
5211
5212 static ssize_t rbd_major_show(struct device *dev,
5213                               struct device_attribute *attr, char *buf)
5214 {
5215         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5216
5217         if (rbd_dev->major)
5218                 return sprintf(buf, "%d\n", rbd_dev->major);
5219
5220         return sprintf(buf, "(none)\n");
5221 }
5222
5223 static ssize_t rbd_minor_show(struct device *dev,
5224                               struct device_attribute *attr, char *buf)
5225 {
5226         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5227
5228         return sprintf(buf, "%d\n", rbd_dev->minor);
5229 }
5230
5231 static ssize_t rbd_client_addr_show(struct device *dev,
5232                                     struct device_attribute *attr, char *buf)
5233 {
5234         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5235         struct ceph_entity_addr *client_addr =
5236             ceph_client_addr(rbd_dev->rbd_client->client);
5237
5238         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5239                        le32_to_cpu(client_addr->nonce));
5240 }
5241
5242 static ssize_t rbd_client_id_show(struct device *dev,
5243                                   struct device_attribute *attr, char *buf)
5244 {
5245         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5246
5247         return sprintf(buf, "client%lld\n",
5248                        ceph_client_gid(rbd_dev->rbd_client->client));
5249 }
5250
5251 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5252                                      struct device_attribute *attr, char *buf)
5253 {
5254         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5255
5256         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5257 }
5258
5259 static ssize_t rbd_config_info_show(struct device *dev,
5260                                     struct device_attribute *attr, char *buf)
5261 {
5262         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5263
5264         return sprintf(buf, "%s\n", rbd_dev->config_info);
5265 }
5266
5267 static ssize_t rbd_pool_show(struct device *dev,
5268                              struct device_attribute *attr, char *buf)
5269 {
5270         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5271
5272         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5273 }
5274
5275 static ssize_t rbd_pool_id_show(struct device *dev,
5276                              struct device_attribute *attr, char *buf)
5277 {
5278         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5279
5280         return sprintf(buf, "%llu\n",
5281                         (unsigned long long) rbd_dev->spec->pool_id);
5282 }
5283
5284 static ssize_t rbd_pool_ns_show(struct device *dev,
5285                                 struct device_attribute *attr, char *buf)
5286 {
5287         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5288
5289         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5290 }
5291
5292 static ssize_t rbd_name_show(struct device *dev,
5293                              struct device_attribute *attr, char *buf)
5294 {
5295         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5296
5297         if (rbd_dev->spec->image_name)
5298                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5299
5300         return sprintf(buf, "(unknown)\n");
5301 }
5302
5303 static ssize_t rbd_image_id_show(struct device *dev,
5304                              struct device_attribute *attr, char *buf)
5305 {
5306         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5307
5308         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5309 }
5310
5311 /*
5312  * Shows the name of the currently-mapped snapshot (or
5313  * RBD_SNAP_HEAD_NAME for the base image).
5314  */
5315 static ssize_t rbd_snap_show(struct device *dev,
5316                              struct device_attribute *attr,
5317                              char *buf)
5318 {
5319         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5320
5321         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5322 }
5323
5324 static ssize_t rbd_snap_id_show(struct device *dev,
5325                                 struct device_attribute *attr, char *buf)
5326 {
5327         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5328
5329         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5330 }
5331
5332 /*
5333  * For a v2 image, shows the chain of parent images, separated by empty
5334  * lines.  For v1 images or if there is no parent, shows "(no parent
5335  * image)".
5336  */
5337 static ssize_t rbd_parent_show(struct device *dev,
5338                                struct device_attribute *attr,
5339                                char *buf)
5340 {
5341         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5342         ssize_t count = 0;
5343
5344         if (!rbd_dev->parent)
5345                 return sprintf(buf, "(no parent image)\n");
5346
5347         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5348                 struct rbd_spec *spec = rbd_dev->parent_spec;
5349
5350                 count += sprintf(&buf[count], "%s"
5351                             "pool_id %llu\npool_name %s\n"
5352                             "pool_ns %s\n"
5353                             "image_id %s\nimage_name %s\n"
5354                             "snap_id %llu\nsnap_name %s\n"
5355                             "overlap %llu\n",
5356                             !count ? "" : "\n", /* first? */
5357                             spec->pool_id, spec->pool_name,
5358                             spec->pool_ns ?: "",
5359                             spec->image_id, spec->image_name ?: "(unknown)",
5360                             spec->snap_id, spec->snap_name,
5361                             rbd_dev->parent_overlap);
5362         }
5363
5364         return count;
5365 }
5366
5367 static ssize_t rbd_image_refresh(struct device *dev,
5368                                  struct device_attribute *attr,
5369                                  const char *buf,
5370                                  size_t size)
5371 {
5372         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5373         int ret;
5374
5375         ret = rbd_dev_refresh(rbd_dev);
5376         if (ret)
5377                 return ret;
5378
5379         return size;
5380 }
5381
5382 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5383 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5384 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5385 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5386 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5387 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5388 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5389 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5390 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5391 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5392 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5393 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5394 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5395 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5396 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5397 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5398 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5399
5400 static struct attribute *rbd_attrs[] = {
5401         &dev_attr_size.attr,
5402         &dev_attr_features.attr,
5403         &dev_attr_major.attr,
5404         &dev_attr_minor.attr,
5405         &dev_attr_client_addr.attr,
5406         &dev_attr_client_id.attr,
5407         &dev_attr_cluster_fsid.attr,
5408         &dev_attr_config_info.attr,
5409         &dev_attr_pool.attr,
5410         &dev_attr_pool_id.attr,
5411         &dev_attr_pool_ns.attr,
5412         &dev_attr_name.attr,
5413         &dev_attr_image_id.attr,
5414         &dev_attr_current_snap.attr,
5415         &dev_attr_snap_id.attr,
5416         &dev_attr_parent.attr,
5417         &dev_attr_refresh.attr,
5418         NULL
5419 };
5420
5421 static struct attribute_group rbd_attr_group = {
5422         .attrs = rbd_attrs,
5423 };
5424
5425 static const struct attribute_group *rbd_attr_groups[] = {
5426         &rbd_attr_group,
5427         NULL
5428 };
5429
5430 static void rbd_dev_release(struct device *dev);
5431
5432 static const struct device_type rbd_device_type = {
5433         .name           = "rbd",
5434         .groups         = rbd_attr_groups,
5435         .release        = rbd_dev_release,
5436 };
5437
5438 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5439 {
5440         kref_get(&spec->kref);
5441
5442         return spec;
5443 }
5444
5445 static void rbd_spec_free(struct kref *kref);
5446 static void rbd_spec_put(struct rbd_spec *spec)
5447 {
5448         if (spec)
5449                 kref_put(&spec->kref, rbd_spec_free);
5450 }
5451
5452 static struct rbd_spec *rbd_spec_alloc(void)
5453 {
5454         struct rbd_spec *spec;
5455
5456         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5457         if (!spec)
5458                 return NULL;
5459
5460         spec->pool_id = CEPH_NOPOOL;
5461         spec->snap_id = CEPH_NOSNAP;
5462         kref_init(&spec->kref);
5463
5464         return spec;
5465 }
5466
5467 static void rbd_spec_free(struct kref *kref)
5468 {
5469         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5470
5471         kfree(spec->pool_name);
5472         kfree(spec->pool_ns);
5473         kfree(spec->image_id);
5474         kfree(spec->image_name);
5475         kfree(spec->snap_name);
5476         kfree(spec);
5477 }
5478
5479 static void rbd_dev_free(struct rbd_device *rbd_dev)
5480 {
5481         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5482         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5483
5484         ceph_oid_destroy(&rbd_dev->header_oid);
5485         ceph_oloc_destroy(&rbd_dev->header_oloc);
5486         kfree(rbd_dev->config_info);
5487
5488         rbd_put_client(rbd_dev->rbd_client);
5489         rbd_spec_put(rbd_dev->spec);
5490         kfree(rbd_dev->opts);
5491         kfree(rbd_dev);
5492 }
5493
5494 static void rbd_dev_release(struct device *dev)
5495 {
5496         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5497         bool need_put = !!rbd_dev->opts;
5498
5499         if (need_put) {
5500                 destroy_workqueue(rbd_dev->task_wq);
5501                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5502         }
5503
5504         rbd_dev_free(rbd_dev);
5505
5506         /*
5507          * This is racy, but way better than dropping the module reference
5508          * outside of the release callback.  The race window is pretty small, so
5509          * doing something similar to dm (dm-builtin.c) is overkill.
5510          */
5511         if (need_put)
5512                 module_put(THIS_MODULE);
5513 }
5514
5515 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5516                                            struct rbd_spec *spec)
5517 {
5518         struct rbd_device *rbd_dev;
5519
5520         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5521         if (!rbd_dev)
5522                 return NULL;
5523
5524         spin_lock_init(&rbd_dev->lock);
5525         INIT_LIST_HEAD(&rbd_dev->node);
5526         init_rwsem(&rbd_dev->header_rwsem);
5527
5528         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5529         ceph_oid_init(&rbd_dev->header_oid);
5530         rbd_dev->header_oloc.pool = spec->pool_id;
5531         if (spec->pool_ns) {
5532                 WARN_ON(!*spec->pool_ns);
5533                 rbd_dev->header_oloc.pool_ns =
5534                     ceph_find_or_create_string(spec->pool_ns,
5535                                                strlen(spec->pool_ns));
5536         }
5537
5538         mutex_init(&rbd_dev->watch_mutex);
5539         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5540         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5541
5542         init_rwsem(&rbd_dev->lock_rwsem);
5543         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5544         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5545         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5546         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5547         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5548         spin_lock_init(&rbd_dev->lock_lists_lock);
5549         INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5550         INIT_LIST_HEAD(&rbd_dev->running_list);
5551         init_completion(&rbd_dev->acquire_wait);
5552         init_completion(&rbd_dev->releasing_wait);
5553
5554         spin_lock_init(&rbd_dev->object_map_lock);
5555
5556         rbd_dev->dev.bus = &rbd_bus_type;
5557         rbd_dev->dev.type = &rbd_device_type;
5558         rbd_dev->dev.parent = &rbd_root_dev;
5559         device_initialize(&rbd_dev->dev);
5560
5561         rbd_dev->rbd_client = rbdc;
5562         rbd_dev->spec = spec;
5563
5564         return rbd_dev;
5565 }
5566
5567 /*
5568  * Create a mapping rbd_dev.
5569  */
5570 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5571                                          struct rbd_spec *spec,
5572                                          struct rbd_options *opts)
5573 {
5574         struct rbd_device *rbd_dev;
5575
5576         rbd_dev = __rbd_dev_create(rbdc, spec);
5577         if (!rbd_dev)
5578                 return NULL;
5579
5580         rbd_dev->opts = opts;
5581
5582         /* get an id and fill in device name */
5583         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5584                                          minor_to_rbd_dev_id(1 << MINORBITS),
5585                                          GFP_KERNEL);
5586         if (rbd_dev->dev_id < 0)
5587                 goto fail_rbd_dev;
5588
5589         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5590         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5591                                                    rbd_dev->name);
5592         if (!rbd_dev->task_wq)
5593                 goto fail_dev_id;
5594
5595         /* we have a ref from do_rbd_add() */
5596         __module_get(THIS_MODULE);
5597
5598         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5599         return rbd_dev;
5600
5601 fail_dev_id:
5602         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5603 fail_rbd_dev:
5604         rbd_dev_free(rbd_dev);
5605         return NULL;
5606 }
5607
5608 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5609 {
5610         if (rbd_dev)
5611                 put_device(&rbd_dev->dev);
5612 }
5613
5614 /*
5615  * Get the size and object order for an image snapshot, or if
5616  * snap_id is CEPH_NOSNAP, get this information for the base
5617  * image.
5618  */
5619 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5620                                 u8 *order, u64 *snap_size)
5621 {
5622         __le64 snapid = cpu_to_le64(snap_id);
5623         int ret;
5624         struct {
5625                 u8 order;
5626                 __le64 size;
5627         } __attribute__ ((packed)) size_buf = { 0 };
5628
5629         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5630                                   &rbd_dev->header_oloc, "get_size",
5631                                   &snapid, sizeof(snapid),
5632                                   &size_buf, sizeof(size_buf));
5633         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5634         if (ret < 0)
5635                 return ret;
5636         if (ret < sizeof (size_buf))
5637                 return -ERANGE;
5638
5639         if (order) {
5640                 *order = size_buf.order;
5641                 dout("  order %u", (unsigned int)*order);
5642         }
5643         *snap_size = le64_to_cpu(size_buf.size);
5644
5645         dout("  snap_id 0x%016llx snap_size = %llu\n",
5646                 (unsigned long long)snap_id,
5647                 (unsigned long long)*snap_size);
5648
5649         return 0;
5650 }
5651
5652 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5653 {
5654         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5655                                         &rbd_dev->header.obj_order,
5656                                         &rbd_dev->header.image_size);
5657 }
5658
5659 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5660 {
5661         void *reply_buf;
5662         int ret;
5663         void *p;
5664
5665         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
5666         if (!reply_buf)
5667                 return -ENOMEM;
5668
5669         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5670                                   &rbd_dev->header_oloc, "get_object_prefix",
5671                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5672         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5673         if (ret < 0)
5674                 goto out;
5675
5676         p = reply_buf;
5677         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5678                                                 p + ret, NULL, GFP_NOIO);
5679         ret = 0;
5680
5681         if (IS_ERR(rbd_dev->header.object_prefix)) {
5682                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5683                 rbd_dev->header.object_prefix = NULL;
5684         } else {
5685                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5686         }
5687 out:
5688         kfree(reply_buf);
5689
5690         return ret;
5691 }
5692
5693 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5694                 u64 *snap_features)
5695 {
5696         __le64 snapid = cpu_to_le64(snap_id);
5697         struct {
5698                 __le64 features;
5699                 __le64 incompat;
5700         } __attribute__ ((packed)) features_buf = { 0 };
5701         u64 unsup;
5702         int ret;
5703
5704         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5705                                   &rbd_dev->header_oloc, "get_features",
5706                                   &snapid, sizeof(snapid),
5707                                   &features_buf, sizeof(features_buf));
5708         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5709         if (ret < 0)
5710                 return ret;
5711         if (ret < sizeof (features_buf))
5712                 return -ERANGE;
5713
5714         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5715         if (unsup) {
5716                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5717                          unsup);
5718                 return -ENXIO;
5719         }
5720
5721         *snap_features = le64_to_cpu(features_buf.features);
5722
5723         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5724                 (unsigned long long)snap_id,
5725                 (unsigned long long)*snap_features,
5726                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5727
5728         return 0;
5729 }
5730
5731 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5732 {
5733         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5734                                                 &rbd_dev->header.features);
5735 }
5736
5737 /*
5738  * These are generic image flags, but since they are used only for
5739  * the object map, store them in rbd_dev->object_map_flags.
5740  *
5741  * For the same reason, this function is called only on object map
5742  * (re)load and not on header refresh.
5743  */
5744 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5745 {
5746         __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5747         __le64 flags;
5748         int ret;
5749
5750         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5751                                   &rbd_dev->header_oloc, "get_flags",
5752                                   &snapid, sizeof(snapid),
5753                                   &flags, sizeof(flags));
5754         if (ret < 0)
5755                 return ret;
5756         if (ret < sizeof(flags))
5757                 return -EBADMSG;
5758
5759         rbd_dev->object_map_flags = le64_to_cpu(flags);
5760         return 0;
5761 }
5762
5763 struct parent_image_info {
5764         u64             pool_id;
5765         const char      *pool_ns;
5766         const char      *image_id;
5767         u64             snap_id;
5768
5769         bool            has_overlap;
5770         u64             overlap;
5771 };
5772
5773 /*
5774  * The caller is responsible for @pii.
5775  */
5776 static int decode_parent_image_spec(void **p, void *end,
5777                                     struct parent_image_info *pii)
5778 {
5779         u8 struct_v;
5780         u32 struct_len;
5781         int ret;
5782
5783         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5784                                   &struct_v, &struct_len);
5785         if (ret)
5786                 return ret;
5787
5788         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5789         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5790         if (IS_ERR(pii->pool_ns)) {
5791                 ret = PTR_ERR(pii->pool_ns);
5792                 pii->pool_ns = NULL;
5793                 return ret;
5794         }
5795         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5796         if (IS_ERR(pii->image_id)) {
5797                 ret = PTR_ERR(pii->image_id);
5798                 pii->image_id = NULL;
5799                 return ret;
5800         }
5801         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5802         return 0;
5803
5804 e_inval:
5805         return -EINVAL;
5806 }
5807
5808 static int __get_parent_info(struct rbd_device *rbd_dev,
5809                              struct page *req_page,
5810                              struct page *reply_page,
5811                              struct parent_image_info *pii)
5812 {
5813         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5814         size_t reply_len = PAGE_SIZE;
5815         void *p, *end;
5816         int ret;
5817
5818         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5819                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5820                              req_page, sizeof(u64), &reply_page, &reply_len);
5821         if (ret)
5822                 return ret == -EOPNOTSUPP ? 1 : ret;
5823
5824         p = page_address(reply_page);
5825         end = p + reply_len;
5826         ret = decode_parent_image_spec(&p, end, pii);
5827         if (ret)
5828                 return ret;
5829
5830         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5831                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5832                              req_page, sizeof(u64), &reply_page, &reply_len);
5833         if (ret)
5834                 return ret;
5835
5836         p = page_address(reply_page);
5837         end = p + reply_len;
5838         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5839         if (pii->has_overlap)
5840                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5841
5842         return 0;
5843
5844 e_inval:
5845         return -EINVAL;
5846 }
5847
5848 /*
5849  * The caller is responsible for @pii.
5850  */
5851 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5852                                     struct page *req_page,
5853                                     struct page *reply_page,
5854                                     struct parent_image_info *pii)
5855 {
5856         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5857         size_t reply_len = PAGE_SIZE;
5858         void *p, *end;
5859         int ret;
5860
5861         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5862                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5863                              req_page, sizeof(u64), &reply_page, &reply_len);
5864         if (ret)
5865                 return ret;
5866
5867         p = page_address(reply_page);
5868         end = p + reply_len;
5869         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5870         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5871         if (IS_ERR(pii->image_id)) {
5872                 ret = PTR_ERR(pii->image_id);
5873                 pii->image_id = NULL;
5874                 return ret;
5875         }
5876         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5877         pii->has_overlap = true;
5878         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5879
5880         return 0;
5881
5882 e_inval:
5883         return -EINVAL;
5884 }
5885
5886 static int get_parent_info(struct rbd_device *rbd_dev,
5887                            struct parent_image_info *pii)
5888 {
5889         struct page *req_page, *reply_page;
5890         void *p;
5891         int ret;
5892
5893         req_page = alloc_page(GFP_KERNEL);
5894         if (!req_page)
5895                 return -ENOMEM;
5896
5897         reply_page = alloc_page(GFP_KERNEL);
5898         if (!reply_page) {
5899                 __free_page(req_page);
5900                 return -ENOMEM;
5901         }
5902
5903         p = page_address(req_page);
5904         ceph_encode_64(&p, rbd_dev->spec->snap_id);
5905         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
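             /* a positive return means the OSD lacks "parent_get"; fall back to the legacy "get_parent" method */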
5906         if (ret > 0)
5907                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5908                                                pii);
5909
5910         __free_page(req_page);
5911         __free_page(reply_page);
5912         return ret;
5913 }
5914
5915 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5916 {
5917         struct rbd_spec *parent_spec;
5918         struct parent_image_info pii = { 0 };
5919         int ret;
5920
5921         parent_spec = rbd_spec_alloc();
5922         if (!parent_spec)
5923                 return -ENOMEM;
5924
5925         ret = get_parent_info(rbd_dev, &pii);
5926         if (ret)
5927                 goto out_err;
5928
5929         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5930              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5931              pii.has_overlap, pii.overlap);
5932
5933         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5934                 /*
5935                  * Either the parent never existed, or we have
5936                  * a record of it but the image got flattened so it no
5937                  * longer has a parent.  When the parent of a
5938                  * layered image disappears we immediately set the
5939                  * overlap to 0.  The effect of this is that all new
5940                  * requests will be treated as if the image had no
5941                  * parent.
5942                  *
5943                  * If !pii.has_overlap, the parent image spec is not
5944                  * applicable.  It's there to avoid duplication in each
5945                  * snapshot record.
5946                  */
5947                 if (rbd_dev->parent_overlap) {
5948                         rbd_dev->parent_overlap = 0;
5949                         rbd_dev_parent_put(rbd_dev);
5950                         pr_info("%s: clone image has been flattened\n",
5951                                 rbd_dev->disk->disk_name);
5952                 }
5953
5954                 goto out;       /* No parent?  No problem. */
5955         }
5956
5957         /* The ceph file layout needs to fit pool id in 32 bits */
5958
5959         ret = -EIO;
5960         if (pii.pool_id > (u64)U32_MAX) {
5961                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5962                         (unsigned long long)pii.pool_id, U32_MAX);
5963                 goto out_err;
5964         }
5965
5966         /*
5967          * The parent won't change (except when the clone is
5968          * flattened, which is handled above).  So we only need to
5969          * record the parent spec if we have not already done so.
5970          */
5971         if (!rbd_dev->parent_spec) {
5972                 parent_spec->pool_id = pii.pool_id;
5973                 if (pii.pool_ns && *pii.pool_ns) {
5974                         parent_spec->pool_ns = pii.pool_ns;
5975                         pii.pool_ns = NULL;
5976                 }
5977                 parent_spec->image_id = pii.image_id;
5978                 pii.image_id = NULL;
5979                 parent_spec->snap_id = pii.snap_id;
5980
5981                 rbd_dev->parent_spec = parent_spec;
5982                 parent_spec = NULL;     /* rbd_dev now owns this */
5983         }
5984
5985         /*
5986          * We always update the parent overlap.  If it's zero we issue
5987          * a warning, as we will proceed as if there was no parent.
5988          */
5989         if (!pii.overlap) {
5990                 if (parent_spec) {
5991                         /* refresh, careful to warn just once */
5992                         if (rbd_dev->parent_overlap)
5993                                 rbd_warn(rbd_dev,
5994                                     "clone now standalone (overlap became 0)");
5995                 } else {
5996                         /* initial probe */
5997                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5998                 }
5999         }
6000         rbd_dev->parent_overlap = pii.overlap;
6001
6002 out:
6003         ret = 0;
6004 out_err:
6005         kfree(pii.pool_ns);
6006         kfree(pii.image_id);
6007         rbd_spec_put(parent_spec);
6008         return ret;
6009 }
6010
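/*
 * Fetch the stripe unit and stripe count for a format 2 image via the
 * "get_stripe_unit_count" class method and record them in the header.
 */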
6011 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
6012 {
6013         struct {
6014                 __le64 stripe_unit;
6015                 __le64 stripe_count;
6016         } __attribute__ ((packed)) striping_info_buf = { 0 };
6017         size_t size = sizeof (striping_info_buf);
6018         void *p;
6019         int ret;
6020
6021         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6022                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
6023                                 NULL, 0, &striping_info_buf, size);
6024         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6025         if (ret < 0)
6026                 return ret;
6027         if (ret < size)
6028                 return -ERANGE;
6029
6030         p = &striping_info_buf;
6031         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
6032         rbd_dev->header.stripe_count = ceph_decode_64(&p);
6033         return 0;
6034 }
6035
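/*
 * Fetch the id of the separate data pool (RBD_FEATURE_DATA_POOL) via the
 * "get_data_pool" class method and record it in the header.
 */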
6036 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
6037 {
6038         __le64 data_pool_id;
6039         int ret;
6040
6041         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6042                                   &rbd_dev->header_oloc, "get_data_pool",
6043                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
6044         if (ret < 0)
6045                 return ret;
6046         if (ret < sizeof(data_pool_id))
6047                 return -EBADMSG;
6048
6049         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
6050         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
6051         return 0;
6052 }
6053
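/*
 * Look up the image name for rbd_dev's image id in the pool's rbd
 * directory object using the "dir_get_name" class method.  Returns a
 * dynamically allocated name, or NULL on any failure.
 */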
6054 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
6055 {
6056         CEPH_DEFINE_OID_ONSTACK(oid);
6057         size_t image_id_size;
6058         char *image_id;
6059         void *p;
6060         void *end;
6061         size_t size;
6062         void *reply_buf = NULL;
6063         size_t len = 0;
6064         char *image_name = NULL;
6065         int ret;
6066
6067         rbd_assert(!rbd_dev->spec->image_name);
6068
6069         len = strlen(rbd_dev->spec->image_id);
6070         image_id_size = sizeof (__le32) + len;
6071         image_id = kmalloc(image_id_size, GFP_KERNEL);
6072         if (!image_id)
6073                 return NULL;
6074
6075         p = image_id;
6076         end = image_id + image_id_size;
6077         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6078
6079         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6080         reply_buf = kmalloc(size, GFP_KERNEL);
6081         if (!reply_buf)
6082                 goto out;
6083
6084         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6085         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6086                                   "dir_get_name", image_id, image_id_size,
6087                                   reply_buf, size);
6088         if (ret < 0)
6089                 goto out;
6090         p = reply_buf;
6091         end = reply_buf + ret;
6092
6093         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6094         if (IS_ERR(image_name))
6095                 image_name = NULL;
6096         else
6097                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6098 out:
6099         kfree(reply_buf);
6100         kfree(image_id);
6101
6102         return image_name;
6103 }
6104
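/*
 * Find the id of the named snapshot of a format 1 image by walking the
 * NUL-separated snapshot name list in the header.  Returns CEPH_NOSNAP
 * if the name is not found.
 */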
6105 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6106 {
6107         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6108         const char *snap_name;
6109         u32 which = 0;
6110
6111         /* Skip over names until we find the one we are looking for */
6112
6113         snap_name = rbd_dev->header.snap_names;
6114         while (which < snapc->num_snaps) {
6115                 if (!strcmp(name, snap_name))
6116                         return snapc->snaps[which];
6117                 snap_name += strlen(snap_name) + 1;
6118                 which++;
6119         }
6120         return CEPH_NOSNAP;
6121 }
6122
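/*
 * Find the id of the named snapshot of a format 2 image by fetching the
 * name of each id in the snapshot context.  Snapshots that no longer
 * exist are skipped; returns CEPH_NOSNAP if no match is found.
 */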
6123 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6124 {
6125         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6126         u32 which;
6127         bool found = false;
6128         u64 snap_id;
6129
6130         for (which = 0; !found && which < snapc->num_snaps; which++) {
6131                 const char *snap_name;
6132
6133                 snap_id = snapc->snaps[which];
6134                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6135                 if (IS_ERR(snap_name)) {
6136                         /* ignore no-longer existing snapshots */
6137                         if (PTR_ERR(snap_name) == -ENOENT)
6138                                 continue;
6139                         else
6140                                 break;
6141                 }
6142                 found = !strcmp(name, snap_name);
6143                 kfree(snap_name);
6144         }
6145         return found ? snap_id : CEPH_NOSNAP;
6146 }
6147
6148 /*
6149  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6150  * no snapshot by that name is found, or if an error occurs.
6151  */
6152 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6153 {
6154         if (rbd_dev->image_format == 1)
6155                 return rbd_v1_snap_id_by_name(rbd_dev, name);
6156
6157         return rbd_v2_snap_id_by_name(rbd_dev, name);
6158 }
6159
6160 /*
6161  * An image being mapped will have everything but the snap id.
6162  */
6163 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6164 {
6165         struct rbd_spec *spec = rbd_dev->spec;
6166
6167         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6168         rbd_assert(spec->image_id && spec->image_name);
6169         rbd_assert(spec->snap_name);
6170
6171         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6172                 u64 snap_id;
6173
6174                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6175                 if (snap_id == CEPH_NOSNAP)
6176                         return -ENOENT;
6177
6178                 spec->snap_id = snap_id;
6179         } else {
6180                 spec->snap_id = CEPH_NOSNAP;
6181         }
6182
6183         return 0;
6184 }
6185
6186 /*
6187  * A parent image will have all ids but none of the names.
6188  *
6189  * All names in an rbd spec are dynamically allocated.  It's OK if we
6190  * can't figure out the name for an image id.
6191  */
6192 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6193 {
6194         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6195         struct rbd_spec *spec = rbd_dev->spec;
6196         const char *pool_name;
6197         const char *image_name;
6198         const char *snap_name;
6199         int ret;
6200
6201         rbd_assert(spec->pool_id != CEPH_NOPOOL);
6202         rbd_assert(spec->image_id);
6203         rbd_assert(spec->snap_id != CEPH_NOSNAP);
6204
6205         /* Get the pool name; we have to make our own copy of this */
6206
6207         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6208         if (!pool_name) {
6209                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6210                 return -EIO;
6211         }
6212         pool_name = kstrdup(pool_name, GFP_KERNEL);
6213         if (!pool_name)
6214                 return -ENOMEM;
6215
6216         /* Fetch the image name; tolerate failure here */
6217
6218         image_name = rbd_dev_image_name(rbd_dev);
6219         if (!image_name)
6220                 rbd_warn(rbd_dev, "unable to get image name");
6221
6222         /* Fetch the snapshot name */
6223
6224         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6225         if (IS_ERR(snap_name)) {
6226                 ret = PTR_ERR(snap_name);
6227                 goto out_err;
6228         }
6229
6230         spec->pool_name = pool_name;
6231         spec->image_name = image_name;
6232         spec->snap_name = snap_name;
6233
6234         return 0;
6235
6236 out_err:
6237         kfree(image_name);
6238         kfree(pool_name);
6239         return ret;
6240 }
6241
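/*
 * Fetch the image's snapshot context (seq value and snapshot ids) via
 * the "get_snapcontext" class method and install it in the header,
 * dropping any snapshot context recorded previously.
 */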
6242 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6243 {
6244         size_t size;
6245         int ret;
6246         void *reply_buf;
6247         void *p;
6248         void *end;
6249         u64 seq;
6250         u32 snap_count;
6251         struct ceph_snap_context *snapc;
6252         u32 i;
6253
6254         /*
6255          * We'll need room for the seq value (maximum snapshot id),
6256          * snapshot count, and array of that many snapshot ids.
6257          * For now we have a fixed upper limit on the number we're
6258          * prepared to receive.
6259          */
6260         size = sizeof (__le64) + sizeof (__le32) +
6261                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
6262         reply_buf = kzalloc(size, GFP_KERNEL);
6263         if (!reply_buf)
6264                 return -ENOMEM;
6265
6266         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6267                                   &rbd_dev->header_oloc, "get_snapcontext",
6268                                   NULL, 0, reply_buf, size);
6269         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6270         if (ret < 0)
6271                 goto out;
6272
6273         p = reply_buf;
6274         end = reply_buf + ret;
6275         ret = -ERANGE;
6276         ceph_decode_64_safe(&p, end, seq, out);
6277         ceph_decode_32_safe(&p, end, snap_count, out);
6278
6279         /*
6280          * Make sure the reported number of snapshot ids wouldn't go
6281          * beyond the end of our buffer.  But before checking that,
6282          * make sure the computed size of the snapshot context we
6283          * allocate is representable in a size_t.
6284          */
6285         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6286                                  / sizeof (u64)) {
6287                 ret = -EINVAL;
6288                 goto out;
6289         }
6290         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6291                 goto out;
6292         ret = 0;
6293
6294         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6295         if (!snapc) {
6296                 ret = -ENOMEM;
6297                 goto out;
6298         }
6299         snapc->seq = seq;
6300         for (i = 0; i < snap_count; i++)
6301                 snapc->snaps[i] = ceph_decode_64(&p);
6302
6303         ceph_put_snap_context(rbd_dev->header.snapc);
6304         rbd_dev->header.snapc = snapc;
6305
6306         dout("  snap context seq = %llu, snap_count = %u\n",
6307                 (unsigned long long)seq, (unsigned int)snap_count);
6308 out:
6309         kfree(reply_buf);
6310
6311         return ret;
6312 }
6313
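/*
 * Fetch the name of the snapshot with the given id via the
 * "get_snapshot_name" class method.  Returns a dynamically allocated
 * string, or an ERR_PTR() on failure.
 */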
6314 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6315                                         u64 snap_id)
6316 {
6317         size_t size;
6318         void *reply_buf;
6319         __le64 snapid;
6320         int ret;
6321         void *p;
6322         void *end;
6323         char *snap_name;
6324
6325         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6326         reply_buf = kmalloc(size, GFP_KERNEL);
6327         if (!reply_buf)
6328                 return ERR_PTR(-ENOMEM);
6329
6330         snapid = cpu_to_le64(snap_id);
6331         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6332                                   &rbd_dev->header_oloc, "get_snapshot_name",
6333                                   &snapid, sizeof(snapid), reply_buf, size);
6334         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6335         if (ret < 0) {
6336                 snap_name = ERR_PTR(ret);
6337                 goto out;
6338         }
6339
6340         p = reply_buf;
6341         end = reply_buf + ret;
6342         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6343         if (IS_ERR(snap_name))
6344                 goto out;
6345
6346         dout("  snap_id 0x%016llx snap_name = %s\n",
6347                 (unsigned long long)snap_id, snap_name);
6348 out:
6349         kfree(reply_buf);
6350
6351         return snap_name;
6352 }
6353
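/*
 * Refresh the format 2 image header.  The image size and snapshot
 * context are re-read on every call; the fields that never change are
 * fetched only the first time through.
 */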
6354 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6355 {
6356         bool first_time = rbd_dev->header.object_prefix == NULL;
6357         int ret;
6358
6359         ret = rbd_dev_v2_image_size(rbd_dev);
6360         if (ret)
6361                 return ret;
6362
6363         if (first_time) {
6364                 ret = rbd_dev_v2_header_onetime(rbd_dev);
6365                 if (ret)
6366                         return ret;
6367         }
6368
6369         ret = rbd_dev_v2_snap_context(rbd_dev);
6370         if (ret && first_time) {
6371                 kfree(rbd_dev->header.object_prefix);
6372                 rbd_dev->header.object_prefix = NULL;
6373         }
6374
6375         return ret;
6376 }
6377
6378 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6379 {
6380         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6381
6382         if (rbd_dev->image_format == 1)
6383                 return rbd_dev_v1_header_info(rbd_dev);
6384
6385         return rbd_dev_v2_header_info(rbd_dev);
6386 }
6387
6388 /*
6389  * Skips over white space at *buf, and updates *buf to point to the
6390  * first found non-space character (if any). Returns the length of
6391  * the token (string of non-white space characters) found.  Note
6392  * that *buf must be terminated with '\0'.
6393  */
6394 static inline size_t next_token(const char **buf)
6395 {
6396         /*
6397          * These are the characters that produce nonzero for
6398          * isspace() in the "C" and "POSIX" locales.
6399          */
6400         const char *spaces = " \f\n\r\t\v";
6401
6402         *buf += strspn(*buf, spaces);   /* Find start of token */
6403
6404         return strcspn(*buf, spaces);   /* Return token length */
6405 }
6406
6407 /*
6408  * Finds the next token in *buf, dynamically allocates a buffer big
6409  * enough to hold a copy of it, and copies the token into the new
6410  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6411  * that a duplicate buffer is created even for a zero-length token.
6412  *
6413  * Returns a pointer to the newly-allocated duplicate, or a null
6414  * pointer if memory for the duplicate was not available.  If
6415  * the lenp argument is a non-null pointer, the length of the token
6416  * (not including the '\0') is returned in *lenp.
6417  *
6418  * If successful, the *buf pointer will be updated to point beyond
6419  * the end of the found token.
6420  *
6421  * Note: uses GFP_KERNEL for allocation.
6422  */
6423 static inline char *dup_token(const char **buf, size_t *lenp)
6424 {
6425         char *dup;
6426         size_t len;
6427
6428         len = next_token(buf);
6429         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6430         if (!dup)
6431                 return NULL;
6432         *(dup + len) = '\0';
6433         *buf += len;
6434
6435         if (lenp)
6436                 *lenp = len;
6437
6438         return dup;
6439 }
6440
6441 /*
6442  * Parse the options provided for an "rbd add" (i.e., rbd image
6443  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6444  * and the data written is passed here via a NUL-terminated buffer.
6445  * Returns 0 if successful or an error code otherwise.
6446  *
6447  * The information extracted from these options is recorded in
6448  * the other parameters which return dynamically-allocated
6449  * structures:
6450  *  ceph_opts
6451  *      The address of a pointer that will refer to a ceph options
6452  *      structure.  Caller must release the returned pointer using
6453  *      ceph_destroy_options() when it is no longer needed.
6454  *  rbd_opts
6455  *      Address of an rbd options pointer.  Fully initialized by
6456  *      this function; caller must release with kfree().
6457  *  spec
6458  *      Address of an rbd image specification pointer.  Fully
6459  *      initialized by this function based on parsed options.
6460  *      Caller must release with rbd_spec_put().
6461  *
6462  * The options passed take this form:
6463  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6464  * where:
6465  *  <mon_addrs>
6466  *      A comma-separated list of one or more monitor addresses.
6467  *      A monitor address is an ip address, optionally followed
6468  *      by a port number (separated by a colon).
6469  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6470  *  <options>
6471  *      A comma-separated list of ceph and/or rbd options.
6472  *  <pool_name>
6473  *      The name of the rados pool containing the rbd image.
6474  *  <image_name>
6475  *      The name of the image in that pool to map.
6476  *  <snap_name>
6477  *      An optional snapshot name.  If provided, the mapping will
6478  *      present data from the image at the time that snapshot was
6479  *      created.  The image head is used if no snapshot name is
6480  *      provided.  Snapshot mappings are always read-only.
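 *
 * For example, a mapping request might look like this (hypothetical
 * monitor address and pool/image names, shown for illustration only):
 *
 *   $ echo "1.2.3.4:6789 name=admin mypool myimage -" > /sys/bus/rbd/add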
6481  */
6482 static int rbd_add_parse_args(const char *buf,
6483                                 struct ceph_options **ceph_opts,
6484                                 struct rbd_options **opts,
6485                                 struct rbd_spec **rbd_spec)
6486 {
6487         size_t len;
6488         char *options;
6489         const char *mon_addrs;
6490         char *snap_name;
6491         size_t mon_addrs_size;
6492         struct parse_rbd_opts_ctx pctx = { 0 };
6493         struct ceph_options *copts;
6494         int ret;
6495
6496         /* The first four tokens are required */
6497
6498         len = next_token(&buf);
6499         if (!len) {
6500                 rbd_warn(NULL, "no monitor address(es) provided");
6501                 return -EINVAL;
6502         }
6503         mon_addrs = buf;
6504         mon_addrs_size = len + 1;
6505         buf += len;
6506
6507         ret = -EINVAL;
6508         options = dup_token(&buf, NULL);
6509         if (!options)
6510                 return -ENOMEM;
6511         if (!*options) {
6512                 rbd_warn(NULL, "no options provided");
6513                 goto out_err;
6514         }
6515
6516         pctx.spec = rbd_spec_alloc();
6517         if (!pctx.spec)
6518                 goto out_mem;
6519
6520         pctx.spec->pool_name = dup_token(&buf, NULL);
6521         if (!pctx.spec->pool_name)
6522                 goto out_mem;
6523         if (!*pctx.spec->pool_name) {
6524                 rbd_warn(NULL, "no pool name provided");
6525                 goto out_err;
6526         }
6527
6528         pctx.spec->image_name = dup_token(&buf, NULL);
6529         if (!pctx.spec->image_name)
6530                 goto out_mem;
6531         if (!*pctx.spec->image_name) {
6532                 rbd_warn(NULL, "no image name provided");
6533                 goto out_err;
6534         }
6535
6536         /*
6537          * Snapshot name is optional; default is to use "-"
6538          * (indicating the head/no snapshot).
6539          */
6540         len = next_token(&buf);
6541         if (!len) {
6542                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6543                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6544         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6545                 ret = -ENAMETOOLONG;
6546                 goto out_err;
6547         }
6548         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6549         if (!snap_name)
6550                 goto out_mem;
6551         *(snap_name + len) = '\0';
6552         pctx.spec->snap_name = snap_name;
6553
6554         /* Initialize all rbd options to the defaults */
6555
6556         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6557         if (!pctx.opts)
6558                 goto out_mem;
6559
6560         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6561         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6562         pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6563         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6564         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6565         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6566         pctx.opts->trim = RBD_TRIM_DEFAULT;
6567
6568         copts = ceph_parse_options(options, mon_addrs,
6569                                    mon_addrs + mon_addrs_size - 1,
6570                                    parse_rbd_opts_token, &pctx);
6571         if (IS_ERR(copts)) {
6572                 ret = PTR_ERR(copts);
6573                 goto out_err;
6574         }
6575         kfree(options);
6576
6577         *ceph_opts = copts;
6578         *opts = pctx.opts;
6579         *rbd_spec = pctx.spec;
6580
6581         return 0;
6582 out_mem:
6583         ret = -ENOMEM;
6584 out_err:
6585         kfree(pctx.opts);
6586         rbd_spec_put(pctx.spec);
6587         kfree(options);
6588
6589         return ret;
6590 }
6591
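/* Release the exclusive lock if this client currently owns it. */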
6592 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6593 {
6594         down_write(&rbd_dev->lock_rwsem);
6595         if (__rbd_is_lock_owner(rbd_dev))
6596                 __rbd_release_lock(rbd_dev);
6597         up_write(&rbd_dev->lock_rwsem);
6598 }
6599
6600 /*
6601  * If the wait is interrupted, an error is returned even if the lock
6602  * was successfully acquired.  rbd_dev_image_unlock() will release it
6603  * if needed.
6604  */
6605 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6606 {
6607         long ret;
6608
6609         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6610                 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6611                         return 0;
6612
6613                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6614                 return -EINVAL;
6615         }
6616
6617         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6618                 return 0;
6619
6620         rbd_assert(!rbd_is_lock_owner(rbd_dev));
6621         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6622         ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6623                             ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6624         if (ret > 0)
6625                 ret = rbd_dev->acquire_err;
6626         else if (!ret)
6627                 ret = -ETIMEDOUT;
6628
6629         if (ret) {
6630                 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6631                 return ret;
6632         }
6633
6634         /*
6635          * The lock may have been released by now, unless automatic lock
6636          * transitions are disabled.
6637          */
6638         rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6639         return 0;
6640 }
6641
6642 /*
6643  * An rbd format 2 image has a unique identifier, distinct from the
6644  * name given to it by the user.  Internally, that identifier is
6645  * what's used to specify the names of objects related to the image.
6646  *
6647  * A special "rbd id" object is used to map an rbd image name to its
6648  * id.  If that object doesn't exist, then there is no v2 rbd image
6649  * with the supplied name.
6650  *
6651  * This function will record the given rbd_dev's image_id field if
6652  * it can be determined, and in that case will return 0.  If any
6653  * errors occur a negative errno will be returned and the rbd_dev's
6654  * image_id field will be unchanged (and should be NULL).
6655  */
6656 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6657 {
6658         int ret;
6659         size_t size;
6660         CEPH_DEFINE_OID_ONSTACK(oid);
6661         void *response;
6662         char *image_id;
6663
6664         /*
6665          * When probing a parent image, the image id is already
6666          * known (and the image name likely is not).  There's no
6667          * need to fetch the image id again in this case.  We
6668          * do still need to set the image format though.
6669          */
6670         if (rbd_dev->spec->image_id) {
6671                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6672
6673                 return 0;
6674         }
6675
6676         /*
6677          * First, see if the format 2 image id file exists, and if
6678          * so, get the image's persistent id from it.
6679          */
6680         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6681                                rbd_dev->spec->image_name);
6682         if (ret)
6683                 return ret;
6684
6685         dout("rbd id object name is %s\n", oid.name);
6686
6687         /* Response will be an encoded string, which includes a length */
6688
6689         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6690         response = kzalloc(size, GFP_NOIO);
6691         if (!response) {
6692                 ret = -ENOMEM;
6693                 goto out;
6694         }
6695
6696         /* If it doesn't exist we'll assume it's a format 1 image */
6697
6698         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6699                                   "get_id", NULL, 0,
6700                                   response, RBD_IMAGE_ID_LEN_MAX);
6701         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6702         if (ret == -ENOENT) {
6703                 image_id = kstrdup("", GFP_KERNEL);
6704                 ret = image_id ? 0 : -ENOMEM;
6705                 if (!ret)
6706                         rbd_dev->image_format = 1;
6707         } else if (ret >= 0) {
6708                 void *p = response;
6709
6710                 image_id = ceph_extract_encoded_string(&p, p + ret,
6711                                                 NULL, GFP_NOIO);
6712                 ret = PTR_ERR_OR_ZERO(image_id);
6713                 if (!ret)
6714                         rbd_dev->image_format = 2;
6715         }
6716
6717         if (!ret) {
6718                 rbd_dev->spec->image_id = image_id;
6719                 dout("image_id is %s\n", image_id);
6720         }
6721 out:
6722         kfree(response);
6723         ceph_oid_destroy(&oid);
6724         return ret;
6725 }
6726
6727 /*
6728  * Undo whatever state changes are made by a v1 or v2 header info
6729  * call.
6730  */
6731 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6732 {
6733         struct rbd_image_header *header;
6734
6735         rbd_dev_parent_put(rbd_dev);
6736         rbd_object_map_free(rbd_dev);
6737         rbd_dev_mapping_clear(rbd_dev);
6738
6739         /* Free dynamic fields from the header, then zero it out */
6740
6741         header = &rbd_dev->header;
6742         ceph_put_snap_context(header->snapc);
6743         kfree(header->snap_sizes);
6744         kfree(header->snap_names);
6745         kfree(header->object_prefix);
6746         memset(header, 0, sizeof (*header));
6747 }
6748
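/*
 * Fetch the header fields that are fixed for the life of the image:
 * object prefix, features and, where the corresponding features are
 * enabled, the striping parameters and data pool id.
 */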
6749 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6750 {
6751         int ret;
6752
6753         ret = rbd_dev_v2_object_prefix(rbd_dev);
6754         if (ret)
6755                 goto out_err;
6756
6757         /*
6758          * Get and check the features for the image.  Currently the
6759          * features are assumed to never change.
6760          */
6761         ret = rbd_dev_v2_features(rbd_dev);
6762         if (ret)
6763                 goto out_err;
6764
6765         /* If the image supports fancy striping, get its parameters */
6766
6767         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6768                 ret = rbd_dev_v2_striping_info(rbd_dev);
6769                 if (ret < 0)
6770                         goto out_err;
6771         }
6772
6773         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6774                 ret = rbd_dev_v2_data_pool(rbd_dev);
6775                 if (ret)
6776                         goto out_err;
6777         }
6778
6779         rbd_init_layout(rbd_dev);
6780         return 0;
6781
6782 out_err:
6783         rbd_dev->header.features = 0;
6784         kfree(rbd_dev->header.object_prefix);
6785         rbd_dev->header.object_prefix = NULL;
6786         return ret;
6787 }
6788
6789 /*
6790  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6791  * rbd_dev_image_probe() recursion depth, which means it's also the
6792  * length of the already discovered part of the parent chain.
6793  */
6794 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6795 {
6796         struct rbd_device *parent = NULL;
6797         int ret;
6798
6799         if (!rbd_dev->parent_spec)
6800                 return 0;
6801
6802         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6803                 pr_info("parent chain is too long (%d)\n", depth);
6804                 ret = -EINVAL;
6805                 goto out_err;
6806         }
6807
6808         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6809         if (!parent) {
6810                 ret = -ENOMEM;
6811                 goto out_err;
6812         }
6813
6814         /*
6815          * Images related by parent/child relationships always share
6816          * rbd_client and spec/parent_spec, so bump their refcounts.
6817          */
6818         __rbd_get_client(rbd_dev->rbd_client);
6819         rbd_spec_get(rbd_dev->parent_spec);
6820
6821         ret = rbd_dev_image_probe(parent, depth);
6822         if (ret < 0)
6823                 goto out_err;
6824
6825         rbd_dev->parent = parent;
6826         atomic_set(&rbd_dev->parent_ref, 1);
6827         return 0;
6828
6829 out_err:
6830         rbd_dev_unparent(rbd_dev);
6831         rbd_dev_destroy(parent);
6832         return ret;
6833 }
6834
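/*
 * Undo rbd_dev_device_setup(): mark the device as no longer existing,
 * free the disk and, unless single_major is in use, give back the
 * dynamically allocated block device major number.
 */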
6835 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6836 {
6837         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6838         rbd_free_disk(rbd_dev);
6839         if (!single_major)
6840                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6841 }
6842
6843 /*
6844  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6845  * upon return.
6846  */
6847 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6848 {
6849         int ret;
6850
6851         /* Record our major and minor device numbers. */
6852
6853         if (!single_major) {
6854                 ret = register_blkdev(0, rbd_dev->name);
6855                 if (ret < 0)
6856                         goto err_out_unlock;
6857
6858                 rbd_dev->major = ret;
6859                 rbd_dev->minor = 0;
6860         } else {
6861                 rbd_dev->major = rbd_major;
6862                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6863         }
6864
6865         /* Set up the blkdev mapping. */
6866
6867         ret = rbd_init_disk(rbd_dev);
6868         if (ret)
6869                 goto err_out_blkdev;
6870
6871         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6872         set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
6873
6874         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6875         if (ret)
6876                 goto err_out_disk;
6877
6878         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6879         up_write(&rbd_dev->header_rwsem);
6880         return 0;
6881
6882 err_out_disk:
6883         rbd_free_disk(rbd_dev);
6884 err_out_blkdev:
6885         if (!single_major)
6886                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6887 err_out_unlock:
6888         up_write(&rbd_dev->header_rwsem);
6889         return ret;
6890 }
6891
6892 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6893 {
6894         struct rbd_spec *spec = rbd_dev->spec;
6895         int ret;
6896
6897         /* Record the header object name for this rbd image. */
6898
6899         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6900         if (rbd_dev->image_format == 1)
6901                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6902                                        spec->image_name, RBD_SUFFIX);
6903         else
6904                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6905                                        RBD_HEADER_PREFIX, spec->image_id);
6906
6907         return ret;
6908 }
6909
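/*
 * Undo rbd_dev_image_probe(): unprobe the image, tear down the header
 * watch if this device was the one being mapped, and forget the image
 * id and format.
 */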
6910 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6911 {
6912         rbd_dev_unprobe(rbd_dev);
6913         if (rbd_dev->opts)
6914                 rbd_unregister_watch(rbd_dev);
6915         rbd_dev->image_format = 0;
6916         kfree(rbd_dev->spec->image_id);
6917         rbd_dev->spec->image_id = NULL;
6918 }
6919
6920 /*
6921  * Probe for the existence of the header object for the given rbd
6922  * device.  If this image is the one being mapped (i.e., not a
6923  * parent), initiate a watch on its header object before using that
6924  * object to get detailed information about the rbd image.
6925  */
6926 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6927 {
6928         int ret;
6929
6930         /*
6931          * Get the id from the image id object.  Unless there's an
6932          * error, rbd_dev->spec->image_id will be filled in with
6933          * a dynamically-allocated string, and rbd_dev->image_format
6934          * will be set to either 1 or 2.
6935          */
6936         ret = rbd_dev_image_id(rbd_dev);
6937         if (ret)
6938                 return ret;
6939
6940         ret = rbd_dev_header_name(rbd_dev);
6941         if (ret)
6942                 goto err_out_format;
6943
6944         if (!depth) {
6945                 ret = rbd_register_watch(rbd_dev);
6946                 if (ret) {
6947                         if (ret == -ENOENT)
6948                                 pr_info("image %s/%s%s%s does not exist\n",
6949                                         rbd_dev->spec->pool_name,
6950                                         rbd_dev->spec->pool_ns ?: "",
6951                                         rbd_dev->spec->pool_ns ? "/" : "",
6952                                         rbd_dev->spec->image_name);
6953                         goto err_out_format;
6954                 }
6955         }
6956
6957         ret = rbd_dev_header_info(rbd_dev);
6958         if (ret)
6959                 goto err_out_watch;
6960
6961         /*
6962          * If this image is the one being mapped, we have pool name and
6963          * id, image name and id, and snap name - need to fill snap id.
6964          * Otherwise this is a parent image, identified by pool, image
6965          * and snap ids - need to fill in names for those ids.
6966          */
6967         if (!depth)
6968                 ret = rbd_spec_fill_snap_id(rbd_dev);
6969         else
6970                 ret = rbd_spec_fill_names(rbd_dev);
6971         if (ret) {
6972                 if (ret == -ENOENT)
6973                         pr_info("snap %s/%s%s%s@%s does not exist\n",
6974                                 rbd_dev->spec->pool_name,
6975                                 rbd_dev->spec->pool_ns ?: "",
6976                                 rbd_dev->spec->pool_ns ? "/" : "",
6977                                 rbd_dev->spec->image_name,
6978                                 rbd_dev->spec->snap_name);
6979                 goto err_out_probe;
6980         }
6981
6982         ret = rbd_dev_mapping_set(rbd_dev);
6983         if (ret)
6984                 goto err_out_probe;
6985
6986         if (rbd_dev->spec->snap_id != CEPH_NOSNAP &&
6987             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6988                 ret = rbd_object_map_load(rbd_dev);
6989                 if (ret)
6990                         goto err_out_probe;
6991         }
6992
6993         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6994                 ret = rbd_dev_v2_parent_info(rbd_dev);
6995                 if (ret)
6996                         goto err_out_probe;
6997         }
6998
6999         ret = rbd_dev_probe_parent(rbd_dev, depth);
7000         if (ret)
7001                 goto err_out_probe;
7002
7003         dout("discovered format %u image, header name is %s\n",
7004                 rbd_dev->image_format, rbd_dev->header_oid.name);
7005         return 0;
7006
7007 err_out_probe:
7008         rbd_dev_unprobe(rbd_dev);
7009 err_out_watch:
7010         if (!depth)
7011                 rbd_unregister_watch(rbd_dev);
7012 err_out_format:
7013         rbd_dev->image_format = 0;
7014         kfree(rbd_dev->spec->image_id);
7015         rbd_dev->spec->image_id = NULL;
7016         return ret;
7017 }
7018
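/*
 * Handle a write to the sysfs "add" (or "add_single_major") file:
 * parse the mapping request, probe the image and its parent chain,
 * set up the block device and announce the new disk.
 */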
7019 static ssize_t do_rbd_add(struct bus_type *bus,
7020                           const char *buf,
7021                           size_t count)
7022 {
7023         struct rbd_device *rbd_dev = NULL;
7024         struct ceph_options *ceph_opts = NULL;
7025         struct rbd_options *rbd_opts = NULL;
7026         struct rbd_spec *spec = NULL;
7027         struct rbd_client *rbdc;
7028         int rc;
7029
7030         if (!try_module_get(THIS_MODULE))
7031                 return -ENODEV;
7032
7033         /* parse add command */
7034         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7035         if (rc < 0)
7036                 goto out;
7037
7038         rbdc = rbd_get_client(ceph_opts);
7039         if (IS_ERR(rbdc)) {
7040                 rc = PTR_ERR(rbdc);
7041                 goto err_out_args;
7042         }
7043
7044         /* pick the pool */
7045         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7046         if (rc < 0) {
7047                 if (rc == -ENOENT)
7048                         pr_info("pool %s does not exist\n", spec->pool_name);
7049                 goto err_out_client;
7050         }
7051         spec->pool_id = (u64)rc;
7052
7053         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7054         if (!rbd_dev) {
7055                 rc = -ENOMEM;
7056                 goto err_out_client;
7057         }
7058         rbdc = NULL;            /* rbd_dev now owns this */
7059         spec = NULL;            /* rbd_dev now owns this */
7060         rbd_opts = NULL;        /* rbd_dev now owns this */
7061
7062         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7063         if (!rbd_dev->config_info) {
7064                 rc = -ENOMEM;
7065                 goto err_out_rbd_dev;
7066         }
7067
7068         down_write(&rbd_dev->header_rwsem);
7069         rc = rbd_dev_image_probe(rbd_dev, 0);
7070         if (rc < 0) {
7071                 up_write(&rbd_dev->header_rwsem);
7072                 goto err_out_rbd_dev;
7073         }
7074
7075         /* If we are mapping a snapshot it must be marked read-only */
7076         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
7077                 rbd_dev->opts->read_only = true;
7078
7079         if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7080                 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7081                          rbd_dev->layout.object_size);
7082                 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7083         }
7084
7085         rc = rbd_dev_device_setup(rbd_dev);
7086         if (rc)
7087                 goto err_out_image_probe;
7088
7089         rc = rbd_add_acquire_lock(rbd_dev);
7090         if (rc)
7091                 goto err_out_image_lock;
7092
7093         /* Everything's ready.  Announce the disk to the world. */
7094
7095         rc = device_add(&rbd_dev->dev);
7096         if (rc)
7097                 goto err_out_image_lock;
7098
7099         add_disk(rbd_dev->disk);
7100         /* see rbd_init_disk() */
7101         blk_put_queue(rbd_dev->disk->queue);
7102
7103         spin_lock(&rbd_dev_list_lock);
7104         list_add_tail(&rbd_dev->node, &rbd_dev_list);
7105         spin_unlock(&rbd_dev_list_lock);
7106
7107         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7108                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7109                 rbd_dev->header.features);
7110         rc = count;
7111 out:
7112         module_put(THIS_MODULE);
7113         return rc;
7114
7115 err_out_image_lock:
7116         rbd_dev_image_unlock(rbd_dev);
7117         rbd_dev_device_release(rbd_dev);
7118 err_out_image_probe:
7119         rbd_dev_image_release(rbd_dev);
7120 err_out_rbd_dev:
7121         rbd_dev_destroy(rbd_dev);
7122 err_out_client:
7123         rbd_put_client(rbdc);
7124 err_out_args:
7125         rbd_spec_put(spec);
7126         kfree(rbd_opts);
7127         goto out;
7128 }
7129
7130 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7131 {
7132         if (single_major)
7133                 return -EINVAL;
7134
7135         return do_rbd_add(bus, buf, count);
7136 }
7137
7138 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7139                                       size_t count)
7140 {
7141         return do_rbd_add(bus, buf, count);
7142 }
7143
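/*
 * Tear down the parent chain, releasing and destroying parent images
 * from the most distant ancestor up to the immediate parent.
 */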
7144 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7145 {
7146         while (rbd_dev->parent) {
7147                 struct rbd_device *first = rbd_dev;
7148                 struct rbd_device *second = first->parent;
7149                 struct rbd_device *third;
7150
7151                 /*
7152                  * Follow to the parent with no grandparent and
7153                  * remove it.
7154                  */
7155                 while (second && (third = second->parent)) {
7156                         first = second;
7157                         second = third;
7158                 }
7159                 rbd_assert(second);
7160                 rbd_dev_image_release(second);
7161                 rbd_dev_destroy(second);
7162                 first->parent = NULL;
7163                 first->parent_overlap = 0;
7164
7165                 rbd_assert(first->parent_spec);
7166                 rbd_spec_put(first->parent_spec);
7167                 first->parent_spec = NULL;
7168         }
7169 }
7170
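/*
 * Handle a write to the sysfs "remove" (or "remove_single_major") file.
 * The input is "<dev_id> [force]"; without "force" the request fails if
 * the device is still open, with "force" outstanding I/O is failed and
 * the device is torn down anyway.
 */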
7171 static ssize_t do_rbd_remove(struct bus_type *bus,
7172                              const char *buf,
7173                              size_t count)
7174 {
7175         struct rbd_device *rbd_dev = NULL;
7176         struct list_head *tmp;
7177         int dev_id;
7178         char opt_buf[6];
7179         bool force = false;
7180         int ret;
7181
7182         dev_id = -1;
7183         opt_buf[0] = '\0';
7184         sscanf(buf, "%d %5s", &dev_id, opt_buf);
7185         if (dev_id < 0) {
7186                 pr_err("dev_id out of range\n");
7187                 return -EINVAL;
7188         }
7189         if (opt_buf[0] != '\0') {
7190                 if (!strcmp(opt_buf, "force")) {
7191                         force = true;
7192                 } else {
7193                         pr_err("bad remove option at '%s'\n", opt_buf);
7194                         return -EINVAL;
7195                 }
7196         }
7197
7198         ret = -ENOENT;
7199         spin_lock(&rbd_dev_list_lock);
7200         list_for_each(tmp, &rbd_dev_list) {
7201                 rbd_dev = list_entry(tmp, struct rbd_device, node);
7202                 if (rbd_dev->dev_id == dev_id) {
7203                         ret = 0;
7204                         break;
7205                 }
7206         }
7207         if (!ret) {
7208                 spin_lock_irq(&rbd_dev->lock);
7209                 if (rbd_dev->open_count && !force)
7210                         ret = -EBUSY;
7211                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7212                                           &rbd_dev->flags))
7213                         ret = -EINPROGRESS;
7214                 spin_unlock_irq(&rbd_dev->lock);
7215         }
7216         spin_unlock(&rbd_dev_list_lock);
7217         if (ret)
7218                 return ret;
7219
7220         if (force) {
7221                 /*
7222                  * Prevent new IO from being queued and wait for existing
7223                  * IO to complete/fail.
7224                  */
7225                 blk_mq_freeze_queue(rbd_dev->disk->queue);
7226                 blk_set_queue_dying(rbd_dev->disk->queue);
7227         }
7228
7229         del_gendisk(rbd_dev->disk);
7230         spin_lock(&rbd_dev_list_lock);
7231         list_del_init(&rbd_dev->node);
7232         spin_unlock(&rbd_dev_list_lock);
7233         device_del(&rbd_dev->dev);
7234
7235         rbd_dev_image_unlock(rbd_dev);
7236         rbd_dev_device_release(rbd_dev);
7237         rbd_dev_image_release(rbd_dev);
7238         rbd_dev_destroy(rbd_dev);
7239         return count;
7240 }
7241
7242 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7243 {
7244         if (single_major)
7245                 return -EINVAL;
7246
7247         return do_rbd_remove(bus, buf, count);
7248 }
7249
7250 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7251                                          size_t count)
7252 {
7253         return do_rbd_remove(bus, buf, count);
7254 }
7255
7256 /*
7257  * create control files in sysfs
7258  * /sys/bus/rbd/...
7259  */
7260 static int __init rbd_sysfs_init(void)
7261 {
7262         int ret;
7263
7264         ret = device_register(&rbd_root_dev);
7265         if (ret < 0)
7266                 return ret;
7267
7268         ret = bus_register(&rbd_bus_type);
7269         if (ret < 0)
7270                 device_unregister(&rbd_root_dev);
7271
7272         return ret;
7273 }
7274
7275 static void __exit rbd_sysfs_cleanup(void)
7276 {
7277         bus_unregister(&rbd_bus_type);
7278         device_unregister(&rbd_root_dev);
7279 }
7280
7281 static int __init rbd_slab_init(void)
7282 {
7283         rbd_assert(!rbd_img_request_cache);
7284         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7285         if (!rbd_img_request_cache)
7286                 return -ENOMEM;
7287
7288         rbd_assert(!rbd_obj_request_cache);
7289         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7290         if (!rbd_obj_request_cache)
7291                 goto out_err;
7292
7293         return 0;
7294
7295 out_err:
7296         kmem_cache_destroy(rbd_img_request_cache);
7297         rbd_img_request_cache = NULL;
7298         return -ENOMEM;
7299 }
7300
7301 static void rbd_slab_exit(void)
7302 {
7303         rbd_assert(rbd_obj_request_cache);
7304         kmem_cache_destroy(rbd_obj_request_cache);
7305         rbd_obj_request_cache = NULL;
7306
7307         rbd_assert(rbd_img_request_cache);
7308         kmem_cache_destroy(rbd_img_request_cache);
7309         rbd_img_request_cache = NULL;
7310 }
7311
7312 static int __init rbd_init(void)
7313 {
7314         int rc;
7315
7316         if (!libceph_compatible(NULL)) {
7317                 rbd_warn(NULL, "libceph incompatibility (quitting)");
7318                 return -EINVAL;
7319         }
7320
7321         rc = rbd_slab_init();
7322         if (rc)
7323                 return rc;
7324
7325         /*
7326          * The number of active work items is limited by the number of
7327          * rbd devices * queue depth, so leave @max_active at default.
7328          */
7329         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7330         if (!rbd_wq) {
7331                 rc = -ENOMEM;
7332                 goto err_out_slab;
7333         }
7334
7335         if (single_major) {
7336                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7337                 if (rbd_major < 0) {
7338                         rc = rbd_major;
7339                         goto err_out_wq;
7340                 }
7341         }
7342
7343         rc = rbd_sysfs_init();
7344         if (rc)
7345                 goto err_out_blkdev;
7346
7347         if (single_major)
7348                 pr_info("loaded (major %d)\n", rbd_major);
7349         else
7350                 pr_info("loaded\n");
7351
7352         return 0;
7353
7354 err_out_blkdev:
7355         if (single_major)
7356                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7357 err_out_wq:
7358         destroy_workqueue(rbd_wq);
7359 err_out_slab:
7360         rbd_slab_exit();
7361         return rc;
7362 }
7363
7364 static void __exit rbd_exit(void)
7365 {
7366         ida_destroy(&rbd_dev_id_ida);
7367         rbd_sysfs_cleanup();
7368         if (single_major)
7369                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7370         destroy_workqueue(rbd_wq);
7371         rbd_slab_exit();
7372 }
7373
7374 module_init(rbd_init);
7375 module_exit(rbd_exit);
7376
7377 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7378 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7379 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7380 /* following authorship retained from original osdblk.c */
7381 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7382
7383 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7384 MODULE_LICENSE("GPL");