Linux 6.10-rc2
[sfrench/cifs-2.6.git] / drivers / md / dm-vdo / vio.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 #include "vio.h"
7
8 #include <linux/bio.h>
9 #include <linux/blkdev.h>
10 #include <linux/kernel.h>
11 #include <linux/ratelimit.h>
12
13 #include "logger.h"
14 #include "memory-alloc.h"
15 #include "permassert.h"
16
17 #include "constants.h"
18 #include "io-submitter.h"
19 #include "vdo.h"
20
21 /* A vio_pool is a collection of preallocated vios. */
22 struct vio_pool {
23         /* The number of objects managed by the pool */
24         size_t size;
25         /* The list of objects which are available */
26         struct list_head available;
27         /* The queue of requestors waiting for objects from the pool */
28         struct vdo_wait_queue waiting;
29         /* The number of objects currently in use */
30         size_t busy_count;
31         /* The list of objects which are in use */
32         struct list_head busy;
33         /* The ID of the thread on which this pool may be used */
34         thread_id_t thread_id;
35         /* The buffer backing the pool's vios */
36         char *buffer;
37         /* The pool entries */
38         struct pooled_vio vios[];
39 };
40
41 physical_block_number_t pbn_from_vio_bio(struct bio *bio)
42 {
43         struct vio *vio = bio->bi_private;
44         struct vdo *vdo = vio->completion.vdo;
45         physical_block_number_t pbn = bio->bi_iter.bi_sector / VDO_SECTORS_PER_BLOCK;
46
47         return ((pbn == VDO_GEOMETRY_BLOCK_LOCATION) ? pbn : pbn + vdo->geometry.bio_offset);
48 }
49
50 static int create_multi_block_bio(block_count_t size, struct bio **bio_ptr)
51 {
52         struct bio *bio = NULL;
53         int result;
54
55         result = vdo_allocate_extended(struct bio, size + 1, struct bio_vec,
56                                        "bio", &bio);
57         if (result != VDO_SUCCESS)
58                 return result;
59
60         *bio_ptr = bio;
61         return VDO_SUCCESS;
62 }
63
64 int vdo_create_bio(struct bio **bio_ptr)
65 {
66         return create_multi_block_bio(1, bio_ptr);
67 }
68
69 void vdo_free_bio(struct bio *bio)
70 {
71         if (bio == NULL)
72                 return;
73
74         bio_uninit(bio);
75         vdo_free(vdo_forget(bio));
76 }
77
78 int allocate_vio_components(struct vdo *vdo, enum vio_type vio_type,
79                             enum vio_priority priority, void *parent,
80                             unsigned int block_count, char *data, struct vio *vio)
81 {
82         struct bio *bio;
83         int result;
84
85         result = VDO_ASSERT(block_count <= MAX_BLOCKS_PER_VIO,
86                             "block count %u does not exceed maximum %u", block_count,
87                             MAX_BLOCKS_PER_VIO);
88         if (result != VDO_SUCCESS)
89                 return result;
90
91         result = VDO_ASSERT(((vio_type != VIO_TYPE_UNINITIALIZED) && (vio_type != VIO_TYPE_DATA)),
92                             "%d is a metadata type", vio_type);
93         if (result != VDO_SUCCESS)
94                 return result;
95
96         result = create_multi_block_bio(block_count, &bio);
97         if (result != VDO_SUCCESS)
98                 return result;
99
100         initialize_vio(vio, bio, block_count, vio_type, priority, vdo);
101         vio->completion.parent = parent;
102         vio->data = data;
103         return VDO_SUCCESS;
104 }
105
106 /**
107  * create_multi_block_metadata_vio() - Create a vio.
108  * @vdo: The vdo on which the vio will operate.
109  * @vio_type: The type of vio to create.
110  * @priority: The relative priority to assign to the vio.
111  * @parent: The parent of the vio.
112  * @block_count: The size of the vio in blocks.
113  * @data: The buffer.
114  * @vio_ptr: A pointer to hold the new vio.
115  *
116  * Return: VDO_SUCCESS or an error.
117  */
118 int create_multi_block_metadata_vio(struct vdo *vdo, enum vio_type vio_type,
119                                     enum vio_priority priority, void *parent,
120                                     unsigned int block_count, char *data,
121                                     struct vio **vio_ptr)
122 {
123         struct vio *vio;
124         int result;
125
126         BUILD_BUG_ON(sizeof(struct vio) > 256);
127
128         /*
129          * Metadata vios should use direct allocation and not use the buffer pool, which is
130          * reserved for submissions from the linux block layer.
131          */
132         result = vdo_allocate(1, struct vio, __func__, &vio);
133         if (result != VDO_SUCCESS) {
134                 vdo_log_error("metadata vio allocation failure %d", result);
135                 return result;
136         }
137
138         result = allocate_vio_components(vdo, vio_type, priority, parent, block_count,
139                                          data, vio);
140         if (result != VDO_SUCCESS) {
141                 vdo_free(vio);
142                 return result;
143         }
144
145         *vio_ptr  = vio;
146         return VDO_SUCCESS;
147 }
148
149 /**
150  * free_vio_components() - Free the components of a vio embedded in a larger structure.
151  * @vio: The vio to destroy
152  */
153 void free_vio_components(struct vio *vio)
154 {
155         if (vio == NULL)
156                 return;
157
158         BUG_ON(is_data_vio(vio));
159         vdo_free_bio(vdo_forget(vio->bio));
160 }
161
162 /**
163  * free_vio() - Destroy a vio.
164  * @vio: The vio to destroy.
165  */
166 void free_vio(struct vio *vio)
167 {
168         free_vio_components(vio);
169         vdo_free(vio);
170 }
171
172 /* Set bio properties for a VDO read or write. */
173 void vdo_set_bio_properties(struct bio *bio, struct vio *vio, bio_end_io_t callback,
174                             blk_opf_t bi_opf, physical_block_number_t pbn)
175 {
176         struct vdo *vdo = vio->completion.vdo;
177         struct device_config *config = vdo->device_config;
178
179         pbn -= vdo->geometry.bio_offset;
180         vio->bio_zone = ((pbn / config->thread_counts.bio_rotation_interval) %
181                          config->thread_counts.bio_threads);
182
183         bio->bi_private = vio;
184         bio->bi_end_io = callback;
185         bio->bi_opf = bi_opf;
186         bio->bi_iter.bi_sector = pbn * VDO_SECTORS_PER_BLOCK;
187 }
188
189 /*
190  * Prepares the bio to perform IO with the specified buffer. May only be used on a VDO-allocated
191  * bio, as it assumes the bio wraps a 4k buffer that is 4k aligned, but there does not have to be a
192  * vio associated with the bio.
193  */
194 int vio_reset_bio(struct vio *vio, char *data, bio_end_io_t callback,
195                   blk_opf_t bi_opf, physical_block_number_t pbn)
196 {
197         int bvec_count, offset, len, i;
198         struct bio *bio = vio->bio;
199
200         bio_reset(bio, bio->bi_bdev, bi_opf);
201         vdo_set_bio_properties(bio, vio, callback, bi_opf, pbn);
202         if (data == NULL)
203                 return VDO_SUCCESS;
204
205         bio->bi_io_vec = bio->bi_inline_vecs;
206         bio->bi_max_vecs = vio->block_count + 1;
207         len = VDO_BLOCK_SIZE * vio->block_count;
208         offset = offset_in_page(data);
209         bvec_count = DIV_ROUND_UP(offset + len, PAGE_SIZE);
210
211         /*
212          * If we knew that data was always on one page, or contiguous pages, we wouldn't need the
213          * loop. But if we're using vmalloc, it's not impossible that the data is in different
214          * pages that can't be merged in bio_add_page...
215          */
216         for (i = 0; (i < bvec_count) && (len > 0); i++) {
217                 struct page *page;
218                 int bytes_added;
219                 int bytes = PAGE_SIZE - offset;
220
221                 if (bytes > len)
222                         bytes = len;
223
224                 page = is_vmalloc_addr(data) ? vmalloc_to_page(data) : virt_to_page(data);
225                 bytes_added = bio_add_page(bio, page, bytes, offset);
226
227                 if (bytes_added != bytes) {
228                         return vdo_log_error_strerror(VDO_BIO_CREATION_FAILED,
229                                                       "Could only add %i bytes to bio",
230                                                       bytes_added);
231                 }
232
233                 data += bytes;
234                 len -= bytes;
235                 offset = 0;
236         }
237
238         return VDO_SUCCESS;
239 }
240
241 /**
242  * update_vio_error_stats() - Update per-vio error stats and log the error.
243  * @vio: The vio which got an error.
244  * @format: The format of the message to log (a printf style format).
245  */
246 void update_vio_error_stats(struct vio *vio, const char *format, ...)
247 {
248         static DEFINE_RATELIMIT_STATE(error_limiter, DEFAULT_RATELIMIT_INTERVAL,
249                                       DEFAULT_RATELIMIT_BURST);
250         va_list args;
251         int priority;
252         struct vdo *vdo = vio->completion.vdo;
253
254         switch (vio->completion.result) {
255         case VDO_READ_ONLY:
256                 atomic64_inc(&vdo->stats.read_only_error_count);
257                 return;
258
259         case VDO_NO_SPACE:
260                 atomic64_inc(&vdo->stats.no_space_error_count);
261                 priority = VDO_LOG_DEBUG;
262                 break;
263
264         default:
265                 priority = VDO_LOG_ERR;
266         }
267
268         if (!__ratelimit(&error_limiter))
269                 return;
270
271         va_start(args, format);
272         vdo_vlog_strerror(priority, vio->completion.result, VDO_LOGGING_MODULE_NAME,
273                           format, args);
274         va_end(args);
275 }
276
277 void vio_record_metadata_io_error(struct vio *vio)
278 {
279         const char *description;
280         physical_block_number_t pbn = pbn_from_vio_bio(vio->bio);
281
282         if (bio_op(vio->bio) == REQ_OP_READ) {
283                 description = "read";
284         } else if ((vio->bio->bi_opf & REQ_PREFLUSH) == REQ_PREFLUSH) {
285                 description = (((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) ?
286                                "write+preflush+fua" :
287                                "write+preflush");
288         } else if ((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) {
289                 description = "write+fua";
290         } else {
291                 description = "write";
292         }
293
294         update_vio_error_stats(vio,
295                                "Completing %s vio of type %u for physical block %llu with error",
296                                description, vio->type, (unsigned long long) pbn);
297 }
298
299 /**
300  * make_vio_pool() - Create a new vio pool.
301  * @vdo: The vdo.
302  * @pool_size: The number of vios in the pool.
303  * @thread_id: The ID of the thread using this pool.
304  * @vio_type: The type of vios in the pool.
305  * @priority: The priority with which vios from the pool should be enqueued.
306  * @context: The context that each entry will have.
307  * @pool_ptr: The resulting pool.
308  *
309  * Return: A success or error code.
310  */
311 int make_vio_pool(struct vdo *vdo, size_t pool_size, thread_id_t thread_id,
312                   enum vio_type vio_type, enum vio_priority priority, void *context,
313                   struct vio_pool **pool_ptr)
314 {
315         struct vio_pool *pool;
316         char *ptr;
317         int result;
318
319         result = vdo_allocate_extended(struct vio_pool, pool_size, struct pooled_vio,
320                                        __func__, &pool);
321         if (result != VDO_SUCCESS)
322                 return result;
323
324         pool->thread_id = thread_id;
325         INIT_LIST_HEAD(&pool->available);
326         INIT_LIST_HEAD(&pool->busy);
327
328         result = vdo_allocate(pool_size * VDO_BLOCK_SIZE, char,
329                               "VIO pool buffer", &pool->buffer);
330         if (result != VDO_SUCCESS) {
331                 free_vio_pool(pool);
332                 return result;
333         }
334
335         ptr = pool->buffer;
336         for (pool->size = 0; pool->size < pool_size; pool->size++, ptr += VDO_BLOCK_SIZE) {
337                 struct pooled_vio *pooled = &pool->vios[pool->size];
338
339                 result = allocate_vio_components(vdo, vio_type, priority, NULL, 1, ptr,
340                                                  &pooled->vio);
341                 if (result != VDO_SUCCESS) {
342                         free_vio_pool(pool);
343                         return result;
344                 }
345
346                 pooled->context = context;
347                 list_add_tail(&pooled->pool_entry, &pool->available);
348         }
349
350         *pool_ptr = pool;
351         return VDO_SUCCESS;
352 }
353
354 /**
355  * free_vio_pool() - Destroy a vio pool.
356  * @pool: The pool to free.
357  */
358 void free_vio_pool(struct vio_pool *pool)
359 {
360         struct pooled_vio *pooled, *tmp;
361
362         if (pool == NULL)
363                 return;
364
365         /* Remove all available vios from the object pool. */
366         VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&pool->waiting),
367                             "VIO pool must not have any waiters when being freed");
368         VDO_ASSERT_LOG_ONLY((pool->busy_count == 0),
369                             "VIO pool must not have %zu busy entries when being freed",
370                             pool->busy_count);
371         VDO_ASSERT_LOG_ONLY(list_empty(&pool->busy),
372                             "VIO pool must not have busy entries when being freed");
373
374         list_for_each_entry_safe(pooled, tmp, &pool->available, pool_entry) {
375                 list_del(&pooled->pool_entry);
376                 free_vio_components(&pooled->vio);
377                 pool->size--;
378         }
379
380         VDO_ASSERT_LOG_ONLY(pool->size == 0,
381                             "VIO pool must not have missing entries when being freed");
382
383         vdo_free(vdo_forget(pool->buffer));
384         vdo_free(pool);
385 }
386
387 /**
388  * is_vio_pool_busy() - Check whether an vio pool has outstanding entries.
389  *
390  * Return: true if the pool is busy.
391  */
392 bool is_vio_pool_busy(struct vio_pool *pool)
393 {
394         return (pool->busy_count != 0);
395 }
396
397 /**
398  * acquire_vio_from_pool() - Acquire a vio and buffer from the pool (asynchronous).
399  * @pool: The vio pool.
400  * @waiter: Object that is requesting a vio.
401  */
402 void acquire_vio_from_pool(struct vio_pool *pool, struct vdo_waiter *waiter)
403 {
404         struct pooled_vio *pooled;
405
406         VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
407                             "acquire from active vio_pool called from correct thread");
408
409         if (list_empty(&pool->available)) {
410                 vdo_waitq_enqueue_waiter(&pool->waiting, waiter);
411                 return;
412         }
413
414         pooled = list_first_entry(&pool->available, struct pooled_vio, pool_entry);
415         pool->busy_count++;
416         list_move_tail(&pooled->pool_entry, &pool->busy);
417         (*waiter->callback)(waiter, pooled);
418 }
419
420 /**
421  * return_vio_to_pool() - Return a vio to the pool
422  * @pool: The vio pool.
423  * @vio: The pooled vio to return.
424  */
425 void return_vio_to_pool(struct vio_pool *pool, struct pooled_vio *vio)
426 {
427         VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
428                             "vio pool entry returned on same thread as it was acquired");
429
430         vio->vio.completion.error_handler = NULL;
431         vio->vio.completion.parent = NULL;
432         if (vdo_waitq_has_waiters(&pool->waiting)) {
433                 vdo_waitq_notify_next_waiter(&pool->waiting, NULL, vio);
434                 return;
435         }
436
437         list_move_tail(&vio->pool_entry, &pool->available);
438         --pool->busy_count;
439 }
440
441 /*
442  * Various counting functions for statistics.
443  * These are used for bios coming into VDO, as well as bios generated by VDO.
444  */
445 void vdo_count_bios(struct atomic_bio_stats *bio_stats, struct bio *bio)
446 {
447         if (((bio->bi_opf & REQ_PREFLUSH) != 0) && (bio->bi_iter.bi_size == 0)) {
448                 atomic64_inc(&bio_stats->empty_flush);
449                 atomic64_inc(&bio_stats->flush);
450                 return;
451         }
452
453         switch (bio_op(bio)) {
454         case REQ_OP_WRITE:
455                 atomic64_inc(&bio_stats->write);
456                 break;
457         case REQ_OP_READ:
458                 atomic64_inc(&bio_stats->read);
459                 break;
460         case REQ_OP_DISCARD:
461                 atomic64_inc(&bio_stats->discard);
462                 break;
463                 /*
464                  * All other operations are filtered out in dmvdo.c, or not created by VDO, so
465                  * shouldn't exist.
466                  */
467         default:
468                 VDO_ASSERT_LOG_ONLY(0, "Bio operation %d not a write, read, discard, or empty flush",
469                                     bio_op(bio));
470         }
471
472         if ((bio->bi_opf & REQ_PREFLUSH) != 0)
473                 atomic64_inc(&bio_stats->flush);
474         if (bio->bi_opf & REQ_FUA)
475                 atomic64_inc(&bio_stats->fua);
476 }
477
478 static void count_all_bios_completed(struct vio *vio, struct bio *bio)
479 {
480         struct atomic_statistics *stats = &vio->completion.vdo->stats;
481
482         if (is_data_vio(vio)) {
483                 vdo_count_bios(&stats->bios_out_completed, bio);
484                 return;
485         }
486
487         vdo_count_bios(&stats->bios_meta_completed, bio);
488         if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
489                 vdo_count_bios(&stats->bios_journal_completed, bio);
490         else if (vio->type == VIO_TYPE_BLOCK_MAP)
491                 vdo_count_bios(&stats->bios_page_cache_completed, bio);
492 }
493
494 void vdo_count_completed_bios(struct bio *bio)
495 {
496         struct vio *vio = (struct vio *) bio->bi_private;
497
498         atomic64_inc(&vio->completion.vdo->stats.bios_completed);
499         count_all_bios_completed(vio, bio);
500 }