1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return the value it had before the
56  * increment.  If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value, -EINVAL is returned
58  * without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
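
/*
 * These helpers are used below to maintain rbd_dev->parent_ref, keeping
 * the count within [0, INT_MAX] and reporting overflow/underflow instead
 * of silently wrapping.  The usage pattern (see rbd_dev_parent_get()) is:
 *
 *	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
 *	if (counter < 0)
 *		rbd_warn(rbd_dev, "parent reference overflow");
 */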
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
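
/*
 * Rough arithmetic behind the 4KB figure (assuming the usual snap context
 * wire encoding of an 8-byte seq, a 4-byte count and 8 bytes per snap id):
 *
 *	8 + 4 + 510 * 8 = 4092 bytes, just under 4KB.
 */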
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by an OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
119 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
120
121 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
122                                  RBD_FEATURE_STRIPINGV2 |       \
123                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
124                                  RBD_FEATURE_DATA_POOL |        \
125                                  RBD_FEATURE_OPERATIONS)
126
127 /* Features supported by this (client software) implementation. */
128
129 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
130
131 /*
132  * An RBD device name will be "rbd#", where the "rbd" comes from
133  * RBD_DRV_NAME above, and # is a unique integer identifier.
134  */
135 #define DEV_NAME_LEN            32
136
137 /*
138  * block device image metadata (in-memory version)
139  */
140 struct rbd_image_header {
141         /* These six fields never change for a given rbd image */
142         char *object_prefix;
143         __u8 obj_order;
144         u64 stripe_unit;
145         u64 stripe_count;
146         s64 data_pool_id;
147         u64 features;           /* Might be changeable someday? */
148
149         /* The remaining fields need to be updated occasionally */
150         u64 image_size;
151         struct ceph_snap_context *snapc;
152         char *snap_names;       /* format 1 only */
153         u64 *snap_sizes;        /* format 1 only */
154 };
155
156 /*
157  * An rbd image specification.
158  *
159  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
160  * identify an image.  Each rbd_dev structure includes a pointer to
161  * an rbd_spec structure that encapsulates this identity.
162  *
163  * Each of the ids in an rbd_spec has an associated name.  For a
164  * user-mapped image, the names are supplied and the ids associated
165  * with them are looked up.  For a layered image, a parent image is
166  * defined by the tuple, and the names are looked up.
167  *
168  * An rbd_dev structure contains a parent_spec pointer which is
169  * non-null if the image it represents is a child in a layered
170  * image.  This pointer will refer to the rbd_spec structure used
171  * by the parent rbd_dev for its own identity (i.e., the structure
172  * is shared between the parent and child).
173  *
174  * Since these structures are populated once, during the discovery
175  * phase of image construction, they are effectively immutable so
176  * we make no effort to synchronize access to them.
177  *
178  * Note that code herein does not assume the image name is known (it
179  * could be a null pointer).
180  */
181 struct rbd_spec {
182         u64             pool_id;
183         const char      *pool_name;
184         const char      *pool_ns;       /* NULL if default, never "" */
185
186         const char      *image_id;
187         const char      *image_name;
188
189         u64             snap_id;
190         const char      *snap_name;
191
192         struct kref     kref;
193 };
194
195 /*
196  * an instance of the client.  multiple devices may share an rbd client.
197  */
198 struct rbd_client {
199         struct ceph_client      *client;
200         struct kref             kref;
201         struct list_head        node;
202 };
203
204 struct rbd_img_request;
205
206 enum obj_request_type {
207         OBJ_REQUEST_NODATA = 1,
208         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
209         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
210         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
211 };
212
213 enum obj_operation_type {
214         OBJ_OP_READ = 1,
215         OBJ_OP_WRITE,
216         OBJ_OP_DISCARD,
217 };
218
219 /*
220  * Writes go through the following state machine to deal with
221  * layering:
222  *
223  *                       need copyup
224  * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
225  *        |     ^                              |
226  *        v     \------------------------------/
227  *      done
228  *        ^
229  *        |
230  * RBD_OBJ_WRITE_FLAT
231  *
232  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
233  * there is a parent or not.
234  */
235 enum rbd_obj_write_state {
236         RBD_OBJ_WRITE_FLAT = 1,
237         RBD_OBJ_WRITE_GUARD,
238         RBD_OBJ_WRITE_COPYUP,
239 };
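
/*
 * Sketch of how the states above are typically walked; the actual
 * transition logic lives in the request completion handlers later in
 * this file, so treat this as an illustration only:
 *
 *	switch (obj_req->write_state) {
 *	case RBD_OBJ_WRITE_GUARD:
 *		if (result == -ENOENT)	 (object doesn't exist yet: copy up
 *					  parent data, then retry the write)
 *			obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
 *		break;
 *	case RBD_OBJ_WRITE_FLAT:
 *	case RBD_OBJ_WRITE_COPYUP:
 *		break;			 (no further transition: done)
 *	}
 */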
240
241 struct rbd_obj_request {
242         struct ceph_object_extent ex;
243         union {
244                 bool                    tried_parent;   /* for reads */
245                 enum rbd_obj_write_state write_state;   /* for writes */
246         };
247
248         struct rbd_img_request  *img_request;
249         struct ceph_file_extent *img_extents;
250         u32                     num_img_extents;
251
252         union {
253                 struct ceph_bio_iter    bio_pos;
254                 struct {
255                         struct ceph_bvec_iter   bvec_pos;
256                         u32                     bvec_count;
257                         u32                     bvec_idx;
258                 };
259         };
260         struct bio_vec          *copyup_bvecs;
261         u32                     copyup_bvec_count;
262
263         struct ceph_osd_request *osd_req;
264
265         u64                     xferred;        /* bytes transferred */
266         int                     result;
267
268         struct kref             kref;
269 };
270
271 enum img_req_flags {
272         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
273         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
274 };
275
276 struct rbd_img_request {
277         struct rbd_device       *rbd_dev;
278         enum obj_operation_type op_type;
279         enum obj_request_type   data_type;
280         unsigned long           flags;
281         union {
282                 u64                     snap_id;        /* for reads */
283                 struct ceph_snap_context *snapc;        /* for writes */
284         };
285         union {
286                 struct request          *rq;            /* block request */
287                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
288         };
289         spinlock_t              completion_lock;
290         u64                     xferred;/* aggregate bytes transferred */
291         int                     result; /* first nonzero obj_request result */
292
293         struct list_head        object_extents; /* obj_req.ex structs */
294         u32                     obj_request_count;
295         u32                     pending_count;
296
297         struct kref             kref;
298 };
299
300 #define for_each_obj_request(ireq, oreq) \
301         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
302 #define for_each_obj_request_safe(ireq, oreq, n) \
303         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
304
305 enum rbd_watch_state {
306         RBD_WATCH_STATE_UNREGISTERED,
307         RBD_WATCH_STATE_REGISTERED,
308         RBD_WATCH_STATE_ERROR,
309 };
310
311 enum rbd_lock_state {
312         RBD_LOCK_STATE_UNLOCKED,
313         RBD_LOCK_STATE_LOCKED,
314         RBD_LOCK_STATE_RELEASING,
315 };
316
317 /* WatchNotify::ClientId */
318 struct rbd_client_id {
319         u64 gid;
320         u64 handle;
321 };
322
323 struct rbd_mapping {
324         u64                     size;
325         u64                     features;
326 };
327
328 /*
329  * a single device
330  */
331 struct rbd_device {
332         int                     dev_id;         /* blkdev unique id */
333
334         int                     major;          /* blkdev assigned major */
335         int                     minor;
336         struct gendisk          *disk;          /* blkdev's gendisk and rq */
337
338         u32                     image_format;   /* Either 1 or 2 */
339         struct rbd_client       *rbd_client;
340
341         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
342
343         spinlock_t              lock;           /* queue, flags, open_count */
344
345         struct rbd_image_header header;
346         unsigned long           flags;          /* possibly lock protected */
347         struct rbd_spec         *spec;
348         struct rbd_options      *opts;
349         char                    *config_info;   /* add{,_single_major} string */
350
351         struct ceph_object_id   header_oid;
352         struct ceph_object_locator header_oloc;
353
354         struct ceph_file_layout layout;         /* used for all rbd requests */
355
356         struct mutex            watch_mutex;
357         enum rbd_watch_state    watch_state;
358         struct ceph_osd_linger_request *watch_handle;
359         u64                     watch_cookie;
360         struct delayed_work     watch_dwork;
361
362         struct rw_semaphore     lock_rwsem;
363         enum rbd_lock_state     lock_state;
364         char                    lock_cookie[32];
365         struct rbd_client_id    owner_cid;
366         struct work_struct      acquired_lock_work;
367         struct work_struct      released_lock_work;
368         struct delayed_work     lock_dwork;
369         struct work_struct      unlock_work;
370         wait_queue_head_t       lock_waitq;
371
372         struct workqueue_struct *task_wq;
373
374         struct rbd_spec         *parent_spec;
375         u64                     parent_overlap;
376         atomic_t                parent_ref;
377         struct rbd_device       *parent;
378
379         /* Block layer tags. */
380         struct blk_mq_tag_set   tag_set;
381
382         /* protects updating the header */
383         struct rw_semaphore     header_rwsem;
384
385         struct rbd_mapping      mapping;
386
387         struct list_head        node;
388
389         /* sysfs related */
390         struct device           dev;
391         unsigned long           open_count;     /* protected by lock */
392 };
393
394 /*
395  * Flag bits for rbd_dev->flags:
396  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
397  *   by rbd_dev->lock
398  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
399  */
400 enum rbd_dev_flags {
401         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
402         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
403         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
404 };
405
406 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
407
408 static LIST_HEAD(rbd_dev_list);    /* devices */
409 static DEFINE_SPINLOCK(rbd_dev_list_lock);
410
411 static LIST_HEAD(rbd_client_list);              /* clients */
412 static DEFINE_SPINLOCK(rbd_client_list_lock);
413
414 /* Slab caches for frequently-allocated structures */
415
416 static struct kmem_cache        *rbd_img_request_cache;
417 static struct kmem_cache        *rbd_obj_request_cache;
418
419 static int rbd_major;
420 static DEFINE_IDA(rbd_dev_id_ida);
421
422 static struct workqueue_struct *rbd_wq;
423
424 /*
425  * single-major requires version >= 0.75 of the userspace rbd utility.
426  */
427 static bool single_major = true;
428 module_param(single_major, bool, 0444);
429 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
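
/*
 * Example (illustrative): to get the old per-device dynamic major
 * behaviour back, load the module with
 *
 *	modprobe rbd single_major=0
 */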
430
431 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
432 static ssize_t remove_store(struct bus_type *bus, const char *buf,
433                             size_t count);
434 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
435                                       size_t count);
436 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
437                                          size_t count);
438 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
439
440 static int rbd_dev_id_to_minor(int dev_id)
441 {
442         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
443 }
444
445 static int minor_to_rbd_dev_id(int minor)
446 {
447         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
448 }
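
/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT of 4, each device owns
 * 16 consecutive minors (whole disk plus up to 15 partitions).  dev_id 3
 * starts at minor 3 << 4 = 48, and any minor in 48..63 (say 50, i.e.
 * rbd3p2) maps back to dev_id 50 >> 4 = 3.
 */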
449
450 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
451 {
452         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
453                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
454 }
455
456 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
457 {
458         bool is_lock_owner;
459
460         down_read(&rbd_dev->lock_rwsem);
461         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
462         up_read(&rbd_dev->lock_rwsem);
463         return is_lock_owner;
464 }
465
466 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
467 {
468         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
469 }
470
471 static BUS_ATTR_WO(add);
472 static BUS_ATTR_WO(remove);
473 static BUS_ATTR_WO(add_single_major);
474 static BUS_ATTR_WO(remove_single_major);
475 static BUS_ATTR_RO(supported_features);
476
477 static struct attribute *rbd_bus_attrs[] = {
478         &bus_attr_add.attr,
479         &bus_attr_remove.attr,
480         &bus_attr_add_single_major.attr,
481         &bus_attr_remove_single_major.attr,
482         &bus_attr_supported_features.attr,
483         NULL,
484 };
485
486 static umode_t rbd_bus_is_visible(struct kobject *kobj,
487                                   struct attribute *attr, int index)
488 {
489         if (!single_major &&
490             (attr == &bus_attr_add_single_major.attr ||
491              attr == &bus_attr_remove_single_major.attr))
492                 return 0;
493
494         return attr->mode;
495 }
496
497 static const struct attribute_group rbd_bus_group = {
498         .attrs = rbd_bus_attrs,
499         .is_visible = rbd_bus_is_visible,
500 };
501 __ATTRIBUTE_GROUPS(rbd_bus);
502
503 static struct bus_type rbd_bus_type = {
504         .name           = "rbd",
505         .bus_groups     = rbd_bus_groups,
506 };
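
/*
 * These bus attributes are what userspace writes to in order to map and
 * unmap images; Documentation/ABI/testing/sysfs-bus-rbd has the exact
 * format.  Roughly (illustrative only):
 *
 *	echo "<mon addrs> <options> <pool> <image> [<snap>]" > /sys/bus/rbd/add
 *	echo <dev-id> > /sys/bus/rbd/remove
 */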
507
508 static void rbd_root_dev_release(struct device *dev)
509 {
510 }
511
512 static struct device rbd_root_dev = {
513         .init_name =    "rbd",
514         .release =      rbd_root_dev_release,
515 };
516
517 static __printf(2, 3)
518 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
519 {
520         struct va_format vaf;
521         va_list args;
522
523         va_start(args, fmt);
524         vaf.fmt = fmt;
525         vaf.va = &args;
526
527         if (!rbd_dev)
528                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
529         else if (rbd_dev->disk)
530                 printk(KERN_WARNING "%s: %s: %pV\n",
531                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
532         else if (rbd_dev->spec && rbd_dev->spec->image_name)
533                 printk(KERN_WARNING "%s: image %s: %pV\n",
534                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
535         else if (rbd_dev->spec && rbd_dev->spec->image_id)
536                 printk(KERN_WARNING "%s: id %s: %pV\n",
537                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
538         else    /* punt */
539                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
540                         RBD_DRV_NAME, rbd_dev, &vaf);
541         va_end(args);
542 }
543
544 #ifdef RBD_DEBUG
545 #define rbd_assert(expr)                                                \
546                 if (unlikely(!(expr))) {                                \
547                         printk(KERN_ERR "\nAssertion failure in %s() "  \
548                                                 "at line %d:\n\n"       \
549                                         "\trbd_assert(%s);\n\n",        \
550                                         __func__, __LINE__, #expr);     \
551                         BUG();                                          \
552                 }
553 #else /* !RBD_DEBUG */
554 #  define rbd_assert(expr)      ((void) 0)
555 #endif /* !RBD_DEBUG */
556
557 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
558
559 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
560 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
561 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
562 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
563 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
564                                         u64 snap_id);
565 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
566                                 u8 *order, u64 *snap_size);
567 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
568                 u64 *snap_features);
569
570 static int rbd_open(struct block_device *bdev, fmode_t mode)
571 {
572         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
573         bool removing = false;
574
575         spin_lock_irq(&rbd_dev->lock);
576         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
577                 removing = true;
578         else
579                 rbd_dev->open_count++;
580         spin_unlock_irq(&rbd_dev->lock);
581         if (removing)
582                 return -ENOENT;
583
584         (void) get_device(&rbd_dev->dev);
585
586         return 0;
587 }
588
589 static void rbd_release(struct gendisk *disk, fmode_t mode)
590 {
591         struct rbd_device *rbd_dev = disk->private_data;
592         unsigned long open_count_before;
593
594         spin_lock_irq(&rbd_dev->lock);
595         open_count_before = rbd_dev->open_count--;
596         spin_unlock_irq(&rbd_dev->lock);
597         rbd_assert(open_count_before > 0);
598
599         put_device(&rbd_dev->dev);
600 }
601
602 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
603 {
604         int ro;
605
606         if (get_user(ro, (int __user *)arg))
607                 return -EFAULT;
608
609         /* Snapshots can't be marked read-write */
610         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
611                 return -EROFS;
612
613         /* Let blkdev_roset() handle it */
614         return -ENOTTY;
615 }
616
617 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
618                         unsigned int cmd, unsigned long arg)
619 {
620         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
621         int ret;
622
623         switch (cmd) {
624         case BLKROSET:
625                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
626                 break;
627         default:
628                 ret = -ENOTTY;
629         }
630
631         return ret;
632 }
633
634 #ifdef CONFIG_COMPAT
635 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
636                                 unsigned int cmd, unsigned long arg)
637 {
638         return rbd_ioctl(bdev, mode, cmd, arg);
639 }
640 #endif /* CONFIG_COMPAT */
641
642 static const struct block_device_operations rbd_bd_ops = {
643         .owner                  = THIS_MODULE,
644         .open                   = rbd_open,
645         .release                = rbd_release,
646         .ioctl                  = rbd_ioctl,
647 #ifdef CONFIG_COMPAT
648         .compat_ioctl           = rbd_compat_ioctl,
649 #endif
650 };
651
652 /*
653  * Initialize an rbd client instance.  Success or not, this function
654  * consumes ceph_opts.  Caller holds client_mutex.
655  */
656 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
657 {
658         struct rbd_client *rbdc;
659         int ret = -ENOMEM;
660
661         dout("%s:\n", __func__);
662         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
663         if (!rbdc)
664                 goto out_opt;
665
666         kref_init(&rbdc->kref);
667         INIT_LIST_HEAD(&rbdc->node);
668
669         rbdc->client = ceph_create_client(ceph_opts, rbdc);
670         if (IS_ERR(rbdc->client))
671                 goto out_rbdc;
672         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
673
674         ret = ceph_open_session(rbdc->client);
675         if (ret < 0)
676                 goto out_client;
677
678         spin_lock(&rbd_client_list_lock);
679         list_add_tail(&rbdc->node, &rbd_client_list);
680         spin_unlock(&rbd_client_list_lock);
681
682         dout("%s: rbdc %p\n", __func__, rbdc);
683
684         return rbdc;
685 out_client:
686         ceph_destroy_client(rbdc->client);
687 out_rbdc:
688         kfree(rbdc);
689 out_opt:
690         if (ceph_opts)
691                 ceph_destroy_options(ceph_opts);
692         dout("%s: error %d\n", __func__, ret);
693
694         return ERR_PTR(ret);
695 }
696
697 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
698 {
699         kref_get(&rbdc->kref);
700
701         return rbdc;
702 }
703
704 /*
705  * Find a ceph client with specific addr and configuration.  If
706  * found, bump its reference count.
707  */
708 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
709 {
710         struct rbd_client *client_node;
711         bool found = false;
712
713         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
714                 return NULL;
715
716         spin_lock(&rbd_client_list_lock);
717         list_for_each_entry(client_node, &rbd_client_list, node) {
718                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
719                         __rbd_get_client(client_node);
720
721                         found = true;
722                         break;
723                 }
724         }
725         spin_unlock(&rbd_client_list_lock);
726
727         return found ? client_node : NULL;
728 }
729
730 /*
731  * (Per device) rbd map options
732  */
733 enum {
734         Opt_queue_depth,
735         Opt_lock_timeout,
736         Opt_last_int,
737         /* int args above */
738         Opt_pool_ns,
739         Opt_last_string,
740         /* string args above */
741         Opt_read_only,
742         Opt_read_write,
743         Opt_lock_on_read,
744         Opt_exclusive,
745         Opt_notrim,
746         Opt_err
747 };
748
749 static match_table_t rbd_opts_tokens = {
750         {Opt_queue_depth, "queue_depth=%d"},
751         {Opt_lock_timeout, "lock_timeout=%d"},
752         /* int args above */
753         {Opt_pool_ns, "_pool_ns=%s"},
754         /* string args above */
755         {Opt_read_only, "read_only"},
756         {Opt_read_only, "ro"},          /* Alternate spelling */
757         {Opt_read_write, "read_write"},
758         {Opt_read_write, "rw"},         /* Alternate spelling */
759         {Opt_lock_on_read, "lock_on_read"},
760         {Opt_exclusive, "exclusive"},
761         {Opt_notrim, "notrim"},
762         {Opt_err, NULL}
763 };
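
/*
 * Illustrative only: the per-device option string is a comma-separated
 * list of the tokens above, e.g.
 *
 *	queue_depth=128,lock_on_read,_pool_ns=myns
 *
 * with each token handed individually to parse_rbd_opts_token() below.
 */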
764
765 struct rbd_options {
766         int     queue_depth;
767         unsigned long   lock_timeout;
768         bool    read_only;
769         bool    lock_on_read;
770         bool    exclusive;
771         bool    trim;
772 };
773
774 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
775 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
776 #define RBD_READ_ONLY_DEFAULT   false
777 #define RBD_LOCK_ON_READ_DEFAULT false
778 #define RBD_EXCLUSIVE_DEFAULT   false
779 #define RBD_TRIM_DEFAULT        true
780
781 struct parse_rbd_opts_ctx {
782         struct rbd_spec         *spec;
783         struct rbd_options      *opts;
784 };
785
786 static int parse_rbd_opts_token(char *c, void *private)
787 {
788         struct parse_rbd_opts_ctx *pctx = private;
789         substring_t argstr[MAX_OPT_ARGS];
790         int token, intval, ret;
791
792         token = match_token(c, rbd_opts_tokens, argstr);
793         if (token < Opt_last_int) {
794                 ret = match_int(&argstr[0], &intval);
795                 if (ret < 0) {
796                         pr_err("bad option arg (not int) at '%s'\n", c);
797                         return ret;
798                 }
799                 dout("got int token %d val %d\n", token, intval);
800         } else if (token > Opt_last_int && token < Opt_last_string) {
801                 dout("got string token %d val %s\n", token, argstr[0].from);
802         } else {
803                 dout("got token %d\n", token);
804         }
805
806         switch (token) {
807         case Opt_queue_depth:
808                 if (intval < 1) {
809                         pr_err("queue_depth out of range\n");
810                         return -EINVAL;
811                 }
812                 pctx->opts->queue_depth = intval;
813                 break;
814         case Opt_lock_timeout:
815                 /* 0 is "wait forever" (i.e. infinite timeout) */
816                 if (intval < 0 || intval > INT_MAX / 1000) {
817                         pr_err("lock_timeout out of range\n");
818                         return -EINVAL;
819                 }
820                 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
821                 break;
822         case Opt_pool_ns:
823                 kfree(pctx->spec->pool_ns);
824                 pctx->spec->pool_ns = match_strdup(argstr);
825                 if (!pctx->spec->pool_ns)
826                         return -ENOMEM;
827                 break;
828         case Opt_read_only:
829                 pctx->opts->read_only = true;
830                 break;
831         case Opt_read_write:
832                 pctx->opts->read_only = false;
833                 break;
834         case Opt_lock_on_read:
835                 pctx->opts->lock_on_read = true;
836                 break;
837         case Opt_exclusive:
838                 pctx->opts->exclusive = true;
839                 break;
840         case Opt_notrim:
841                 pctx->opts->trim = false;
842                 break;
843         default:
844                 /* libceph prints "bad option" msg */
845                 return -EINVAL;
846         }
847
848         return 0;
849 }
850
851 static char *obj_op_name(enum obj_operation_type op_type)
852 {
853         switch (op_type) {
854         case OBJ_OP_READ:
855                 return "read";
856         case OBJ_OP_WRITE:
857                 return "write";
858         case OBJ_OP_DISCARD:
859                 return "discard";
860         default:
861                 return "???";
862         }
863 }
864
865 /*
866  * Destroy ceph client
867  *
868  * Caller must hold rbd_client_list_lock.
869  */
870 static void rbd_client_release(struct kref *kref)
871 {
872         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
873
874         dout("%s: rbdc %p\n", __func__, rbdc);
875         spin_lock(&rbd_client_list_lock);
876         list_del(&rbdc->node);
877         spin_unlock(&rbd_client_list_lock);
878
879         ceph_destroy_client(rbdc->client);
880         kfree(rbdc);
881 }
882
883 /*
884  * Drop reference to ceph client node. If it's not referenced anymore, release
885  * it.
886  */
887 static void rbd_put_client(struct rbd_client *rbdc)
888 {
889         if (rbdc)
890                 kref_put(&rbdc->kref, rbd_client_release);
891 }
892
893 static int wait_for_latest_osdmap(struct ceph_client *client)
894 {
895         u64 newest_epoch;
896         int ret;
897
898         ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
899         if (ret)
900                 return ret;
901
902         if (client->osdc.osdmap->epoch >= newest_epoch)
903                 return 0;
904
905         ceph_osdc_maybe_request_map(&client->osdc);
906         return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
907                                      client->options->mount_timeout);
908 }
909
910 /*
911  * Get a ceph client with specific addr and configuration.  If one does
912  * not exist, create it.  Either way, ceph_opts is consumed by this
913  * function.
914  */
915 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
916 {
917         struct rbd_client *rbdc;
918         int ret;
919
920         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
921         rbdc = rbd_client_find(ceph_opts);
922         if (rbdc) {
923                 ceph_destroy_options(ceph_opts);
924
925                 /*
926                  * Using an existing client.  Make sure ->pg_pools is up to
927                  * date before we look up the pool id in do_rbd_add().
928                  */
929                 ret = wait_for_latest_osdmap(rbdc->client);
930                 if (ret) {
931                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
932                         rbd_put_client(rbdc);
933                         rbdc = ERR_PTR(ret);
934                 }
935         } else {
936                 rbdc = rbd_client_create(ceph_opts);
937         }
938         mutex_unlock(&client_mutex);
939
940         return rbdc;
941 }
942
943 static bool rbd_image_format_valid(u32 image_format)
944 {
945         return image_format == 1 || image_format == 2;
946 }
947
948 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
949 {
950         size_t size;
951         u32 snap_count;
952
953         /* The header has to start with the magic rbd header text */
954         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
955                 return false;
956
957         /* The bio layer requires at least sector-sized I/O */
958
959         if (ondisk->options.order < SECTOR_SHIFT)
960                 return false;
961
962         /* If we use u64 in a few spots we may be able to loosen this */
963
964         if (ondisk->options.order > 8 * sizeof (int) - 1)
965                 return false;
966
967         /*
968          * The size of a snapshot header has to fit in a size_t, and
969          * that limits the number of snapshots.
970          */
971         snap_count = le32_to_cpu(ondisk->snap_count);
972         size = SIZE_MAX - sizeof (struct ceph_snap_context);
973         if (snap_count > size / sizeof (__le64))
974                 return false;
975
976         /*
977          * Not only that, but the size of the entire snapshot
978          * header must also be representable in a size_t.
979          */
980         size -= snap_count * sizeof (__le64);
981         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
982                 return false;
983
984         return true;
985 }
986
987 /*
988  * returns the size of an object in the image
989  */
990 static u32 rbd_obj_bytes(struct rbd_image_header *header)
991 {
992         return 1U << header->obj_order;
993 }
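
/*
 * For example, with the common default obj_order of 22 this gives
 * 1U << 22 = 4194304 bytes, i.e. 4 MiB objects.
 */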
994
995 static void rbd_init_layout(struct rbd_device *rbd_dev)
996 {
997         if (rbd_dev->header.stripe_unit == 0 ||
998             rbd_dev->header.stripe_count == 0) {
999                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1000                 rbd_dev->header.stripe_count = 1;
1001         }
1002
1003         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1004         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1005         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1006         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1007                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1008         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1009 }
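
/*
 * With the fallback above (stripe_unit == object size, stripe_count == 1)
 * image offsets map linearly onto successive objects: with 4 MiB objects,
 * image offset 9 MiB falls in object number 2 (9 MiB / 4 MiB) at offset
 * 1 MiB within that object.
 */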
1010
1011 /*
1012  * Fill an rbd image header with information from the given format 1
1013  * on-disk header.
1014  */
1015 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1016                                  struct rbd_image_header_ondisk *ondisk)
1017 {
1018         struct rbd_image_header *header = &rbd_dev->header;
1019         bool first_time = header->object_prefix == NULL;
1020         struct ceph_snap_context *snapc;
1021         char *object_prefix = NULL;
1022         char *snap_names = NULL;
1023         u64 *snap_sizes = NULL;
1024         u32 snap_count;
1025         int ret = -ENOMEM;
1026         u32 i;
1027
1028         /* Allocate this now to avoid having to handle failure below */
1029
1030         if (first_time) {
1031                 object_prefix = kstrndup(ondisk->object_prefix,
1032                                          sizeof(ondisk->object_prefix),
1033                                          GFP_KERNEL);
1034                 if (!object_prefix)
1035                         return -ENOMEM;
1036         }
1037
1038         /* Allocate the snapshot context and fill it in */
1039
1040         snap_count = le32_to_cpu(ondisk->snap_count);
1041         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1042         if (!snapc)
1043                 goto out_err;
1044         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1045         if (snap_count) {
1046                 struct rbd_image_snap_ondisk *snaps;
1047                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1048
1049                 /* We'll keep a copy of the snapshot names... */
1050
1051                 if (snap_names_len > (u64)SIZE_MAX)
1052                         goto out_2big;
1053                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1054                 if (!snap_names)
1055                         goto out_err;
1056
1057                 /* ...as well as the array of their sizes. */
1058                 snap_sizes = kmalloc_array(snap_count,
1059                                            sizeof(*header->snap_sizes),
1060                                            GFP_KERNEL);
1061                 if (!snap_sizes)
1062                         goto out_err;
1063
1064                 /*
1065                  * Copy the names, and fill in each snapshot's id
1066                  * and size.
1067                  *
1068                  * Note that rbd_dev_v1_header_info() guarantees the
1069                  * ondisk buffer we're working with has
1070                  * snap_names_len bytes beyond the end of the
1071                  * snapshot id array, so this memcpy() is safe.
1072                  */
1073                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1074                 snaps = ondisk->snaps;
1075                 for (i = 0; i < snap_count; i++) {
1076                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1077                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1078                 }
1079         }
1080
1081         /* We won't fail any more, fill in the header */
1082
1083         if (first_time) {
1084                 header->object_prefix = object_prefix;
1085                 header->obj_order = ondisk->options.order;
1086                 rbd_init_layout(rbd_dev);
1087         } else {
1088                 ceph_put_snap_context(header->snapc);
1089                 kfree(header->snap_names);
1090                 kfree(header->snap_sizes);
1091         }
1092
1093         /* The remaining fields always get updated (when we refresh) */
1094
1095         header->image_size = le64_to_cpu(ondisk->image_size);
1096         header->snapc = snapc;
1097         header->snap_names = snap_names;
1098         header->snap_sizes = snap_sizes;
1099
1100         return 0;
1101 out_2big:
1102         ret = -EIO;
1103 out_err:
1104         kfree(snap_sizes);
1105         kfree(snap_names);
1106         ceph_put_snap_context(snapc);
1107         kfree(object_prefix);
1108
1109         return ret;
1110 }
1111
1112 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1113 {
1114         const char *snap_name;
1115
1116         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1117
1118         /* Skip over names until we find the one we are looking for */
1119
1120         snap_name = rbd_dev->header.snap_names;
1121         while (which--)
1122                 snap_name += strlen(snap_name) + 1;
1123
1124         return kstrdup(snap_name, GFP_KERNEL);
1125 }
1126
1127 /*
1128  * Snapshot id comparison function for use with qsort()/bsearch().
1129  * Note that result is for snapshots in *descending* order.
1130  */
1131 static int snapid_compare_reverse(const void *s1, const void *s2)
1132 {
1133         u64 snap_id1 = *(u64 *)s1;
1134         u64 snap_id2 = *(u64 *)s2;
1135
1136         if (snap_id1 < snap_id2)
1137                 return 1;
1138         return snap_id1 == snap_id2 ? 0 : -1;
1139 }
1140
1141 /*
1142  * Search a snapshot context to see if the given snapshot id is
1143  * present.
1144  *
1145  * Returns the position of the snapshot id in the array if it's found,
1146  * or BAD_SNAP_INDEX otherwise.
1147  *
1148  * Note: The snapshot array is kept sorted (by the osd) in
1149  * reverse order, highest snapshot id first.
1150  */
1151 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1152 {
1153         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1154         u64 *found;
1155
1156         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1157                                 sizeof (snap_id), snapid_compare_reverse);
1158
1159         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1160 }
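
/*
 * Worked example: with snapc->snaps = { 12, 7, 3 } (newest id first, as
 * snapid_compare_reverse() expects), looking up snap_id 7 returns index 1,
 * while snap_id 5 is not present and yields BAD_SNAP_INDEX.
 */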
1161
1162 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1163                                         u64 snap_id)
1164 {
1165         u32 which;
1166         const char *snap_name;
1167
1168         which = rbd_dev_snap_index(rbd_dev, snap_id);
1169         if (which == BAD_SNAP_INDEX)
1170                 return ERR_PTR(-ENOENT);
1171
1172         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1173         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1174 }
1175
1176 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1177 {
1178         if (snap_id == CEPH_NOSNAP)
1179                 return RBD_SNAP_HEAD_NAME;
1180
1181         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1182         if (rbd_dev->image_format == 1)
1183                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1184
1185         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1186 }
1187
1188 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1189                                 u64 *snap_size)
1190 {
1191         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1192         if (snap_id == CEPH_NOSNAP) {
1193                 *snap_size = rbd_dev->header.image_size;
1194         } else if (rbd_dev->image_format == 1) {
1195                 u32 which;
1196
1197                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1198                 if (which == BAD_SNAP_INDEX)
1199                         return -ENOENT;
1200
1201                 *snap_size = rbd_dev->header.snap_sizes[which];
1202         } else {
1203                 u64 size = 0;
1204                 int ret;
1205
1206                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1207                 if (ret)
1208                         return ret;
1209
1210                 *snap_size = size;
1211         }
1212         return 0;
1213 }
1214
1215 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1216                         u64 *snap_features)
1217 {
1218         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1219         if (snap_id == CEPH_NOSNAP) {
1220                 *snap_features = rbd_dev->header.features;
1221         } else if (rbd_dev->image_format == 1) {
1222                 *snap_features = 0;     /* No features for format 1 */
1223         } else {
1224                 u64 features = 0;
1225                 int ret;
1226
1227                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1228                 if (ret)
1229                         return ret;
1230
1231                 *snap_features = features;
1232         }
1233         return 0;
1234 }
1235
1236 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1237 {
1238         u64 snap_id = rbd_dev->spec->snap_id;
1239         u64 size = 0;
1240         u64 features = 0;
1241         int ret;
1242
1243         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1244         if (ret)
1245                 return ret;
1246         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1247         if (ret)
1248                 return ret;
1249
1250         rbd_dev->mapping.size = size;
1251         rbd_dev->mapping.features = features;
1252
1253         return 0;
1254 }
1255
1256 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1257 {
1258         rbd_dev->mapping.size = 0;
1259         rbd_dev->mapping.features = 0;
1260 }
1261
1262 static void zero_bvec(struct bio_vec *bv)
1263 {
1264         void *buf;
1265         unsigned long flags;
1266
1267         buf = bvec_kmap_irq(bv, &flags);
1268         memset(buf, 0, bv->bv_len);
1269         flush_dcache_page(bv->bv_page);
1270         bvec_kunmap_irq(buf, &flags);
1271 }
1272
1273 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1274 {
1275         struct ceph_bio_iter it = *bio_pos;
1276
1277         ceph_bio_iter_advance(&it, off);
1278         ceph_bio_iter_advance_step(&it, bytes, ({
1279                 zero_bvec(&bv);
1280         }));
1281 }
1282
1283 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1284 {
1285         struct ceph_bvec_iter it = *bvec_pos;
1286
1287         ceph_bvec_iter_advance(&it, off);
1288         ceph_bvec_iter_advance_step(&it, bytes, ({
1289                 zero_bvec(&bv);
1290         }));
1291 }
1292
1293 /*
1294  * Zero a range in @obj_req data buffer defined by a bio (list) or
1295  * (private) bio_vec array.
1296  *
1297  * @off is relative to the start of the data buffer.
1298  */
1299 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1300                                u32 bytes)
1301 {
1302         switch (obj_req->img_request->data_type) {
1303         case OBJ_REQUEST_BIO:
1304                 zero_bios(&obj_req->bio_pos, off, bytes);
1305                 break;
1306         case OBJ_REQUEST_BVECS:
1307         case OBJ_REQUEST_OWN_BVECS:
1308                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1309                 break;
1310         default:
1311                 rbd_assert(0);
1312         }
1313 }
1314
1315 static void rbd_obj_request_destroy(struct kref *kref);
1316 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1317 {
1318         rbd_assert(obj_request != NULL);
1319         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1320                 kref_read(&obj_request->kref));
1321         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1322 }
1323
1324 static void rbd_img_request_get(struct rbd_img_request *img_request)
1325 {
1326         dout("%s: img %p (was %d)\n", __func__, img_request,
1327              kref_read(&img_request->kref));
1328         kref_get(&img_request->kref);
1329 }
1330
1331 static void rbd_img_request_destroy(struct kref *kref);
1332 static void rbd_img_request_put(struct rbd_img_request *img_request)
1333 {
1334         rbd_assert(img_request != NULL);
1335         dout("%s: img %p (was %d)\n", __func__, img_request,
1336                 kref_read(&img_request->kref));
1337         kref_put(&img_request->kref, rbd_img_request_destroy);
1338 }
1339
1340 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1341                                         struct rbd_obj_request *obj_request)
1342 {
1343         rbd_assert(obj_request->img_request == NULL);
1344
1345         /* Image request now owns object's original reference */
1346         obj_request->img_request = img_request;
1347         img_request->obj_request_count++;
1348         img_request->pending_count++;
1349         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1350 }
1351
1352 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1353                                         struct rbd_obj_request *obj_request)
1354 {
1355         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1356         list_del(&obj_request->ex.oe_item);
1357         rbd_assert(img_request->obj_request_count > 0);
1358         img_request->obj_request_count--;
1359         rbd_assert(obj_request->img_request == img_request);
1360         rbd_obj_request_put(obj_request);
1361 }
1362
1363 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1364 {
1365         struct ceph_osd_request *osd_req = obj_request->osd_req;
1366
1367         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1368              obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1369              obj_request->ex.oe_len, osd_req);
1370         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1371 }
1372
1373 /*
1374  * The default/initial value for all image request flags is 0.  Each
1375  * is conditionally set to 1 at image request initialization time
1376  * and currently never changes thereafter.
1377  */
1378 static void img_request_layered_set(struct rbd_img_request *img_request)
1379 {
1380         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1381         smp_mb();
1382 }
1383
1384 static void img_request_layered_clear(struct rbd_img_request *img_request)
1385 {
1386         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1387         smp_mb();
1388 }
1389
1390 static bool img_request_layered_test(struct rbd_img_request *img_request)
1391 {
1392         smp_mb();
1393         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1394 }
1395
1396 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1397 {
1398         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1399
1400         return !obj_req->ex.oe_off &&
1401                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1402 }
1403
1404 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1405 {
1406         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1407
1408         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1409                                         rbd_dev->layout.object_size;
1410 }
1411
1412 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1413 {
1414         return ceph_file_extents_bytes(obj_req->img_extents,
1415                                        obj_req->num_img_extents);
1416 }
1417
1418 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1419 {
1420         switch (img_req->op_type) {
1421         case OBJ_OP_READ:
1422                 return false;
1423         case OBJ_OP_WRITE:
1424         case OBJ_OP_DISCARD:
1425                 return true;
1426         default:
1427                 BUG();
1428         }
1429 }
1430
1431 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1432
1433 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1434 {
1435         struct rbd_obj_request *obj_req = osd_req->r_priv;
1436
1437         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1438              osd_req->r_result, obj_req);
1439         rbd_assert(osd_req == obj_req->osd_req);
1440
1441         obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1442         if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1443                 obj_req->xferred = osd_req->r_result;
1444         else
1445                 /*
1446                  * Writes aren't allowed to return a data payload.  In some
1447                  * guarded write cases (e.g. stat + zero on an empty object)
1448                  * a stat response makes it through, but we don't care.
1449                  */
1450                 obj_req->xferred = 0;
1451
1452         rbd_obj_handle_request(obj_req);
1453 }
1454
1455 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1456 {
1457         struct ceph_osd_request *osd_req = obj_request->osd_req;
1458
1459         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1460         osd_req->r_snapid = obj_request->img_request->snap_id;
1461 }
1462
1463 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1464 {
1465         struct ceph_osd_request *osd_req = obj_request->osd_req;
1466
1467         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1468         ktime_get_real_ts64(&osd_req->r_mtime);
1469         osd_req->r_data_offset = obj_request->ex.oe_off;
1470 }
1471
1472 static struct ceph_osd_request *
1473 rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
1474 {
1475         struct rbd_img_request *img_req = obj_req->img_request;
1476         struct rbd_device *rbd_dev = img_req->rbd_dev;
1477         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1478         struct ceph_osd_request *req;
1479         const char *name_format = rbd_dev->image_format == 1 ?
1480                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1481
1482         req = ceph_osdc_alloc_request(osdc,
1483                         (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
1484                         num_ops, false, GFP_NOIO);
1485         if (!req)
1486                 return NULL;
1487
1488         req->r_callback = rbd_osd_req_callback;
1489         req->r_priv = obj_req;
1490
1491         /*
1492          * Data objects may be stored in a separate pool, but they always
1493          * use the same namespace there as the header object does in its pool.
1494          */
1495         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1496         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1497
1498         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1499                         rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
1500                 goto err_req;
1501
1502         return req;
1503
1504 err_req:
1505         ceph_osdc_put_request(req);
1506         return NULL;
1507 }
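
/*
 * The object names built above follow the usual rbd conventions (the image
 * id here is made up): a format 2 image whose object_prefix is
 * "rbd_data.102a74b0dc51" stores object number 5 as
 * "rbd_data.102a74b0dc51.0000000000000005", while format 1 images use a
 * 12-digit hex suffix after their "rb.0.*" prefix.
 */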
1508
1509 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1510 {
1511         ceph_osdc_put_request(osd_req);
1512 }
1513
1514 static struct rbd_obj_request *rbd_obj_request_create(void)
1515 {
1516         struct rbd_obj_request *obj_request;
1517
1518         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1519         if (!obj_request)
1520                 return NULL;
1521
1522         ceph_object_extent_init(&obj_request->ex);
1523         kref_init(&obj_request->kref);
1524
1525         dout("%s %p\n", __func__, obj_request);
1526         return obj_request;
1527 }
1528
1529 static void rbd_obj_request_destroy(struct kref *kref)
1530 {
1531         struct rbd_obj_request *obj_request;
1532         u32 i;
1533
1534         obj_request = container_of(kref, struct rbd_obj_request, kref);
1535
1536         dout("%s: obj %p\n", __func__, obj_request);
1537
1538         if (obj_request->osd_req)
1539                 rbd_osd_req_destroy(obj_request->osd_req);
1540
1541         switch (obj_request->img_request->data_type) {
1542         case OBJ_REQUEST_NODATA:
1543         case OBJ_REQUEST_BIO:
1544         case OBJ_REQUEST_BVECS:
1545                 break;          /* Nothing to do */
1546         case OBJ_REQUEST_OWN_BVECS:
1547                 kfree(obj_request->bvec_pos.bvecs);
1548                 break;
1549         default:
1550                 rbd_assert(0);
1551         }
1552
1553         kfree(obj_request->img_extents);
1554         if (obj_request->copyup_bvecs) {
1555                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1556                         if (obj_request->copyup_bvecs[i].bv_page)
1557                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1558                 }
1559                 kfree(obj_request->copyup_bvecs);
1560         }
1561
1562         kmem_cache_free(rbd_obj_request_cache, obj_request);
1563 }
1564
1565 /* It's OK to call this for a device with no parent */
1566
1567 static void rbd_spec_put(struct rbd_spec *spec);
1568 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1569 {
1570         rbd_dev_remove_parent(rbd_dev);
1571         rbd_spec_put(rbd_dev->parent_spec);
1572         rbd_dev->parent_spec = NULL;
1573         rbd_dev->parent_overlap = 0;
1574 }
1575
1576 /*
1577  * Parent image reference counting is used to determine when an
1578  * image's parent fields can be safely torn down--after there are no
1579  * more in-flight requests to the parent image.  When the last
1580  * reference is dropped, cleaning them up is safe.
1581  */
1582 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1583 {
1584         int counter;
1585
1586         if (!rbd_dev->parent_spec)
1587                 return;
1588
1589         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1590         if (counter > 0)
1591                 return;
1592
1593         /* Last reference; clean up parent data structures */
1594
1595         if (!counter)
1596                 rbd_dev_unparent(rbd_dev);
1597         else
1598                 rbd_warn(rbd_dev, "parent reference underflow");
1599 }
1600
1601 /*
1602  * If an image has a non-zero parent overlap, get a reference to its
1603  * parent.
1604  *
1605  * Returns true if the rbd device has a parent with a non-zero
1606  * overlap and a reference for it was successfully taken, or
1607  * false otherwise.
1608  */
1609 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1610 {
1611         int counter = 0;
1612
1613         if (!rbd_dev->parent_spec)
1614                 return false;
1615
1616         down_read(&rbd_dev->header_rwsem);
1617         if (rbd_dev->parent_overlap)
1618                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1619         up_read(&rbd_dev->header_rwsem);
1620
1621         if (counter < 0)
1622                 rbd_warn(rbd_dev, "parent reference overflow");
1623
1624         return counter > 0;
1625 }
1626
1627 /*
1628  * Caller is responsible for filling in the list of object requests
1629  * that comprises the image request, and the Linux request pointer
1630  * (if there is one).
1631  */
1632 static struct rbd_img_request *rbd_img_request_create(
1633                                         struct rbd_device *rbd_dev,
1634                                         enum obj_operation_type op_type,
1635                                         struct ceph_snap_context *snapc)
1636 {
1637         struct rbd_img_request *img_request;
1638
1639         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1640         if (!img_request)
1641                 return NULL;
1642
1643         img_request->rbd_dev = rbd_dev;
1644         img_request->op_type = op_type;
1645         if (!rbd_img_is_write(img_request))
1646                 img_request->snap_id = rbd_dev->spec->snap_id;
1647         else
1648                 img_request->snapc = snapc;
1649
1650         if (rbd_dev_parent_get(rbd_dev))
1651                 img_request_layered_set(img_request);
1652
1653         spin_lock_init(&img_request->completion_lock);
1654         INIT_LIST_HEAD(&img_request->object_extents);
1655         kref_init(&img_request->kref);
1656
1657         dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1658              obj_op_name(op_type), img_request);
1659         return img_request;
1660 }
1661
1662 static void rbd_img_request_destroy(struct kref *kref)
1663 {
1664         struct rbd_img_request *img_request;
1665         struct rbd_obj_request *obj_request;
1666         struct rbd_obj_request *next_obj_request;
1667
1668         img_request = container_of(kref, struct rbd_img_request, kref);
1669
1670         dout("%s: img %p\n", __func__, img_request);
1671
1672         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1673                 rbd_img_obj_request_del(img_request, obj_request);
1674         rbd_assert(img_request->obj_request_count == 0);
1675
1676         if (img_request_layered_test(img_request)) {
1677                 img_request_layered_clear(img_request);
1678                 rbd_dev_parent_put(img_request->rbd_dev);
1679         }
1680
1681         if (rbd_img_is_write(img_request))
1682                 ceph_put_snap_context(img_request->snapc);
1683
1684         kmem_cache_free(rbd_img_request_cache, img_request);
1685 }
1686
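/*
 * Clip the image extent array to the parent overlap: extents that lie
 * entirely beyond the overlap are dropped and a final extent that
 * straddles the overlap point is trimmed.
 */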
1687 static void prune_extents(struct ceph_file_extent *img_extents,
1688                           u32 *num_img_extents, u64 overlap)
1689 {
1690         u32 cnt = *num_img_extents;
1691
1692         /* drop extents completely beyond the overlap */
1693         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
1694                 cnt--;
1695
1696         if (cnt) {
1697                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
1698
1699                 /* trim final overlapping extent */
1700                 if (ex->fe_off + ex->fe_len > overlap)
1701                         ex->fe_len = overlap - ex->fe_off;
1702         }
1703
1704         *num_img_extents = cnt;
1705 }
1706
1707 /*
1708  * Determine the byte range(s) covered by either just the object extent
1709  * or the entire object in the parent image.
1710  */
1711 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
1712                                     bool entire)
1713 {
1714         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1715         int ret;
1716
1717         if (!rbd_dev->parent_overlap)
1718                 return 0;
1719
1720         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
1721                                   entire ? 0 : obj_req->ex.oe_off,
1722                                   entire ? rbd_dev->layout.object_size :
1723                                                         obj_req->ex.oe_len,
1724                                   &obj_req->img_extents,
1725                                   &obj_req->num_img_extents);
1726         if (ret)
1727                 return ret;
1728
1729         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
1730                       rbd_dev->parent_overlap);
1731         return 0;
1732 }
1733
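/*
 * Attach the object request's data buffer (bio position or bio_vec
 * array) to OSD op @which of obj_req->osd_req.
 */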
1734 static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1735 {
1736         switch (obj_req->img_request->data_type) {
1737         case OBJ_REQUEST_BIO:
1738                 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1739                                                &obj_req->bio_pos,
1740                                                obj_req->ex.oe_len);
1741                 break;
1742         case OBJ_REQUEST_BVECS:
1743         case OBJ_REQUEST_OWN_BVECS:
1744                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
1745                                                         obj_req->ex.oe_len);
1746                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
1747                 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1748                                                     &obj_req->bvec_pos);
1749                 break;
1750         default:
1751                 rbd_assert(0);
1752         }
1753 }
1754
1755 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1756 {
1757         obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1758         if (!obj_req->osd_req)
1759                 return -ENOMEM;
1760
1761         osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1762                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1763         rbd_osd_req_setup_data(obj_req, 0);
1764
1765         rbd_osd_req_format_read(obj_req);
1766         return 0;
1767 }
1768
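/*
 * The STAT op serves as an existence check for guarded writes and
 * discards: if the target object doesn't exist the OSD request fails
 * with -ENOENT, and rbd_obj_handle_write() responds by reading the
 * backing data from the parent and issuing a copyup.
 */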
1769 static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1770                                 unsigned int which)
1771 {
1772         struct page **pages;
1773
1774         /*
1775          * The response data for a STAT call consists of:
1776          *     le64 length;
1777          *     struct {
1778          *         le32 tv_sec;
1779          *         le32 tv_nsec;
1780          *     } mtime;
1781          */
1782         pages = ceph_alloc_page_vector(1, GFP_NOIO);
1783         if (IS_ERR(pages))
1784                 return PTR_ERR(pages);
1785
1786         osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1787         osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1788                                      8 + sizeof(struct ceph_timespec),
1789                                      0, false, true);
1790         return 0;
1791 }
1792
1793 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1794                                   unsigned int which)
1795 {
1796         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1797         u16 opcode;
1798
1799         osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1800                                    rbd_dev->layout.object_size,
1801                                    rbd_dev->layout.object_size);
1802
1803         if (rbd_obj_is_entire(obj_req))
1804                 opcode = CEPH_OSD_OP_WRITEFULL;
1805         else
1806                 opcode = CEPH_OSD_OP_WRITE;
1807
1808         osd_req_op_extent_init(obj_req->osd_req, which, opcode,
1809                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1810         rbd_osd_req_setup_data(obj_req, which++);
1811
1812         rbd_assert(which == obj_req->osd_req->r_num_ops);
1813         rbd_osd_req_format_write(obj_req);
1814 }
1815
1816 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1817 {
1818         unsigned int num_osd_ops, which = 0;
1819         int ret;
1820
1821         /* reverse map the entire object onto the parent */
1822         ret = rbd_obj_calc_img_extents(obj_req, true);
1823         if (ret)
1824                 return ret;
1825
1826         if (obj_req->num_img_extents) {
1827                 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1828                 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1829         } else {
1830                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1831                 num_osd_ops = 2; /* setallochint + write/writefull */
1832         }
1833
1834         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1835         if (!obj_req->osd_req)
1836                 return -ENOMEM;
1837
1838         if (obj_req->num_img_extents) {
1839                 ret = __rbd_obj_setup_stat(obj_req, which++);
1840                 if (ret)
1841                         return ret;
1842         }
1843
1844         __rbd_obj_setup_write(obj_req, which);
1845         return 0;
1846 }
1847
1848 static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1849                                     unsigned int which)
1850 {
1851         u16 opcode;
1852
1853         if (rbd_obj_is_entire(obj_req)) {
1854                 if (obj_req->num_img_extents) {
1855                         osd_req_op_init(obj_req->osd_req, which++,
1856                                         CEPH_OSD_OP_CREATE, 0);
1857                         opcode = CEPH_OSD_OP_TRUNCATE;
1858                 } else {
1859                         osd_req_op_init(obj_req->osd_req, which++,
1860                                         CEPH_OSD_OP_DELETE, 0);
1861                         opcode = 0;
1862                 }
1863         } else if (rbd_obj_is_tail(obj_req)) {
1864                 opcode = CEPH_OSD_OP_TRUNCATE;
1865         } else {
1866                 opcode = CEPH_OSD_OP_ZERO;
1867         }
1868
1869         if (opcode)
1870                 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
1871                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
1872                                        0, 0);
1873
1874         rbd_assert(which == obj_req->osd_req->r_num_ops);
1875         rbd_osd_req_format_write(obj_req);
1876 }
1877
1878 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
1879 {
1880         unsigned int num_osd_ops, which = 0;
1881         int ret;
1882
1883         /* reverse map the entire object onto the parent */
1884         ret = rbd_obj_calc_img_extents(obj_req, true);
1885         if (ret)
1886                 return ret;
1887
1888         if (rbd_obj_is_entire(obj_req)) {
1889                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1890                 if (obj_req->num_img_extents)
1891                         num_osd_ops = 2; /* create + truncate */
1892                 else
1893                         num_osd_ops = 1; /* delete */
1894         } else {
1895                 if (obj_req->num_img_extents) {
1896                         obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1897                         num_osd_ops = 2; /* stat + truncate/zero */
1898                 } else {
1899                         obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1900                         num_osd_ops = 1; /* truncate/zero */
1901                 }
1902         }
1903
1904         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1905         if (!obj_req->osd_req)
1906                 return -ENOMEM;
1907
1908         if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1909                 ret = __rbd_obj_setup_stat(obj_req, which++);
1910                 if (ret)
1911                         return ret;
1912         }
1913
1914         __rbd_obj_setup_discard(obj_req, which);
1915         return 0;
1916 }
1917
1918 /*
1919  * For each object request in @img_req, allocate an OSD request, add
1920  * individual OSD ops and prepare them for submission.  The number of
1921  * OSD ops depends on op_type and the overlap point (if any).
1922  */
1923 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1924 {
1925         struct rbd_obj_request *obj_req;
1926         int ret;
1927
1928         for_each_obj_request(img_req, obj_req) {
1929                 switch (img_req->op_type) {
1930                 case OBJ_OP_READ:
1931                         ret = rbd_obj_setup_read(obj_req);
1932                         break;
1933                 case OBJ_OP_WRITE:
1934                         ret = rbd_obj_setup_write(obj_req);
1935                         break;
1936                 case OBJ_OP_DISCARD:
1937                         ret = rbd_obj_setup_discard(obj_req);
1938                         break;
1939                 default:
1940                         rbd_assert(0);
1941                 }
1942                 if (ret)
1943                         return ret;
1944
1945                 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
1946                 if (ret)
1947                         return ret;
1948         }
1949
1950         return 0;
1951 }
1952
1953 union rbd_img_fill_iter {
1954         struct ceph_bio_iter    bio_iter;
1955         struct ceph_bvec_iter   bvec_iter;
1956 };
1957
1958 struct rbd_img_fill_ctx {
1959         enum obj_request_type   pos_type;
1960         union rbd_img_fill_iter *pos;
1961         union rbd_img_fill_iter iter;
1962         ceph_object_extent_fn_t set_pos_fn;
1963         ceph_object_extent_fn_t count_fn;
1964         ceph_object_extent_fn_t copy_fn;
1965 };
1966
1967 static struct ceph_object_extent *alloc_object_extent(void *arg)
1968 {
1969         struct rbd_img_request *img_req = arg;
1970         struct rbd_obj_request *obj_req;
1971
1972         obj_req = rbd_obj_request_create();
1973         if (!obj_req)
1974                 return NULL;
1975
1976         rbd_img_obj_request_add(img_req, obj_req);
1977         return &obj_req->ex;
1978 }
1979
1980 /*
1981  * While su != os && sc == 1 (stripe_unit != object_size, stripe count
1982  * of 1) is technically not fancy (it's the same layout as su == os &&
1983  * sc == 1), we can't use the nocopy path for it: ->set_pos_fn() should
1984  * be called only once per object, but ceph_file_to_extents() invokes
1985  * action_fn once per stripe unit, so treat su != os && sc == 1 as fancy.
1986  */
1987 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1988 {
1989         return l->stripe_unit != l->object_size;
1990 }
1991
1992 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
1993                                        struct ceph_file_extent *img_extents,
1994                                        u32 num_img_extents,
1995                                        struct rbd_img_fill_ctx *fctx)
1996 {
1997         u32 i;
1998         int ret;
1999
2000         img_req->data_type = fctx->pos_type;
2001
2002         /*
2003          * Create object requests and set each object request's starting
2004          * position in the provided bio (list) or bio_vec array.
2005          */
2006         fctx->iter = *fctx->pos;
2007         for (i = 0; i < num_img_extents; i++) {
2008                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2009                                            img_extents[i].fe_off,
2010                                            img_extents[i].fe_len,
2011                                            &img_req->object_extents,
2012                                            alloc_object_extent, img_req,
2013                                            fctx->set_pos_fn, &fctx->iter);
2014                 if (ret)
2015                         return ret;
2016         }
2017
2018         return __rbd_img_fill_request(img_req);
2019 }
2020
2021 /*
2022  * Map a list of image extents to a list of object extents, create the
2023  * corresponding object requests (normally each to a different object,
2024  * but not always) and add them to @img_req.  For each object request,
2025  * set up its data descriptor to point to the corresponding chunk(s) of
2026  * @fctx->pos data buffer.
2027  *
2028  * Because ceph_file_to_extents() will merge adjacent object extents
2029  * together, each object request's data descriptor may point to multiple
2030  * different chunks of @fctx->pos data buffer.
2031  *
2032  * @fctx->pos data buffer is assumed to be large enough.
2033  */
2034 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2035                                 struct ceph_file_extent *img_extents,
2036                                 u32 num_img_extents,
2037                                 struct rbd_img_fill_ctx *fctx)
2038 {
2039         struct rbd_device *rbd_dev = img_req->rbd_dev;
2040         struct rbd_obj_request *obj_req;
2041         u32 i;
2042         int ret;
2043
2044         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2045             !rbd_layout_is_fancy(&rbd_dev->layout))
2046                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2047                                                    num_img_extents, fctx);
2048
2049         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2050
2051         /*
2052          * Create object requests and determine ->bvec_count for each object
2053          * request.  Note that the sum of ->bvec_count over all object requests
2054          * may be greater than the number of bio_vecs in the provided bio (list)
2055          * or bio_vec array because, when mapped, those bio_vecs can straddle
2056          * stripe unit boundaries.
2057          */
2058         fctx->iter = *fctx->pos;
2059         for (i = 0; i < num_img_extents; i++) {
2060                 ret = ceph_file_to_extents(&rbd_dev->layout,
2061                                            img_extents[i].fe_off,
2062                                            img_extents[i].fe_len,
2063                                            &img_req->object_extents,
2064                                            alloc_object_extent, img_req,
2065                                            fctx->count_fn, &fctx->iter);
2066                 if (ret)
2067                         return ret;
2068         }
2069
2070         for_each_obj_request(img_req, obj_req) {
2071                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2072                                               sizeof(*obj_req->bvec_pos.bvecs),
2073                                               GFP_NOIO);
2074                 if (!obj_req->bvec_pos.bvecs)
2075                         return -ENOMEM;
2076         }
2077
2078         /*
2079          * Fill in each object request's private bio_vec array, splitting and
2080          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2081          */
2082         fctx->iter = *fctx->pos;
2083         for (i = 0; i < num_img_extents; i++) {
2084                 ret = ceph_iterate_extents(&rbd_dev->layout,
2085                                            img_extents[i].fe_off,
2086                                            img_extents[i].fe_len,
2087                                            &img_req->object_extents,
2088                                            fctx->copy_fn, &fctx->iter);
2089                 if (ret)
2090                         return ret;
2091         }
2092
2093         return __rbd_img_fill_request(img_req);
2094 }
2095
2096 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2097                                u64 off, u64 len)
2098 {
2099         struct ceph_file_extent ex = { off, len };
2100         union rbd_img_fill_iter dummy;
2101         struct rbd_img_fill_ctx fctx = {
2102                 .pos_type = OBJ_REQUEST_NODATA,
2103                 .pos = &dummy,
2104         };
2105
2106         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2107 }
2108
2109 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2110 {
2111         struct rbd_obj_request *obj_req =
2112             container_of(ex, struct rbd_obj_request, ex);
2113         struct ceph_bio_iter *it = arg;
2114
2115         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2116         obj_req->bio_pos = *it;
2117         ceph_bio_iter_advance(it, bytes);
2118 }
2119
2120 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2121 {
2122         struct rbd_obj_request *obj_req =
2123             container_of(ex, struct rbd_obj_request, ex);
2124         struct ceph_bio_iter *it = arg;
2125
2126         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2127         ceph_bio_iter_advance_step(it, bytes, ({
2128                 obj_req->bvec_count++;
2129         }));
2131 }
2132
2133 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2134 {
2135         struct rbd_obj_request *obj_req =
2136             container_of(ex, struct rbd_obj_request, ex);
2137         struct ceph_bio_iter *it = arg;
2138
2139         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2140         ceph_bio_iter_advance_step(it, bytes, ({
2141                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2142                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2143         }));
2144 }
2145
2146 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2147                                    struct ceph_file_extent *img_extents,
2148                                    u32 num_img_extents,
2149                                    struct ceph_bio_iter *bio_pos)
2150 {
2151         struct rbd_img_fill_ctx fctx = {
2152                 .pos_type = OBJ_REQUEST_BIO,
2153                 .pos = (union rbd_img_fill_iter *)bio_pos,
2154                 .set_pos_fn = set_bio_pos,
2155                 .count_fn = count_bio_bvecs,
2156                 .copy_fn = copy_bio_bvecs,
2157         };
2158
2159         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2160                                     &fctx);
2161 }
2162
2163 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2164                                  u64 off, u64 len, struct bio *bio)
2165 {
2166         struct ceph_file_extent ex = { off, len };
2167         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2168
2169         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2170 }
2171
2172 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2173 {
2174         struct rbd_obj_request *obj_req =
2175             container_of(ex, struct rbd_obj_request, ex);
2176         struct ceph_bvec_iter *it = arg;
2177
2178         obj_req->bvec_pos = *it;
2179         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2180         ceph_bvec_iter_advance(it, bytes);
2181 }
2182
2183 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2184 {
2185         struct rbd_obj_request *obj_req =
2186             container_of(ex, struct rbd_obj_request, ex);
2187         struct ceph_bvec_iter *it = arg;
2188
2189         ceph_bvec_iter_advance_step(it, bytes, ({
2190                 obj_req->bvec_count++;
2191         }));
2192 }
2193
2194 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2195 {
2196         struct rbd_obj_request *obj_req =
2197             container_of(ex, struct rbd_obj_request, ex);
2198         struct ceph_bvec_iter *it = arg;
2199
2200         ceph_bvec_iter_advance_step(it, bytes, ({
2201                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2202                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2203         }));
2204 }
2205
2206 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2207                                      struct ceph_file_extent *img_extents,
2208                                      u32 num_img_extents,
2209                                      struct ceph_bvec_iter *bvec_pos)
2210 {
2211         struct rbd_img_fill_ctx fctx = {
2212                 .pos_type = OBJ_REQUEST_BVECS,
2213                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2214                 .set_pos_fn = set_bvec_pos,
2215                 .count_fn = count_bvecs,
2216                 .copy_fn = copy_bvecs,
2217         };
2218
2219         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2220                                     &fctx);
2221 }
2222
2223 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2224                                    struct ceph_file_extent *img_extents,
2225                                    u32 num_img_extents,
2226                                    struct bio_vec *bvecs)
2227 {
2228         struct ceph_bvec_iter it = {
2229                 .bvecs = bvecs,
2230                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2231                                                              num_img_extents) },
2232         };
2233
2234         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2235                                          &it);
2236 }
2237
2238 static void rbd_img_request_submit(struct rbd_img_request *img_request)
2239 {
2240         struct rbd_obj_request *obj_request;
2241
2242         dout("%s: img %p\n", __func__, img_request);
2243
2244         rbd_img_request_get(img_request);
2245         for_each_obj_request(img_request, obj_request)
2246                 rbd_obj_request_submit(obj_request);
2247
2248         rbd_img_request_put(img_request);
2249 }
2250
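/*
 * Read obj_req's image extents from the parent image by creating a
 * child OBJ_OP_READ image request.  For reads, the data lands directly
 * in the original bio/bvecs; for writes (copyup), it is read into
 * obj_req->copyup_bvecs.
 */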
2251 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2252 {
2253         struct rbd_img_request *img_req = obj_req->img_request;
2254         struct rbd_img_request *child_img_req;
2255         int ret;
2256
2257         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2258                                                OBJ_OP_READ, NULL);
2259         if (!child_img_req)
2260                 return -ENOMEM;
2261
2262         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2263         child_img_req->obj_request = obj_req;
2264
2265         if (!rbd_img_is_write(img_req)) {
2266                 switch (img_req->data_type) {
2267                 case OBJ_REQUEST_BIO:
2268                         ret = __rbd_img_fill_from_bio(child_img_req,
2269                                                       obj_req->img_extents,
2270                                                       obj_req->num_img_extents,
2271                                                       &obj_req->bio_pos);
2272                         break;
2273                 case OBJ_REQUEST_BVECS:
2274                 case OBJ_REQUEST_OWN_BVECS:
2275                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2276                                                       obj_req->img_extents,
2277                                                       obj_req->num_img_extents,
2278                                                       &obj_req->bvec_pos);
2279                         break;
2280                 default:
2281                         rbd_assert(0);
2282                 }
2283         } else {
2284                 ret = rbd_img_fill_from_bvecs(child_img_req,
2285                                               obj_req->img_extents,
2286                                               obj_req->num_img_extents,
2287                                               obj_req->copyup_bvecs);
2288         }
2289         if (ret) {
2290                 rbd_img_request_put(child_img_req);
2291                 return ret;
2292         }
2293
2294         rbd_img_request_submit(child_img_req);
2295         return 0;
2296 }
2297
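/*
 * Returns true if obj_req is completed.  On -ENOENT with a parent
 * overlap, the object extent is reverse mapped onto the parent and
 * re-read from there; otherwise -ENOENT and short reads are zero-filled
 * and the request is marked fully satisfied.
 */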
2298 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2299 {
2300         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2301         int ret;
2302
2303         if (obj_req->result == -ENOENT &&
2304             rbd_dev->parent_overlap && !obj_req->tried_parent) {
2305                 /* reverse map this object extent onto the parent */
2306                 ret = rbd_obj_calc_img_extents(obj_req, false);
2307                 if (ret) {
2308                         obj_req->result = ret;
2309                         return true;
2310                 }
2311
2312                 if (obj_req->num_img_extents) {
2313                         obj_req->tried_parent = true;
2314                         ret = rbd_obj_read_from_parent(obj_req);
2315                         if (ret) {
2316                                 obj_req->result = ret;
2317                                 return true;
2318                         }
2319                         return false;
2320                 }
2321         }
2322
2323         /*
2324          * -ENOENT means a hole in the image -- zero-fill the entire
2325          * length of the request.  A short read also implies zero-fill
2326          * to the end of the request.  In both cases we update xferred
2327          * count to indicate the whole request was satisfied.
2328          */
2329         if (obj_req->result == -ENOENT ||
2330             (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2331                 rbd_assert(!obj_req->xferred || !obj_req->result);
2332                 rbd_obj_zero_range(obj_req, obj_req->xferred,
2333                                    obj_req->ex.oe_len - obj_req->xferred);
2334                 obj_req->result = 0;
2335                 obj_req->xferred = obj_req->ex.oe_len;
2336         }
2337
2338         return true;
2339 }
2340
2341 /*
2342  * copyup_bvecs pages are never highmem pages
2343  */
2344 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2345 {
2346         struct ceph_bvec_iter it = {
2347                 .bvecs = bvecs,
2348                 .iter = { .bi_size = bytes },
2349         };
2350
2351         ceph_bvec_iter_advance_step(&it, bytes, ({
2352                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2353                                bv.bv_len))
2354                         return false;
2355         }));
2356         return true;
2357 }
2358
2359 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2360 {
2361         unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2362         int ret;
2363
2364         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2365         rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2366         rbd_osd_req_destroy(obj_req->osd_req);
2367
2368         /*
2369          * Create a copyup request with the same number of OSD ops as
2370          * the original request.  The original request was stat + op(s),
2371          * the new copyup request will be copyup + the same op(s).
2372          */
2373         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2374         if (!obj_req->osd_req)
2375                 return -ENOMEM;
2376
2377         ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
2378         if (ret)
2379                 return ret;
2380
2381         /*
2382          * Only send non-zero copyup data to save some I/O and network
2383          * bandwidth -- zero copyup data is equivalent to the object not
2384          * existing.
2385          */
2386         if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2387                 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2388                 bytes = 0;
2389         }
2390         osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2391                                           obj_req->copyup_bvecs,
2392                                           obj_req->copyup_bvec_count,
2393                                           bytes);
2394
2395         switch (obj_req->img_request->op_type) {
2396         case OBJ_OP_WRITE:
2397                 __rbd_obj_setup_write(obj_req, 1);
2398                 break;
2399         case OBJ_OP_DISCARD:
2400                 rbd_assert(!rbd_obj_is_entire(obj_req));
2401                 __rbd_obj_setup_discard(obj_req, 1);
2402                 break;
2403         default:
2404                 rbd_assert(0);
2405         }
2406
2407         ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
2408         if (ret)
2409                 return ret;
2410
2411         rbd_obj_request_submit(obj_req);
2412         return 0;
2413 }
2414
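/*
 * Allocate a zero-offset bio_vec array of whole pages big enough to
 * hold obj_overlap bytes of parent data for a copyup.
 */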
2415 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2416 {
2417         u32 i;
2418
2419         rbd_assert(!obj_req->copyup_bvecs);
2420         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2421         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2422                                         sizeof(*obj_req->copyup_bvecs),
2423                                         GFP_NOIO);
2424         if (!obj_req->copyup_bvecs)
2425                 return -ENOMEM;
2426
2427         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2428                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2429
2430                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2431                 if (!obj_req->copyup_bvecs[i].bv_page)
2432                         return -ENOMEM;
2433
2434                 obj_req->copyup_bvecs[i].bv_offset = 0;
2435                 obj_req->copyup_bvecs[i].bv_len = len;
2436                 obj_overlap -= len;
2437         }
2438
2439         rbd_assert(!obj_overlap);
2440         return 0;
2441 }
2442
2443 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2444 {
2445         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2446         int ret;
2447
2448         rbd_assert(obj_req->num_img_extents);
2449         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2450                       rbd_dev->parent_overlap);
2451         if (!obj_req->num_img_extents) {
2452                 /*
2453                  * The overlap has become 0 (most likely because the
2454                  * image has been flattened).  Use rbd_obj_issue_copyup()
2455                  * to re-submit the original write request -- the copyup
2456                  * operation itself will be a no-op, since someone must
2457                  * have populated the child object while we weren't
2458                  * looking.  Move to WRITE_FLAT state as we'll be done
2459                  * with the operation once the null copyup completes.
2460                  */
2461                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2462                 return rbd_obj_issue_copyup(obj_req, 0);
2463         }
2464
2465         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2466         if (ret)
2467                 return ret;
2468
2469         obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2470         return rbd_obj_read_from_parent(obj_req);
2471 }
2472
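/*
 * Write state machine: a guarded write that hits -ENOENT reads the
 * backing data from the parent (RBD_OBJ_WRITE_GUARD -> COPYUP) and is
 * then resubmitted as copyup + the original op(s); flat writes complete
 * directly.  Returns true if obj_req is completed.
 */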
2473 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2474 {
2475         int ret;
2476
2477 again:
2478         switch (obj_req->write_state) {
2479         case RBD_OBJ_WRITE_GUARD:
2480                 rbd_assert(!obj_req->xferred);
2481                 if (obj_req->result == -ENOENT) {
2482                         /*
2483                          * The target object doesn't exist.  Read the data for
2484                          * the entire target object up to the overlap point (if
2485                          * any) from the parent, so we can use it for a copyup.
2486                          */
2487                         ret = rbd_obj_handle_write_guard(obj_req);
2488                         if (ret) {
2489                                 obj_req->result = ret;
2490                                 return true;
2491                         }
2492                         return false;
2493                 }
2494                 /* fall through */
2495         case RBD_OBJ_WRITE_FLAT:
2496                 if (!obj_req->result)
2497                         /*
2498                          * There is no such thing as a successful short
2499                          * write -- indicate the whole request was satisfied.
2500                          */
2501                         obj_req->xferred = obj_req->ex.oe_len;
2502                 return true;
2503         case RBD_OBJ_WRITE_COPYUP:
2504                 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2505                 if (obj_req->result)
2506                         goto again;
2507
2508                 rbd_assert(obj_req->xferred);
2509                 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2510                 if (ret) {
2511                         obj_req->result = ret;
2512                         return true;
2513                 }
2514                 return false;
2515         default:
2516                 BUG();
2517         }
2518 }
2519
2520 /*
2521  * Returns true if @obj_req is completed, or false otherwise.
2522  */
2523 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2524 {
2525         switch (obj_req->img_request->op_type) {
2526         case OBJ_OP_READ:
2527                 return rbd_obj_handle_read(obj_req);
2528         case OBJ_OP_WRITE:
2529                 return rbd_obj_handle_write(obj_req);
2530         case OBJ_OP_DISCARD:
2531                 if (rbd_obj_handle_write(obj_req)) {
2532                         /*
2533                          * Hide -ENOENT from delete/truncate/zero -- discarding
2534                          * a non-existent object is not a problem.
2535                          */
2536                         if (obj_req->result == -ENOENT) {
2537                                 obj_req->result = 0;
2538                                 obj_req->xferred = obj_req->ex.oe_len;
2539                         }
2540                         return true;
2541                 }
2542                 return false;
2543         default:
2544                 BUG();
2545         }
2546 }
2547
2548 static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2549 {
2550         struct rbd_img_request *img_req = obj_req->img_request;
2551
2552         rbd_assert((!obj_req->result &&
2553                     obj_req->xferred == obj_req->ex.oe_len) ||
2554                    (obj_req->result < 0 && !obj_req->xferred));
2555         if (!obj_req->result) {
2556                 img_req->xferred += obj_req->xferred;
2557                 return;
2558         }
2559
2560         rbd_warn(img_req->rbd_dev,
2561                  "%s at objno %llu %llu~%llu result %d xferred %llu",
2562                  obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2563                  obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2564                  obj_req->xferred);
2565         if (!img_req->result) {
2566                 img_req->result = obj_req->result;
2567                 img_req->xferred = 0;
2568         }
2569 }
2570
2571 static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2572 {
2573         struct rbd_obj_request *obj_req = img_req->obj_request;
2574
2575         rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2576         rbd_assert((!img_req->result &&
2577                     img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2578                    (img_req->result < 0 && !img_req->xferred));
2579
2580         obj_req->result = img_req->result;
2581         obj_req->xferred = img_req->xferred;
2582         rbd_img_request_put(img_req);
2583 }
2584
2585 static void rbd_img_end_request(struct rbd_img_request *img_req)
2586 {
2587         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2588         rbd_assert((!img_req->result &&
2589                     img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2590                    (img_req->result < 0 && !img_req->xferred));
2591
2592         blk_mq_end_request(img_req->rq,
2593                            errno_to_blk_status(img_req->result));
2594         rbd_img_request_put(img_req);
2595 }
2596
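/*
 * Advance an object request toward completion.  Once every object
 * request of the image request has finished, the image request is
 * ended; for a child (parent read) request, the result is propagated
 * to the originating object request, which is then handled again.
 */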
2597 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2598 {
2599         struct rbd_img_request *img_req;
2600
2601 again:
2602         if (!__rbd_obj_handle_request(obj_req))
2603                 return;
2604
2605         img_req = obj_req->img_request;
2606         spin_lock(&img_req->completion_lock);
2607         rbd_obj_end_request(obj_req);
2608         rbd_assert(img_req->pending_count);
2609         if (--img_req->pending_count) {
2610                 spin_unlock(&img_req->completion_lock);
2611                 return;
2612         }
2613
2614         spin_unlock(&img_req->completion_lock);
2615         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2616                 obj_req = img_req->obj_request;
2617                 rbd_img_end_child_request(img_req);
2618                 goto again;
2619         }
2620         rbd_img_end_request(img_req);
2621 }
2622
2623 static const struct rbd_client_id rbd_empty_cid;
2624
2625 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2626                           const struct rbd_client_id *rhs)
2627 {
2628         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2629 }
2630
2631 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2632 {
2633         struct rbd_client_id cid;
2634
2635         mutex_lock(&rbd_dev->watch_mutex);
2636         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2637         cid.handle = rbd_dev->watch_cookie;
2638         mutex_unlock(&rbd_dev->watch_mutex);
2639         return cid;
2640 }
2641
2642 /*
2643  * lock_rwsem must be held for write
2644  */
2645 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2646                               const struct rbd_client_id *cid)
2647 {
2648         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2649              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2650              cid->gid, cid->handle);
2651         rbd_dev->owner_cid = *cid; /* struct */
2652 }
2653
2654 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2655 {
2656         mutex_lock(&rbd_dev->watch_mutex);
2657         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2658         mutex_unlock(&rbd_dev->watch_mutex);
2659 }
2660
2661 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2662 {
2663         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2664
2665         strcpy(rbd_dev->lock_cookie, cookie);
2666         rbd_set_owner_cid(rbd_dev, &cid);
2667         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2668 }
2669
2670 /*
2671  * lock_rwsem must be held for write
2672  */
2673 static int rbd_lock(struct rbd_device *rbd_dev)
2674 {
2675         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2676         char cookie[32];
2677         int ret;
2678
2679         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2680                 rbd_dev->lock_cookie[0] != '\0');
2681
2682         format_lock_cookie(rbd_dev, cookie);
2683         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2684                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2685                             RBD_LOCK_TAG, "", 0);
2686         if (ret)
2687                 return ret;
2688
2689         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
2690         __rbd_lock(rbd_dev, cookie);
2691         return 0;
2692 }
2693
2694 /*
2695  * lock_rwsem must be held for write
2696  */
2697 static void rbd_unlock(struct rbd_device *rbd_dev)
2698 {
2699         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2700         int ret;
2701
2702         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2703                 rbd_dev->lock_cookie[0] == '\0');
2704
2705         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2706                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
2707         if (ret && ret != -ENOENT)
2708                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
2709
2710         /* treat errors as the image is unlocked */
2711         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
2712         rbd_dev->lock_cookie[0] = '\0';
2713         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2714         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
2715 }
2716
2717 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2718                                 enum rbd_notify_op notify_op,
2719                                 struct page ***preply_pages,
2720                                 size_t *preply_len)
2721 {
2722         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2723         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2724         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2725         int buf_size = sizeof(buf);
2726         void *p = buf;
2727
2728         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
2729
2730         /* encode *LockPayload NotifyMessage (op + ClientId) */
2731         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2732         ceph_encode_32(&p, notify_op);
2733         ceph_encode_64(&p, cid.gid);
2734         ceph_encode_64(&p, cid.handle);
2735
2736         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2737                                 &rbd_dev->header_oloc, buf, buf_size,
2738                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
2739 }
2740
2741 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2742                                enum rbd_notify_op notify_op)
2743 {
2744         struct page **reply_pages;
2745         size_t reply_len;
2746
2747         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2748         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2749 }
2750
2751 static void rbd_notify_acquired_lock(struct work_struct *work)
2752 {
2753         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2754                                                   acquired_lock_work);
2755
2756         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
2757 }
2758
2759 static void rbd_notify_released_lock(struct work_struct *work)
2760 {
2761         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2762                                                   released_lock_work);
2763
2764         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
2765 }
2766
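/*
 * Ask the current lock owner to release the lock via a REQUEST_LOCK
 * notify.  Returns the result decoded from the owner's ResponseMessage,
 * -ETIMEDOUT if no lock owner responded, or another negative error.
 */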
2767 static int rbd_request_lock(struct rbd_device *rbd_dev)
2768 {
2769         struct page **reply_pages;
2770         size_t reply_len;
2771         bool lock_owner_responded = false;
2772         int ret;
2773
2774         dout("%s rbd_dev %p\n", __func__, rbd_dev);
2775
2776         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2777                                    &reply_pages, &reply_len);
2778         if (ret && ret != -ETIMEDOUT) {
2779                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
2780                 goto out;
2781         }
2782
2783         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2784                 void *p = page_address(reply_pages[0]);
2785                 void *const end = p + reply_len;
2786                 u32 n;
2787
2788                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2789                 while (n--) {
2790                         u8 struct_v;
2791                         u32 len;
2792
2793                         ceph_decode_need(&p, end, 8 + 8, e_inval);
2794                         p += 8 + 8; /* skip gid and cookie */
2795
2796                         ceph_decode_32_safe(&p, end, len, e_inval);
2797                         if (!len)
2798                                 continue;
2799
2800                         if (lock_owner_responded) {
2801                                 rbd_warn(rbd_dev,
2802                                          "duplicate lock owners detected");
2803                                 ret = -EIO;
2804                                 goto out;
2805                         }
2806
2807                         lock_owner_responded = true;
2808                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2809                                                   &struct_v, &len);
2810                         if (ret) {
2811                                 rbd_warn(rbd_dev,
2812                                          "failed to decode ResponseMessage: %d",
2813                                          ret);
2814                                 goto e_inval;
2815                         }
2816
2817                         ret = ceph_decode_32(&p);
2818                 }
2819         }
2820
2821         if (!lock_owner_responded) {
2822                 rbd_warn(rbd_dev, "no lock owners detected");
2823                 ret = -ETIMEDOUT;
2824         }
2825
2826 out:
2827         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2828         return ret;
2829
2830 e_inval:
2831         ret = -EINVAL;
2832         goto out;
2833 }
2834
2835 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2836 {
2837         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2838
2839         cancel_delayed_work(&rbd_dev->lock_dwork);
2840         if (wake_all)
2841                 wake_up_all(&rbd_dev->lock_waitq);
2842         else
2843                 wake_up(&rbd_dev->lock_waitq);
2844 }
2845
2846 static int get_lock_owner_info(struct rbd_device *rbd_dev,
2847                                struct ceph_locker **lockers, u32 *num_lockers)
2848 {
2849         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2850         u8 lock_type;
2851         char *lock_tag;
2852         int ret;
2853
2854         dout("%s rbd_dev %p\n", __func__, rbd_dev);
2855
2856         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2857                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
2858                                  &lock_type, &lock_tag, lockers, num_lockers);
2859         if (ret)
2860                 return ret;
2861
2862         if (*num_lockers == 0) {
2863                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2864                 goto out;
2865         }
2866
2867         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2868                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2869                          lock_tag);
2870                 ret = -EBUSY;
2871                 goto out;
2872         }
2873
2874         if (lock_type == CEPH_CLS_LOCK_SHARED) {
2875                 rbd_warn(rbd_dev, "shared lock type detected");
2876                 ret = -EBUSY;
2877                 goto out;
2878         }
2879
2880         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2881                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
2882                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2883                          (*lockers)[0].id.cookie);
2884                 ret = -EBUSY;
2885                 goto out;
2886         }
2887
2888 out:
2889         kfree(lock_tag);
2890         return ret;
2891 }
2892
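/*
 * Check whether the given locker still has a watch established on the
 * header object.  Returns 1 if so (and records it as the owner), 0 if
 * the locker appears to be dead, or a negative error code.
 */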
2893 static int find_watcher(struct rbd_device *rbd_dev,
2894                         const struct ceph_locker *locker)
2895 {
2896         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2897         struct ceph_watch_item *watchers;
2898         u32 num_watchers;
2899         u64 cookie;
2900         int i;
2901         int ret;
2902
2903         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2904                                       &rbd_dev->header_oloc, &watchers,
2905                                       &num_watchers);
2906         if (ret)
2907                 return ret;
2908
2909         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2910         for (i = 0; i < num_watchers; i++) {
2911                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2912                             sizeof(locker->info.addr)) &&
2913                     watchers[i].cookie == cookie) {
2914                         struct rbd_client_id cid = {
2915                                 .gid = le64_to_cpu(watchers[i].name.num),
2916                                 .handle = cookie,
2917                         };
2918
2919                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2920                              rbd_dev, cid.gid, cid.handle);
2921                         rbd_set_owner_cid(rbd_dev, &cid);
2922                         ret = 1;
2923                         goto out;
2924                 }
2925         }
2926
2927         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2928         ret = 0;
2929 out:
2930         kfree(watchers);
2931         return ret;
2932 }
2933
2934 /*
2935  * lock_rwsem must be held for write
2936  */
2937 static int rbd_try_lock(struct rbd_device *rbd_dev)
2938 {
2939         struct ceph_client *client = rbd_dev->rbd_client->client;
2940         struct ceph_locker *lockers;
2941         u32 num_lockers;
2942         int ret;
2943
2944         for (;;) {
2945                 ret = rbd_lock(rbd_dev);
2946                 if (ret != -EBUSY)
2947                         return ret;
2948
2949                 /* determine if the current lock holder is still alive */
2950                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2951                 if (ret)
2952                         return ret;
2953
2954                 if (num_lockers == 0)
2955                         goto again;
2956
2957                 ret = find_watcher(rbd_dev, lockers);
2958                 if (ret) {
2959                         if (ret > 0)
2960                                 ret = 0; /* have to request lock */
2961                         goto out;
2962                 }
2963
2964                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2965                          ENTITY_NAME(lockers[0].id.name));
2966
2967                 ret = ceph_monc_blacklist_add(&client->monc,
2968                                               &lockers[0].info.addr);
2969                 if (ret) {
2970                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2971                                  ENTITY_NAME(lockers[0].id.name), ret);
2972                         goto out;
2973                 }
2974
2975                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2976                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
2977                                           lockers[0].id.cookie,
2978                                           &lockers[0].id.name);
2979                 if (ret && ret != -ENOENT)
2980                         goto out;
2981
2982 again:
2983                 ceph_free_lockers(lockers, num_lockers);
2984         }
2985
2986 out:
2987         ceph_free_lockers(lockers, num_lockers);
2988         return ret;
2989 }
2990
2991 /*
2992  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2993  */
2994 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2995                                                 int *pret)
2996 {
2997         enum rbd_lock_state lock_state;
2998
2999         down_read(&rbd_dev->lock_rwsem);
3000         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3001              rbd_dev->lock_state);
3002         if (__rbd_is_lock_owner(rbd_dev)) {
3003                 lock_state = rbd_dev->lock_state;
3004                 up_read(&rbd_dev->lock_rwsem);
3005                 return lock_state;
3006         }
3007
3008         up_read(&rbd_dev->lock_rwsem);
3009         down_write(&rbd_dev->lock_rwsem);
3010         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3011              rbd_dev->lock_state);
3012         if (!__rbd_is_lock_owner(rbd_dev)) {
3013                 *pret = rbd_try_lock(rbd_dev);
3014                 if (*pret)
3015                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3016         }
3017
3018         lock_state = rbd_dev->lock_state;
3019         up_write(&rbd_dev->lock_rwsem);
3020         return lock_state;
3021 }
3022
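/*
 * Delayed work that drives exclusive lock acquisition: try to take the
 * lock, and if another client holds it, ask that client to release it
 * and reschedule the attempt as needed.
 */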
3023 static void rbd_acquire_lock(struct work_struct *work)
3024 {
3025         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3026                                             struct rbd_device, lock_dwork);
3027         enum rbd_lock_state lock_state;
3028         int ret = 0;
3029
3030         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3031 again:
3032         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3033         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3034                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3035                         wake_requests(rbd_dev, true);
3036                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3037                      rbd_dev, lock_state, ret);
3038                 return;
3039         }
3040
3041         ret = rbd_request_lock(rbd_dev);
3042         if (ret == -ETIMEDOUT) {
3043                 goto again; /* treat this as a dead client */
3044         } else if (ret == -EROFS) {
3045                 rbd_warn(rbd_dev, "peer will not release lock");
3046                 /*
3047                  * If this acquisition came from rbd_add_acquire_lock()
3048                  * (the disk isn't up yet), fail immediately by reusing
3049                  * the BLACKLISTED flag.  Otherwise we want to block.
3050                  */
3051                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3052                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3053                         /* wake "rbd map --exclusive" process */
3054                         wake_requests(rbd_dev, false);
3055                 }
3056         } else if (ret < 0) {
3057                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3058                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3059                                  RBD_RETRY_DELAY);
3060         } else {
3061                 /*
3062                  * lock owner acked, but resend if we don't see them
3063                  * release the lock
3064                  */
3065                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3066                      rbd_dev);
3067                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3068                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3069         }
3070 }
3071
3072 /*
3073  * lock_rwsem must be held for write
3074  */
3075 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3076 {
3077         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3078              rbd_dev->lock_state);
3079         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3080                 return false;
3081
3082         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3083         downgrade_write(&rbd_dev->lock_rwsem);
3084         /*
3085          * Ensure that all in-flight IO is flushed.
3086          *
3087          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3088          * may be shared with other devices.
3089          */
3090         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3091         up_read(&rbd_dev->lock_rwsem);
3092
3093         down_write(&rbd_dev->lock_rwsem);
3094         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3095              rbd_dev->lock_state);
3096         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3097                 return false;
3098
3099         rbd_unlock(rbd_dev);
3100         /*
3101          * Give others a chance to grab the lock - we would re-acquire
3102          * almost immediately if we got new IO during ceph_osdc_sync()
3103          * otherwise.  We need to ack our own notifications, so this
3104          * lock_dwork will be requeued from rbd_wait_state_locked()
3105          * after wake_requests() in rbd_handle_released_lock().
3106          */
3107         cancel_delayed_work(&rbd_dev->lock_dwork);
3108         return true;
3109 }
3110
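/*
 * Work callback that voluntarily releases the exclusive lock (queued
 * e.g. from rbd_handle_request_lock() when a peer asks for the lock).
 */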
3111 static void rbd_release_lock_work(struct work_struct *work)
3112 {
3113         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3114                                                   unlock_work);
3115
3116         down_write(&rbd_dev->lock_rwsem);
3117         rbd_release_lock(rbd_dev);
3118         up_write(&rbd_dev->lock_rwsem);
3119 }
3120
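/*
 * Handle an ACQUIRED_LOCK notification: record the new owner's cid (if
 * the peer sent one) and wake any requests waiting on the lock state.
 */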
3121 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3122                                      void **p)
3123 {
3124         struct rbd_client_id cid = { 0 };
3125
3126         if (struct_v >= 2) {
3127                 cid.gid = ceph_decode_64(p);
3128                 cid.handle = ceph_decode_64(p);
3129         }
3130
3131         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3132              cid.handle);
3133         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3134                 down_write(&rbd_dev->lock_rwsem);
3135                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3136                         /*
3137                          * we already know that the remote client is
3138                          * the owner
3139                          */
3140                         up_write(&rbd_dev->lock_rwsem);
3141                         return;
3142                 }
3143
3144                 rbd_set_owner_cid(rbd_dev, &cid);
3145                 downgrade_write(&rbd_dev->lock_rwsem);
3146         } else {
3147                 down_read(&rbd_dev->lock_rwsem);
3148         }
3149
3150         if (!__rbd_is_lock_owner(rbd_dev))
3151                 wake_requests(rbd_dev, false);
3152         up_read(&rbd_dev->lock_rwsem);
3153 }
3154
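/*
 * Handle a RELEASED_LOCK notification: clear the recorded owner if it
 * matches the cid in the notification and wake waiting requests so
 * they can retry acquisition.
 */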
3155 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3156                                      void **p)
3157 {
3158         struct rbd_client_id cid = { 0 };
3159
3160         if (struct_v >= 2) {
3161                 cid.gid = ceph_decode_64(p);
3162                 cid.handle = ceph_decode_64(p);
3163         }
3164
3165         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3166              cid.handle);
3167         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3168                 down_write(&rbd_dev->lock_rwsem);
3169                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3170                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3171                              __func__, rbd_dev, cid.gid, cid.handle,
3172                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3173                         up_write(&rbd_dev->lock_rwsem);
3174                         return;
3175                 }
3176
3177                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3178                 downgrade_write(&rbd_dev->lock_rwsem);
3179         } else {
3180                 down_read(&rbd_dev->lock_rwsem);
3181         }
3182
3183         if (!__rbd_is_lock_owner(rbd_dev))
3184                 wake_requests(rbd_dev, false);
3185         up_read(&rbd_dev->lock_rwsem);
3186 }
3187
3188 /*
3189  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3190  * ResponseMessage is needed.
3191  */
3192 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3193                                    void **p)
3194 {
3195         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3196         struct rbd_client_id cid = { 0 };
3197         int result = 1;
3198
3199         if (struct_v >= 2) {
3200                 cid.gid = ceph_decode_64(p);
3201                 cid.handle = ceph_decode_64(p);
3202         }
3203
3204         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3205              cid.handle);
3206         if (rbd_cid_equal(&cid, &my_cid))
3207                 return result;
3208
3209         down_read(&rbd_dev->lock_rwsem);
3210         if (__rbd_is_lock_owner(rbd_dev)) {
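                /*
                 * We hold the lock but no owner cid is recorded (a
                 * transient state); don't send a response in that case.
                 */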
3211                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3212                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3213                         goto out_unlock;
3214
3215                 /*
3216                  * encode ResponseMessage(0) so the peer can detect
3217                  * a missing owner
3218                  */
3219                 result = 0;
3220
3221                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3222                         if (!rbd_dev->opts->exclusive) {
3223                                 dout("%s rbd_dev %p queueing unlock_work\n",
3224                                      __func__, rbd_dev);
3225                                 queue_work(rbd_dev->task_wq,
3226                                            &rbd_dev->unlock_work);
3227                         } else {
3228                                 /* refuse to release the lock */
3229                                 result = -EROFS;
3230                         }
3231                 }
3232         }
3233
3234 out_unlock:
3235         up_read(&rbd_dev->lock_rwsem);
3236         return result;
3237 }
3238
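/*
 * Acknowledge a notification, optionally encoding a ResponseMessage
 * payload that carries @result.
 */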
3239 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3240                                      u64 notify_id, u64 cookie, s32 *result)
3241 {
3242         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3243         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3244         int buf_size = sizeof(buf);
3245         int ret;
3246
3247         if (result) {
3248                 void *p = buf;
3249
3250                 /* encode ResponseMessage */
3251                 ceph_start_encoding(&p, 1, 1,
3252                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3253                 ceph_encode_32(&p, *result);
3254         } else {
3255                 buf_size = 0;
3256         }
3257
3258         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3259                                    &rbd_dev->header_oloc, notify_id, cookie,
3260                                    buf, buf_size);
3261         if (ret)
3262                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3263 }
3264
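/* ack a notification without a ResponseMessage payload */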
3265 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3266                                    u64 cookie)
3267 {
3268         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3269         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3270 }
3271
3272 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3273                                           u64 notify_id, u64 cookie, s32 result)
3274 {
3275         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3276         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3277 }
3278
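/*
 * watch/notify callback for the image header object: decode the
 * NotifyMessage and dispatch on the notify op (lock acquired/released,
 * lock request, header update).
 */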
3279 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3280                          u64 notifier_id, void *data, size_t data_len)
3281 {
3282         struct rbd_device *rbd_dev = arg;
3283         void *p = data;
3284         void *const end = p + data_len;
3285         u8 struct_v = 0;
3286         u32 len;
3287         u32 notify_op;
3288         int ret;
3289
3290         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3291              __func__, rbd_dev, cookie, notify_id, data_len);
3292         if (data_len) {
3293                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3294                                           &struct_v, &len);
3295                 if (ret) {
3296                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3297                                  ret);
3298                         return;
3299                 }
3300
3301                 notify_op = ceph_decode_32(&p);
3302         } else {
3303                 /* legacy notification for header updates */
3304                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3305                 len = 0;
3306         }
3307
3308         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3309         switch (notify_op) {
3310         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3311                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3312                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3313                 break;
3314         case RBD_NOTIFY_OP_RELEASED_LOCK:
3315                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3316                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3317                 break;
3318         case RBD_NOTIFY_OP_REQUEST_LOCK:
3319                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3320                 if (ret <= 0)
3321                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3322                                                       cookie, ret);
3323                 else
3324                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);