// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "block-map.h"

#include <linux/bio.h>
#include <linux/ratelimit.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"
/*
 * The block map era, or maximum age, is used as follows:
 *
 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
 * according to the sequence number they record.
 *
 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
 * this era are issued for write. In all older eras, pages are issued for write immediately.
 */
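
/*
 * Illustrative sketch only (not part of the driver): under the scheme described above, the era of
 * a dirty page is simply its recorded sequence number divided by the maximum age. The helper name
 * below is hypothetical and exists purely to make the arithmetic concrete.
 */
static inline sequence_number_t __maybe_unused
example_era_of_sequence_number(sequence_number_t recorded, block_count_t maximum_age)
{
	/* e.g. with maximum_age = 8: sequence numbers 0..7 are era 0, 8..15 are era 1, ... */
	return recorded / maximum_age;
}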
struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;

union page_key {
	struct page_descriptor descriptor;
	u64 key;
};

struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};

struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

struct block_map_tree {
	struct block_map_tree_segment *segments;
};

struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	struct block_map_tree trees[];
};

struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};

struct cursors;

struct cursor {
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};

struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	vdo_entry_callback_fn entry_callback;
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};
static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};
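
/*
 * Illustrative sketch only: as the UNMAPPED_BLOCK_MAP_ENTRY initializer above suggests, an entry
 * stores a 36-bit physical block number split into a 4-bit high nibble and a 32-bit little-endian
 * low word. This hypothetical helper shows the split; the real packing and unpacking live in the
 * encodings code.
 */
static inline struct block_map_entry __maybe_unused
example_pack_pbn(physical_block_number_t pbn, enum block_mapping_state state)
{
	return (struct block_map_entry) {
		.mapping_state = state & 0x0F,
		.pbn_high_nibble = (pbn >> 32) & 0x0F,
		.pbn_low_word = __cpu_to_le32(pbn & UINT_MAX),
	};
}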
#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
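
/*
 * Illustrative sketch only: ADD_ONCE makes each statistics update a single WRITE_ONCE() store, so
 * a reader on another thread that pairs it with READ_ONCE() sees either the old or the new value,
 * never a torn intermediate. The reader below is hypothetical.
 */
static inline u64 __maybe_unused example_read_stat(const u64 *stat)
{
	return READ_ONCE(*stat);
}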
static inline bool is_dirty(const struct page_info *info)
{
	return info->state == PS_DIRTY;
}

static inline bool is_present(const struct page_info *info)
{
	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
}

static inline bool is_in_flight(const struct page_info *info)
{
	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
}

static inline bool is_incoming(const struct page_info *info)
{
	return info->state == PS_INCOMING;
}

static inline bool is_outgoing(const struct page_info *info)
{
	return info->state == PS_OUTGOING;
}

static inline bool is_valid(const struct page_info *info)
{
	return is_present(info) || is_outgoing(info);
}

static char *get_page_buffer(struct page_info *info)
{
	struct vdo_page_cache *cache = info->cache;

	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
}

static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
	struct vdo_page_completion *completion;

	completion = container_of(waiter, struct vdo_page_completion, waiter);
	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
	return completion;
}
/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_info(struct vdo_page_cache *cache)
{
	struct page_info *info;

	INIT_LIST_HEAD(&cache->free_list);
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result;

		info->cache = cache;
		info->state = PS_FREE;
		info->pbn = NO_PAGE;

		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
					     VIO_PRIORITY_METADATA, info,
					     get_page_buffer(info), &info->vio);
		if (result != VDO_SUCCESS)
			return result;

		/* The thread ID should never change. */
		info->vio->completion.callback_thread_id = cache->zone->thread_id;

		INIT_LIST_HEAD(&info->state_entry);
		list_add_tail(&info->state_entry, &cache->free_list);
		INIT_LIST_HEAD(&info->lru_entry);
	}

	return VDO_SUCCESS;
}
/**
 * allocate_cache_components() - Allocate components of the cache which require their own
 *                               allocation.
 *
 * The caller is responsible for all cleanup on errors.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
	int result;

	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
			      &cache->infos);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_int_map_create(cache->page_count, &cache->page_map);
	if (result != VDO_SUCCESS)
		return result;

	return initialize_info(cache);
}
/**
 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
 *                            thread.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
					  const char *function_name)
{
	thread_id_t thread_id = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
			    "%s() must only be called on cache thread %d, not thread %d",
			    function_name, cache->zone->thread_id, thread_id);
}

/** assert_io_allowed() - Assert that a page cache may issue I/O. */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
			    "VDO page cache may issue I/O");
}

/** report_cache_pressure() - Log and, if enabled, report cache pressure. */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
	ADD_ONCE(cache->stats.cache_pressure, 1);
	if (cache->waiter_count > cache->page_count) {
		if ((cache->pressure_report % LOG_INTERVAL) == 0)
			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

		if (++cache->pressure_report >= DISPLAY_INTERVAL)
			cache->pressure_report = 0;
	}
}
/**
 * get_page_state_name() - Return the name of a page state.
 *
 * If the page state is invalid, a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
	int result;
	static const char * const state_names[] = {
		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
	};

	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
			    "Unknown page_state value %d", state);
	if (result != VDO_SUCCESS)
		return "[UNKNOWN PAGE STATE]";

	return state_names[state];
}
/**
 * update_counter() - Update the counter associated with a given state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter.
 */
static void update_counter(struct page_info *info, s32 delta)
{
	struct block_map_statistics *stats = &info->cache->stats;

	switch (info->state) {
	case PS_FREE:
		ADD_ONCE(stats->free_pages, delta);
		return;

	case PS_INCOMING:
		ADD_ONCE(stats->incoming_pages, delta);
		return;

	case PS_OUTGOING:
		ADD_ONCE(stats->outgoing_pages, delta);
		return;

	case PS_FAILED:
		ADD_ONCE(stats->failed_pages, delta);
		return;

	case PS_RESIDENT:
		ADD_ONCE(stats->clean_pages, delta);
		return;

	case PS_DIRTY:
		ADD_ONCE(stats->dirty_pages, delta);
		return;

	default:
		return;
	}
}

/** update_lru() - Update the lru information for an active page. */
static void update_lru(struct page_info *info)
{
	if (info->cache->lru_list.prev != &info->lru_entry)
		list_move_tail(&info->lru_entry, &info->cache->lru_list);
}
/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 */
static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
	if (new_state == info->state)
		return;

	update_counter(info, -1);
	info->state = new_state;
	update_counter(info, 1);

	switch (info->state) {
	case PS_FREE:
	case PS_FAILED:
		list_move_tail(&info->state_entry, &info->cache->free_list);
		return;

	case PS_OUTGOING:
		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
		return;

	case PS_DIRTY:
		return;

	default:
		list_del_init(&info->state_entry);
	}
}
/** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
	struct vdo_page_cache *cache = info->cache;

	/* Either the new or the old page number must be NO_PAGE. */
	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
				"Must free a page before reusing it.");
	if (result != VDO_SUCCESS)
		return result;

	if (info->pbn != NO_PAGE)
		vdo_int_map_remove(cache->page_map, info->pbn);

	info->pbn = pbn;

	if (pbn != NO_PAGE) {
		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

/** reset_page_info() - Reset page info to represent an unallocated page. */
static int reset_page_info(struct page_info *info)
{
	int result;

	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
			    "VDO Page must not have waiters");
	if (result != VDO_SUCCESS)
		return result;

	result = set_info_pbn(info, NO_PAGE);
	set_info_state(info, PS_FREE);
	list_del_init(&info->lru_entry);
	return result;
}
/**
 * find_free_page() - Find a free page.
 *
 * Return: A pointer to the page info structure (if found), NULL otherwise.
 */
static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	info = list_first_entry_or_null(&cache->free_list, struct page_info,
					state_entry);
	if (info != NULL)
		list_del_init(&info->state_entry);

	return info;
}

/**
 * find_page() - Find the page info (if any) associated with a given pbn.
 * @pbn: The absolute physical block number of the page.
 *
 * Return: The page info for the page if available, or NULL if not.
 */
static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
						 physical_block_number_t pbn)
{
	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
		return cache->last_found;

	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
	return cache->last_found;
}
/**
 * select_lru_page() - Determine which page is least recently used.
 *
 * Picks the least recently used page from among the non-busy entries at the front of the LRU
 * ring. Since whenever we mark a page busy we also move it to the end of the ring, it is unlikely
 * that the entries at the front are busy unless the queue is very short, but not impossible.
 *
 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
 *         found. The page can be dirty or resident.
 */
static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	list_for_each_entry(info, &cache->lru_list, lru_entry)
		if ((info->busy == 0) && !is_in_flight(info))
			return info;

	return NULL;
}
/* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */

/**
 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
 * @info: The page info representing the result page.
 * @vdo_page_comp: The VDO page completion to complete.
 */
static void complete_with_page(struct page_info *info,
			       struct vdo_page_completion *vdo_page_comp)
{
	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);

	if (!available) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Requested cache page %llu in state %s is not %s",
				       (unsigned long long) info->pbn,
				       get_page_state_name(info->state),
				       vdo_page_comp->writable ? "present" : "valid");
		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
		return;
	}

	vdo_page_comp->info = info;
	vdo_page_comp->ready = true;
	vdo_finish_completion(&vdo_page_comp->completion);
}
/**
 * complete_waiter_with_error() - Complete a page completion with an error code.
 * @waiter: The page completion, as a waiter.
 * @result_ptr: A pointer to the error code.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
{
	int *result = result_ptr;

	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
}

/**
 * complete_waiter_with_page() - Complete a page completion with a page.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	complete_with_page(page_info, page_completion_from_waiter(waiter));
}
/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 *
 * Upon completion the waitq will be empty.
 *
 * Return: The number of pages distributed.
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
					       struct vdo_wait_queue *waitq)
{
	size_t num_pages;

	update_lru(info);
	num_pages = vdo_waitq_num_waiters(waitq);

	/*
	 * Increment the busy count once for each pending completion so that this page does not
	 * stop being busy until all completions have been processed.
	 */
	info->busy += num_pages;

	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
	return num_pages;
}
/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @context: A string describing what triggered the error.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well.
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}
/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error code.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}
static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}
/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
 */
static void handle_load_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
	ADD_ONCE(cache->stats.failed_reads, 1);
	set_info_state(info, PS_FAILED);
	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
	reset_page_info(info);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}
/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	nonce_t nonce = info->cache->zone->block_map->nonce;
	struct block_map_page *page;
	enum block_map_page_validity validity;

	assert_on_cache_thread(cache, __func__);

	page = (struct block_map_page *) get_page_buffer(info);
	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
						    "Expected page %llu but got page %llu instead",
						    (unsigned long long) info->pbn,
						    (unsigned long long) pbn);

		vdo_continue_completion(completion, result);
		return;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
		vdo_format_block_map_page(page, nonce, info->pbn, false);

	info->recovery_lock = 0;
	set_info_state(info, PS_RESIDENT);
	distribute_page_over_waitq(info, &info->waiting);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}
/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	/*
	 * We are doing a read-only rebuild, so treat this as a successful read
	 * of an uninitialized page.
	 */
	vio_record_metadata_io_error(as_vio(completion));
	ADD_ONCE(cache->stats.failed_reads, 1);
	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
	vdo_reset_completion(completion);
	page_is_loaded(completion);
}

static void load_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
}
/**
 * launch_page_load() - Begin the process of loading a page.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
					 physical_block_number_t pbn)
{
	int result;
	vdo_action_fn callback;
	struct vdo_page_cache *cache = info->cache;

	assert_io_allowed(cache);

	result = set_info_pbn(info, pbn);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
	if (result != VDO_SUCCESS)
		return result;

	set_info_state(info, PS_INCOMING);
	cache->outstanding_reads++;
	ADD_ONCE(cache->stats.pages_loaded, 1);
	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
				callback, REQ_OP_READ | REQ_PRIO);
	return VDO_SUCCESS;
}
static void write_pages(struct vdo_completion *completion);

/** handle_flush_error() - Handle errors flushing the layer. */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	set_persistent_error(info->cache, "flush failed", completion->result);
	write_pages(completion);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
}
/** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
static void save_pages(struct vdo_page_cache *cache)
{
	struct page_info *info;
	struct vio *vio;

	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
		return;

	assert_io_allowed(cache);

	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);

	cache->pages_in_flush = cache->pages_to_flush;
	cache->pages_to_flush = 0;
	ADD_ONCE(cache->stats.flush_count, 1);

	vio = info->vio;

	/*
	 * We must make sure that the recovery journal entries that changed these pages were
	 * successfully persisted, and thus must issue a flush before each batch of pages is
	 * written to ensure this.
	 */
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}
/**
 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
 *
 * Once in the list, a page may not be used until it has been written out.
 */
static void schedule_page_save(struct page_info *info)
{
	if (info->busy > 0) {
		info->write_status = WRITE_STATUS_DEFERRED;
		return;
	}

	info->cache->pages_to_flush++;
	info->cache->outstanding_writes++;
	set_info_state(info, PS_OUTGOING);
}

/**
 * launch_page_save() - Add a page to the outgoing pages waiting to be saved, and then start saving
 *                      pages if another save is not in progress.
 */
static void launch_page_save(struct page_info *info)
{
	schedule_page_save(info);
	save_pages(info->cache);
}
/**
 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
 *                           requesting a given page number.
 * @context: A pointer to the pbn of the desired page.
 *
 * Implements waiter_match_fn.
 *
 * Return: true if the page completion is for the desired page number.
 */
static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
{
	physical_block_number_t *pbn = context;

	return (page_completion_from_waiter(waiter)->pbn == *pbn);
}
/**
 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
 *                        any other completions that match it in page number.
 */
static void allocate_free_page(struct page_info *info)
{
	int result;
	struct vdo_waiter *oldest_waiter;
	physical_block_number_t pbn;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
		if (cache->stats.cache_pressure > 0) {
			vdo_log_info("page cache pressure relieved");
			WRITE_ONCE(cache->stats.cache_pressure, 0);
		}

		return;
	}

	result = reset_page_info(info);
	if (result != VDO_SUCCESS) {
		set_persistent_error(cache, "cannot reset page info", result);
		return;
	}

	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
	pbn = page_completion_from_waiter(oldest_waiter)->pbn;

	/*
	 * Remove all entries which match the page number in question and push them onto the page
	 * info's waitq.
	 */
	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
					   &pbn, &info->waiting);
	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);

	result = launch_page_load(info, pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}
/**
 * discard_a_page() - Begin the process of discarding a page.
 *
 * If no page is discardable, increments a count of deferred frees so that the next release of a
 * page which is no longer busy will kick off another discard cycle. This is an indication that the
 * cache is not big enough.
 *
 * If the selected page is not dirty, immediately allocates the page to the oldest completion
 * waiting for a free page.
 */
static void discard_a_page(struct vdo_page_cache *cache)
{
	struct page_info *info = select_lru_page(cache);

	if (info == NULL) {
		report_cache_pressure(cache);
		return;
	}

	if (!is_dirty(info)) {
		allocate_free_page(info);
		return;
	}

	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
			    "page selected for discard is not in flight");

	cache->discard_count++;
	info->write_status = WRITE_STATUS_DISCARD;
	launch_page_save(info);
}
/**
 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
 *                                 a different page.
 */
static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
{
	struct vdo_page_cache *cache = vdo_page_comp->cache;

	cache->waiter_count++;
	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
	discard_a_page(cache);
}

/**
 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
 *                            page.
 * @cache: The page cache.
 */
static void discard_page_if_needed(struct vdo_page_cache *cache)
{
	if (cache->waiter_count > cache->discard_count)
		discard_a_page(cache);
}
/**
 * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
 * @info: The info structure for the page whose write just completed.
 *
 * Return: true if the page write was a discard.
 */
static bool write_has_finished(struct page_info *info)
{
	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);

	assert_on_cache_thread(info->cache, __func__);
	info->cache->outstanding_writes--;

	info->write_status = WRITE_STATUS_NORMAL;
	return was_discard;
}
/**
 * handle_page_write_error() - Handler for page write errors.
 * @completion: The page write vio.
 */
static void handle_page_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	vio_record_metadata_io_error(as_vio(completion));

	/* If we're already read-only, write failures are to be expected. */
	if (result != VDO_READ_ONLY) {
		vdo_log_ratelimit(vdo_log_error,
				  "failed to write block map page %llu",
				  (unsigned long long) info->pbn);
	}

	set_info_state(info, PS_DIRTY);
	ADD_ONCE(cache->stats.failed_writes, 1);
	set_persistent_error(cache, "cannot write page", result);

	if (!write_has_finished(info))
		discard_page_if_needed(cache);

	check_for_drain_complete(cache->zone);
}
static void page_is_written_out(struct vdo_completion *completion);

static void write_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
}
/**
 * page_is_written_out() - Callback used when a page has been written out.
 * @completion: The vio which wrote the page. Its parent is a page_info.
 */
static void page_is_written_out(struct vdo_completion *completion)
{
	bool was_discard, reclaimed;
	u32 reclamations;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

	if (!page->header.initialized) {
		page->header.initialized = true;
		vdo_submit_metadata_vio(info->vio, info->pbn,
					write_cache_page_endio,
					handle_page_write_error,
					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
		return;
	}

	/* Handle journal updates and torn write protection. */
	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
						     info->recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     cache->zone->zone_number);
	info->recovery_lock = 0;
	was_discard = write_has_finished(info);
	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

	set_info_state(info, PS_RESIDENT);

	reclamations = distribute_page_over_waitq(info, &info->waiting);
	ADD_ONCE(cache->stats.reclaimed, reclamations);

	if (was_discard)
		cache->discard_count--;

	if (reclaimed)
		discard_page_if_needed(cache);
	else
		allocate_free_page(info);

	check_for_drain_complete(cache->zone);
}
/**
 * write_pages() - Write the batch of pages which were covered by the layer flush which just
 *                 completed.
 * @flush_completion: The flush vio.
 *
 * This callback is registered in save_pages().
 */
static void write_pages(struct vdo_completion *flush_completion)
{
	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

	/*
	 * We need to cache these two values on the stack since it is possible for the last
	 * page info to cause the page cache to get freed. Hence once we launch the last page,
	 * it may be unsafe to dereference the cache.
	 */
	bool has_unflushed_pages = (cache->pages_to_flush > 0);
	page_count_t pages_in_flush = cache->pages_in_flush;

	cache->pages_in_flush = 0;
	while (pages_in_flush-- > 0) {
		struct page_info *info =
			list_first_entry(&cache->outgoing_list, struct page_info,
					 state_entry);

		list_del_init(&info->state_entry);
		if (vdo_is_read_only(info->cache->vdo)) {
			struct vdo_completion *completion = &info->vio->completion;

			vdo_reset_completion(completion);
			completion->callback = page_is_written_out;
			completion->error_handler = handle_page_write_error;
			vdo_fail_completion(completion, VDO_READ_ONLY);
			continue;
		}

		ADD_ONCE(info->cache->stats.pages_saved, 1);
		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
	}

	if (has_unflushed_pages) {
		/*
		 * If there are unflushed pages, the cache can't have been freed, so this call is
		 * safe.
		 */
		save_pages(cache);
	}
}
/**
 * vdo_release_page_completion() - Release a VDO Page Completion.
 *
 * The page referenced by this completion (if any) will no longer be held busy by this completion.
 * If a page becomes discardable and there are completions awaiting free pages then a new round of
 * page discarding is started.
 */
void vdo_release_page_completion(struct vdo_completion *completion)
{
	struct page_info *discard_info = NULL;
	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
	struct vdo_page_cache *cache;

	if (completion->result == VDO_SUCCESS) {
		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
			return;

		if (--page_completion->info->busy == 0)
			discard_info = page_completion->info;
	}

	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "Page being released after leaving all queues");

	page_completion->info = NULL;
	cache = page_completion->cache;
	assert_on_cache_thread(cache, __func__);

	if (discard_info != NULL) {
		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
			discard_info->write_status = WRITE_STATUS_NORMAL;
			launch_page_save(discard_info);
		}

		/*
		 * If there are excess requests for pages (that have not already started discards),
		 * we need to discard some page (which may be this one).
		 */
		discard_page_if_needed(cache);
	}
}
/**
 * load_page_for_completion() - Helper function to load a page as described by a VDO Page
 *                              Completion.
 */
static void load_page_for_completion(struct page_info *info,
				     struct vdo_page_completion *vdo_page_comp)
{
	int result;

	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
	result = launch_page_load(info, vdo_page_comp->pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}
/**
 * vdo_get_page() - Initialize a page completion and get a block map page.
 * @page_completion: The vdo_page_completion to initialize.
 * @zone: The block map zone of the desired page.
 * @pbn: The absolute physical block of the desired page.
 * @writable: Whether the page can be modified.
 * @parent: The object to notify when the fetch is complete.
 * @callback: The notification callback.
 * @error_handler: The handler for fetch errors.
 * @requeue: Whether we must requeue when notifying the parent.
 *
 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
 * by the completion to be loaded from disk. When the callback is invoked, the page will be
 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
 * when they are done with the page to clear the busy mark.
 */
void vdo_get_page(struct vdo_page_completion *page_completion,
		  struct block_map_zone *zone, physical_block_number_t pbn,
		  bool writable, void *parent, vdo_action_fn callback,
		  vdo_action_fn error_handler, bool requeue)
{
	struct vdo_page_cache *cache = &zone->page_cache;
	struct vdo_completion *completion = &page_completion->completion;
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);
	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "New page completion was not already on a wait queue");

	*page_completion = (struct vdo_page_completion) {
		.pbn = pbn,
		.writable = writable,
		.cache = cache,
	};

	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
	vdo_prepare_completion(completion, callback, error_handler,
			       cache->zone->thread_id, parent);
	completion->requeue = requeue;

	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
		vdo_fail_completion(completion, VDO_READ_ONLY);
		return;
	}

	if (page_completion->writable)
		ADD_ONCE(cache->stats.write_count, 1);
	else
		ADD_ONCE(cache->stats.read_count, 1);

	info = find_page(cache, page_completion->pbn);
	if (info != NULL) {
		/* The page is in the cache already. */
		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
		    is_incoming(info) ||
		    (is_outgoing(info) && page_completion->writable)) {
			/* The page is unusable until it has finished I/O. */
			ADD_ONCE(cache->stats.wait_for_page, 1);
			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
			return;
		}

		if (is_valid(info)) {
			/* The page is usable. */
			ADD_ONCE(cache->stats.found_in_cache, 1);
			if (!is_present(info))
				ADD_ONCE(cache->stats.read_outgoing, 1);
			update_lru(info);
			info->busy++;
			complete_with_page(info, page_completion);
			return;
		}

		/* Something horrible has gone wrong. */
		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
	}

	/* The page must be fetched. */
	info = find_free_page(cache);
	if (info != NULL) {
		ADD_ONCE(cache->stats.fetch_required, 1);
		load_page_for_completion(info, page_completion);
		return;
	}

	/* The page must wait for a page to be discarded. */
	ADD_ONCE(cache->stats.discard_required, 1);
	discard_page_for_completion(page_completion);
}
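
/*
 * Illustrative sketch only: the typical call pattern for the page cache API above. The callback
 * below is hypothetical; real callers live elsewhere in the block map and data_vio code. The
 * essential contract is that every vdo_get_page() is balanced by a vdo_release_page_completion()
 * to clear the page's busy mark.
 */
static void __maybe_unused example_page_ready(struct vdo_completion *completion)
{
	struct block_map_page *page;

	if (vdo_get_cached_page(completion, &page) == VDO_SUCCESS) {
		/* ... read or modify entries in page ... */
	}

	/* Always release the completion so the page can eventually be reclaimed. */
	vdo_release_page_completion(completion);
}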
/**
 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
 * @completion: The vdo_page_completion containing the page.
 */
void vdo_request_page_write(struct vdo_completion *completion)
{
	struct page_info *info;
	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);

	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
		return;

	info = vdo_page_comp->info;
	set_info_state(info, PS_DIRTY);
	launch_page_save(info);
}
/**
 * vdo_get_cached_page() - Get the block map page from a page completion.
 * @completion: A vdo page completion whose callback has been called.
 * @page_ptr: A pointer to hold the page.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_get_cached_page(struct vdo_completion *completion,
			struct block_map_page **page_ptr)
{
	int result;
	struct vdo_page_completion *vpc;

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result == VDO_SUCCESS)
		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);

	return result;
}
/**
 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
 *
 * There must not be any dirty pages in the cache.
 *
 * Return: A success or error code.
 */
int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
{
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);

	/* Make sure we don't throw away any dirty pages. */
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");

		if (result != VDO_SUCCESS)
			return result;
	}

	/* Reset the page map by re-allocating it. */
	vdo_int_map_free(vdo_forget(cache->page_map));
	return vdo_int_map_create(cache->page_count, &cache->page_map);
}
/**
 * get_tree_page_by_index() - Get the tree page for a given height and page index.
 *
 * Return: The requested page.
 */
static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
							      root_count_t root_index,
							      height_t height,
							      page_number_t page_index)
{
	page_number_t offset = 0;
	size_t segment;

	for (segment = 0; segment < forest->segments; segment++) {
		page_number_t border = forest->boundaries[segment].levels[height - 1];

		if (page_index < border) {
			struct block_map_tree *tree = &forest->trees[root_index];

			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
		}

		offset = border;
	}

	return NULL;
}

/* Get the page referred to by the lock's tree slot at its current height. */
static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
					      const struct tree_lock *lock)
{
	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
				      lock->height,
				      lock->tree_slots[lock->height].page_index);
}
/** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
			 physical_block_number_t pbn,
			 struct block_map_page *page)
{
	struct block_map_page *loaded = (struct block_map_page *) buffer;
	enum block_map_page_validity validity =
		vdo_validate_block_map_page(loaded, nonce, pbn);

	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
		memcpy(page, loaded, VDO_BLOCK_SIZE);
		return true;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Expected page %llu but got page %llu instead",
				       (unsigned long long) pbn,
				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
	}

	return false;
}
/**
 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
 *                     a cyclic range of values from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	if (value < lower)
		value += modulus;
	if (upper < lower)
		upper += modulus;
	return (value <= upper);
}
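
/*
 * Illustrative sketch only: a worked example of the cyclic comparison above for 8-bit generations
 * (modulus 256), the case used by is_not_older() below.
 */
static void __maybe_unused example_cyclic_range_check(void)
{
	/* With oldest = 250 and current = 3, the live window wraps around zero. */
	bool in = in_cyclic_range(250, 1, 3, 256);    /* (1 + 256) <= (3 + 256): true */
	bool out = in_cyclic_range(250, 100, 3, 256); /* (100 + 256) > (3 + 256): false */

	VDO_ASSERT_LOG_ONLY(in && !out, "cyclic range example holds");
}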
/**
 * is_not_older() - Check whether a generation is strictly older than some other generation in the
 *                  context of a zone's current generation range.
 * @zone: The zone in which to do the comparison.
 * @a: The generation in question.
 * @b: The generation to compare to.
 *
 * Return: true if generation @a is not strictly older than generation @b in the context of @zone.
 */
static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
{
	int result;

	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
			    "generation(s) %u, %u are out of range [%u, %u]",
			    a, b, zone->oldest_generation, zone->generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return true;
	}

	return in_cyclic_range(b, a, zone->generation, 1 << 8);
}
static void release_generation(struct block_map_zone *zone, u8 generation)
{
	int result;

	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
			    "dirty page count underflow for generation %u", generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	zone->dirty_page_counts[generation]--;
	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
	       (zone->oldest_generation != zone->generation))
		zone->oldest_generation++;
}

static void set_generation(struct block_map_zone *zone, struct tree_page *page,
			   u8 new_generation)
{
	u32 new_count;
	int result;
	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
	u8 old_generation = page->generation;

	if (decrement_old && (old_generation == new_generation))
		return;

	page->generation = new_generation;
	new_count = ++zone->dirty_page_counts[new_generation];
	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
			    new_generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	if (decrement_old)
		release_generation(zone, old_generation);
}
static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);

/* Implements waiter_callback_fn. */
static void write_page_callback(struct vdo_waiter *waiter, void *context)
{
	write_page(container_of(waiter, struct tree_page, waiter), context);
}

static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
{
	waiter->callback = write_page_callback;
	acquire_vio_from_pool(zone->vio_pool, waiter);
}

/* Return: true if all possible generations were not already active. */
static bool attempt_increment(struct block_map_zone *zone)
{
	u8 generation = zone->generation + 1;

	if (zone->oldest_generation == generation)
		return false;

	zone->generation = generation;
	return true;
}

/* Launches a flush if one is not already in progress. */
static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
{
	if ((zone->flusher == NULL) && attempt_increment(zone)) {
		zone->flusher = page;
		acquire_vio(&page->waiter, zone);
		return;
	}

	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
}
static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
{
	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
	struct write_if_not_dirtied_context *write_context = context;

	if (page->generation == write_context->generation) {
		acquire_vio(waiter, write_context->zone);
		return;
	}

	enqueue_page(page, write_context->zone);
}

static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
	return_vio_to_pool(zone->vio_pool, vio);
	check_for_drain_complete(zone);
}
/* This callback is registered in write_initialized_page(). */
static void finish_page_write(struct vdo_completion *completion)
{
	bool dirty;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct tree_page *page = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
						     page->writing_recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     zone->zone_number);

	dirty = (page->writing_generation != page->generation);
	release_generation(zone, page->writing_generation);
	page->writing = false;

	if (zone->flusher == page) {
		struct write_if_not_dirtied_context context = {
			.zone = zone,
			.generation = page->writing_generation,
		};

		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
					     write_page_if_not_dirtied, &context);
		if (dirty && attempt_increment(zone)) {
			write_page(page, pooled);
			return;
		}

		zone->flusher = NULL;
	}

	if (dirty) {
		enqueue_page(page, zone);
	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
		   attempt_increment(zone)) {
		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
					     struct tree_page, waiter);
		write_page(zone->flusher, pooled);
		return;
	}

	return_to_pool(zone, pooled);
}
static void handle_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	enter_zone_read_only_mode(zone, result);
	return_to_pool(zone, pooled);
}
static void write_page_endio(struct bio *bio);

static void write_initialized_page(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;
	struct tree_page *tree_page = completion->parent;
	struct block_map_page *page = (struct block_map_page *) vio->data;
	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

	/*
	 * Now that we know the page has been written at least once, mark the copy we are writing
	 * as initialized.
	 */
	page->header.initialized = true;

	if (zone->flusher == tree_page)
		operation |= REQ_PREFLUSH;

	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				operation);
}

static void write_page_endio(struct bio *bio)
{
	struct pooled_vio *vio = bio->bi_private;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = (struct block_map_page *) vio->vio.data;

	continue_vio_after_io(&vio->vio,
			      (page->header.initialized ?
			       finish_page_write : write_initialized_page),
			      zone->thread_id);
}
static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
{
	struct vdo_completion *completion = &vio->vio.completion;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = vdo_as_block_map_page(tree_page);

	if ((zone->flusher != tree_page) &&
	    is_not_older(zone, tree_page->generation, zone->generation)) {
		/*
		 * This page was re-dirtied after the last flush was issued, hence we need to do
		 * another flush.
		 */
		enqueue_page(tree_page, zone);
		return_to_pool(zone, vio);
		return;
	}

	completion->parent = tree_page;
	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
	completion->callback_thread_id = zone->thread_id;

	tree_page->writing = true;
	tree_page->writing_generation = tree_page->generation;
	tree_page->writing_recovery_lock = tree_page->recovery_lock;

	/* Clear this now so that we know this page is not on any dirty list. */
	tree_page->recovery_lock = 0;

	/*
	 * We've already copied the page into the vio which will write it, so if it was not yet
	 * initialized, the first write will indicate that (for torn write protection). It is now
	 * safe to mark it as initialized in memory since if the write fails, the in memory state
	 * will become irrelevant.
	 */
	if (page->header.initialized) {
		write_initialized_page(completion);
		return;
	}

	page->header.initialized = true;
	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO);
}
/* Release a lock on a page which was being loaded or allocated. */
static void release_page_lock(struct data_vio *data_vio, char *what)
{
	struct block_map_zone *zone;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;

	VDO_ASSERT_LOG_ONLY(lock->locked,
			    "release of unlocked block map page %s for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);

	zone = data_vio->logical.zone->block_map_zone;
	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
			    "block map page %s mismatch for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);
	lock->locked = false;
}

static void finish_lookup(struct data_vio *data_vio, int result)
{
	data_vio->tree_lock.height = 0;

	--data_vio->logical.zone->block_map_zone->active_lookups;

	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
	data_vio->vio.completion.error_handler = handle_data_vio_error;
	continue_data_vio_with_error(data_vio, result);
}
static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	int result = *((int *) context);

	if (!data_vio->write) {
		if (result == VDO_NO_SPACE)
			result = VDO_SUCCESS;
	} else if (result != VDO_NO_SPACE) {
		result = VDO_READ_ONLY;
	}

	finish_lookup(data_vio, result);
}

static void abort_lookup(struct data_vio *data_vio, int result, char *what)
{
	if (result != VDO_NO_SPACE)
		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);

	if (data_vio->tree_lock.locked) {
		release_page_lock(data_vio, what);
		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
					     abort_lookup_for_waiter,
					     &result);
	}

	finish_lookup(data_vio, result);
}

static void abort_load(struct data_vio *data_vio, int result)
{
	abort_lookup(data_vio, result, "load");
}

static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
					       const struct data_location *mapping,
					       height_t height)
{
	if (!vdo_is_valid_location(mapping) ||
	    vdo_is_state_compressed(mapping->state) ||
	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
		return true;

	/* Roots aren't physical data blocks, so we can't check their PBNs. */
	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
		return false;

	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
}
static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
static void allocate_block_map_page(struct block_map_zone *zone,
				    struct data_vio *data_vio);

static void continue_with_loaded_page(struct data_vio *data_vio,
				      struct block_map_page *page)
{
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
	struct data_location mapping =
		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);

	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				       (unsigned long long) mapping.pbn, mapping.state,
				       lock->tree_slots[lock->height - 1].page_index,
				       lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we need is unallocated. */
		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
					data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load next. */
	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}
static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);

	data_vio->tree_lock.height--;
	continue_with_loaded_page(data_vio, context);
}

static void finish_block_map_page_load(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	nonce_t nonce;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	tree_lock->height--;
	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
	tree_page = get_tree_page(zone, tree_lock);
	page = (struct block_map_page *) tree_page->page_buffer;
	nonce = zone->block_map->nonce;

	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
		vdo_format_block_map_page(page, nonce, pbn, false);
	return_vio_to_pool(zone->vio_pool, pooled);

	/* Release our claim to the load and wake any waiters. */
	release_page_lock(data_vio, "load");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
	continue_with_loaded_page(data_vio, page);
}
static void handle_io_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	return_vio_to_pool(zone->vio_pool, pooled);
	abort_load(data_vio, result);
}

static void load_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct data_vio *data_vio = vio->completion.parent;

	continue_vio_after_io(vio, finish_block_map_page_load,
			      data_vio->logical.zone->thread_id);
}

static void load_page(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;

	pooled->vio.completion.parent = data_vio;
	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
				handle_io_error, REQ_OP_READ | REQ_PRIO);
}
/*
 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
 * acquired, @data_vio->tree_lock.locked will be true.
 */
static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;
	height_t height = lock->height;
	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
	union page_key key;

	key.descriptor = (struct page_descriptor) {
		.root_index = lock->root_index,
		.height = height,
		.page_index = tree_slot.page_index,
		.slot = tree_slot.block_map_slot.slot,
	};
	lock->key = key.key;

	result = vdo_int_map_put(zone->loading_pages, lock->key,
				 lock, false, (void **) &lock_holder);
	if (result != VDO_SUCCESS)
		return result;

	if (lock_holder == NULL) {
		/* We got the lock. */
		data_vio->tree_lock.locked = true;
		return VDO_SUCCESS;
	}

	/* Someone else is loading or allocating the page we need. */
	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
	return VDO_SUCCESS;
}
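
/*
 * Illustrative sketch only: the union page_key defined near the top of this file reinterprets the
 * packed page_descriptor as a single u64, which is what vdo_int_map_put() above keys on. Two
 * lookups contend for the same page lock exactly when all four descriptor fields match, i.e. when
 * their u64 keys are equal. The helper name is hypothetical.
 */
static inline u64 __maybe_unused
example_page_key_for(root_count_t root, height_t height, page_number_t index,
		     slot_number_t slot)
{
	union page_key key = {
		.descriptor = {
			.root_index = root,
			.height = height,
			.page_index = index,
			.slot = slot,
		},
	};

	return key.key;
}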
/* Load a block map tree page from disk, for the next level in the data vio tree lock. */
static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;

	result = attempt_page_lock(zone, data_vio);
	if (result != VDO_SUCCESS) {
		abort_load(data_vio, result);
		return;
	}

	if (data_vio->tree_lock.locked) {
		data_vio->waiter.callback = load_page;
		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
	}
}

static void allocation_failure(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	if (vdo_requeue_completion_if_needed(completion,
					     data_vio->logical.zone->thread_id))
		return;

	abort_lookup(data_vio, completion->result, "allocation");
}
static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	physical_block_number_t pbn = *((physical_block_number_t *) context);

	tree_lock->height--;
	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;

	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}
/** expire_oldest_list() - Expire the oldest list. */
static void expire_oldest_list(struct dirty_lists *dirty_lists)
{
	block_count_t i = dirty_lists->offset++;

	dirty_lists->oldest_period++;
	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
				      &dirty_lists->expired[VDO_TREE_PAGE]);
	}

	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
				      &dirty_lists->expired[VDO_CACHE_PAGE]);
	}

	if (dirty_lists->offset == dirty_lists->maximum_age)
		dirty_lists->offset = 0;
}
/** update_period() - Update the dirty_lists period if necessary. */
static void update_period(struct dirty_lists *dirty, sequence_number_t period)
{
	while (dirty->next_period <= period) {
		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
			expire_oldest_list(dirty);
		dirty->next_period++;
	}
}
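/*
 * An illustrative example (values are hypothetical): with maximum_age = 5 and
 * oldest_period = 10, dirty periods 10..14 occupy era slots 0..4 (period % maximum_age).
 * When the period advances to 15, the slot holding period 10 is expired and oldest_period
 * becomes 11.
 */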
/** write_expired_elements() - Write out the expired list. */
static void write_expired_elements(struct block_map_zone *zone)
{
	struct tree_page *page, *ttmp;
	struct page_info *info, *ptmp;
	struct list_head *expired;
	u8 generation = zone->generation;

	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
	list_for_each_entry_safe(page, ttmp, expired, entry) {
		int result;

		list_del_init(&page->entry);

		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
				    "Newly expired page not already waiting to write");
		if (result != VDO_SUCCESS) {
			enter_zone_read_only_mode(zone, result);
			continue;
		}

		set_generation(zone, page, generation);
		if (!page->writing)
			enqueue_page(page, zone);
	}

	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
		list_del_init(&info->state_entry);
		schedule_page_save(info);
	}

	save_pages(&zone->page_cache);
}
2033 * add_to_dirty_lists() - Add an element to the dirty lists.
2034 * @zone: The zone in which we are operating.
2035 * @entry: The list entry of the element to add.
2036 * @type: The type of page.
2037 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
 *              lock.
 */
2041 static void add_to_dirty_lists(struct block_map_zone *zone,
2042 struct list_head *entry,
2043 enum block_map_page_type type,
2044 sequence_number_t old_period,
2045 sequence_number_t new_period)
{
	struct dirty_lists *dirty_lists = zone->dirty_lists;

	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
		return;

	if (new_period < dirty_lists->oldest_period) {
		list_move_tail(entry, &dirty_lists->expired[type]);
	} else {
		update_period(dirty_lists, new_period);
		list_move_tail(entry,
			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
	}

	write_expired_elements(zone);
}
/*
 * Record the allocation in the tree and wake any waiters now that the write lock has been
 * released.
 */
2067 static void finish_block_map_allocation(struct vdo_completion *completion)
2069 physical_block_number_t pbn;
2070 struct tree_page *tree_page;
2071 struct block_map_page *page;
2072 sequence_number_t old_lock;
2073 struct data_vio *data_vio = as_data_vio(completion);
2074 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2075 struct tree_lock *tree_lock = &data_vio->tree_lock;
2076 height_t height = tree_lock->height;
2078 assert_data_vio_in_logical_zone(data_vio);
2080 tree_page = get_tree_page(zone, tree_lock);
2081 pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;
2083 /* Record the allocation. */
2084 page = (struct block_map_page *) tree_page->page_buffer;
2085 old_lock = tree_page->recovery_lock;
2086 vdo_update_block_map_page(page, data_vio, pbn,
2087 VDO_MAPPING_STATE_UNCOMPRESSED,
2088 &tree_page->recovery_lock);
	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
		/* This page is waiting to be written out. */
		if (zone->flusher != tree_page) {
			/*
			 * The outstanding flush won't cover the update we just made,
			 * so mark the page as needing another flush.
			 */
			set_generation(zone, tree_page, zone->generation);
		}
	} else {
		/* Put the page on a dirty list */
		if (old_lock == 0)
			INIT_LIST_HEAD(&tree_page->entry);
		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
				   old_lock, tree_page->recovery_lock);
	}
	tree_lock->height--;

	/* Format the interior node we just allocated (in memory). */
	tree_page = get_tree_page(zone, tree_lock);
	vdo_format_block_map_page(tree_page->page_buffer,
				  zone->block_map->nonce,
				  pbn, false);

	/* Release our claim to the allocation and wake any waiters */
	release_page_lock(data_vio, "allocation");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
				     continue_allocation_for_waiter, &pbn);
	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(zone, data_vio);
}
2128 static void release_block_map_write_lock(struct vdo_completion *completion)
2130 struct data_vio *data_vio = as_data_vio(completion);
2132 assert_data_vio_in_allocated_zone(data_vio);
2134 release_data_vio_allocation_lock(data_vio, true);
2135 launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
/*
 * Newly allocated block map pages are set to MAXIMUM_REFERENCES after they are journaled, to
 * prevent deduplication against the block after we release the write lock on it, but before we
 * write out the page.
 */
2143 static void set_block_map_page_reference_count(struct vdo_completion *completion)
2145 struct data_vio *data_vio = as_data_vio(completion);
2147 assert_data_vio_in_allocated_zone(data_vio);
2149 completion->callback = release_block_map_write_lock;
2150 vdo_modify_reference_count(completion, &data_vio->increment_updater);
2153 static void journal_block_map_allocation(struct vdo_completion *completion)
2155 struct data_vio *data_vio = as_data_vio(completion);
2157 assert_data_vio_in_journal_zone(data_vio);
2159 set_data_vio_allocated_zone_callback(data_vio,
2160 set_block_map_page_reference_count);
2161 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
static void allocate_block(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn;

	assert_data_vio_in_allocated_zone(data_vio);

	if (!vdo_allocate_block_in_zone(data_vio))
		return;

	pbn = data_vio->allocation.pbn;
	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
	data_vio->increment_updater = (struct reference_updater) {
		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
		.increment = true,
		.zpbn = {
			.pbn = pbn,
			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
		},
		.lock = data_vio->allocation.lock,
	};

	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
}
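/*
 * The allocation path above hops threads by callback: allocate_block() runs in the allocated
 * zone, journal_block_map_allocation() in the journal zone, then the reference count update
 * and write lock release happen back in the allocated zone, and finish_block_map_allocation()
 * completes the step in the logical zone, recording the new page and continuing the lookup.
 */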
static void allocate_block_map_page(struct block_map_zone *zone,
				    struct data_vio *data_vio)
{
	int result;

	if (!data_vio->write || data_vio->is_discard) {
		/* This is a pure read or a discard, so there's nothing left to do here. */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	result = attempt_page_lock(zone, data_vio);
	if (result != VDO_SUCCESS) {
		abort_lookup(data_vio, result, "allocation");
		return;
	}

	if (!data_vio->tree_lock.locked)
		return;

	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
				     allocate_block, allocation_failure);
}
/**
 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
 *                             resides and cache that result in the data_vio.
 *
 * All ancestors in the tree will be allocated or loaded, as needed.
 */
void vdo_find_block_map_slot(struct data_vio *data_vio)
{
	page_number_t page_index;
	struct block_map_tree_slot tree_slot;
	struct data_location mapping;
	struct block_map_page *page = NULL;
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;

	zone->active_lookups++;
	if (vdo_is_state_draining(&zone->state)) {
		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
		return;
	}

	lock->tree_slots[0].block_map_slot.slot =
		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
	tree_slot = (struct block_map_tree_slot) {
		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		.block_map_slot = {
			.pbn = 0,
			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		},
	};

	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
		physical_block_number_t pbn;

		lock->tree_slots[lock->height] = tree_slot;
		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
		pbn = vdo_get_block_map_page_pbn(page);
		if (pbn != VDO_ZERO_BLOCK) {
			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
			break;
		}

		/* Calculate the index and slot for the next level. */
		tree_slot.block_map_slot.slot =
			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	}

	/* The page at this height has been allocated and loaded. */
	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				       (unsigned long long) mapping.pbn, mapping.state,
				       lock->tree_slots[lock->height - 1].page_index,
				       lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we want one level down has not been allocated, so allocate it. */
		allocate_block_map_page(zone, data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		/* This is the ultimate block map page, so we're done */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load. */
	load_block_map_page(zone, data_vio);
}
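/*
 * A worked example of the slot arithmetic, assuming VDO_BLOCK_MAP_ENTRIES_PER_PAGE == 812 and
 * root_count == 4: for LBN 1,000,000, the leaf slot is 1,000,000 % 812 == 428 and the leaf
 * page number is 1,000,000 / 812 == 1231. That page belongs to tree 1231 % 4 == 3 and is page
 * 1231 / 4 == 307 within that tree; each higher level splits the index again by % 812 and
 * / 812.
 */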
/*
 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
 * pages have been loaded; otherwise, it may give the wrong answer (0).
 */
2296 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
2297 page_number_t page_number)
2299 struct data_location mapping;
2300 struct tree_page *tree_page;
2301 struct block_map_page *page;
2302 root_count_t root_index = page_number % map->root_count;
2303 page_number_t page_index = page_number / map->root_count;
2304 slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2306 page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2308 tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
2309 page = (struct block_map_page *) tree_page->page_buffer;
2310 if (!page->header.initialized)
2311 return VDO_ZERO_BLOCK;
2313 mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
		return VDO_ZERO_BLOCK;

	return mapping.pbn;
}
2320 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2321 * method is used when correcting errors in the tree during read-only rebuild.
void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
{
	bool waiting = vdo_waiter_is_waiting(&page->waiter);

	if (waiting && (zone->flusher == page))
		return;

	set_generation(zone, page, zone->generation);
	if (waiting || page->writing)
		return;

	enqueue_page(page, zone);
}
static int make_segment(struct forest *old_forest, block_count_t new_pages,
			struct boundary *new_boundary, struct forest *forest)
{
	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
	struct tree_page *page_ptr;
	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
	height_t height;
	root_count_t root;
	int result;

	forest->segments = index + 1;

	result = vdo_allocate(forest->segments, struct boundary,
			      "forest boundary array", &forest->boundaries);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(forest->segments, struct tree_page *,
			      "forest page pointers", &forest->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(new_pages, struct tree_page,
			      "new forest pages", &forest->pages[index]);
	if (result != VDO_SUCCESS)
		return result;

	if (index > 0) {
		memcpy(forest->boundaries, old_forest->boundaries,
		       index * sizeof(struct boundary));
		memcpy(forest->pages, old_forest->pages,
		       index * sizeof(struct tree_page *));
	}

	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));

	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
		segment_sizes[height] = new_boundary->levels[height];
		if (index > 0)
			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
	}

	page_ptr = forest->pages[index];
	for (root = 0; root < forest->map->root_count; root++) {
		struct block_map_tree_segment *segment;
		struct block_map_tree *tree = &(forest->trees[root]);

		int result = vdo_allocate(forest->segments,
					  struct block_map_tree_segment,
					  "tree root segments", &tree->segments);
		if (result != VDO_SUCCESS)
			return result;

		if (index > 0) {
			memcpy(tree->segments, old_forest->trees[root].segments,
			       index * sizeof(struct block_map_tree_segment));
		}

		segment = &(tree->segments[index]);
		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
			if (segment_sizes[height] == 0)
				continue;

			segment->levels[height] = page_ptr;
			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
				/* Record the root. */
				struct block_map_page *page =
					vdo_format_block_map_page(page_ptr->page_buffer,
								  forest->map->nonce,
								  VDO_INVALID_PBN, true);
				page->entries[0] =
					vdo_pack_block_map_entry(forest->map->root_origin + root,
								 VDO_MAPPING_STATE_UNCOMPRESSED);
			}

			page_ptr += segment_sizes[height];
		}
	}

	return VDO_SUCCESS;
}
static void deforest(struct forest *forest, size_t first_page_segment)
{
	root_count_t root;

	if (forest->pages != NULL) {
		size_t segment;

		for (segment = first_page_segment; segment < forest->segments; segment++)
			vdo_free(forest->pages[segment]);
		vdo_free(forest->pages);
	}

	for (root = 0; root < forest->map->root_count; root++)
		vdo_free(forest->trees[root].segments);

	vdo_free(forest->boundaries);
	vdo_free(forest);
}
/**
 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
 *                 there is one.
 * @entries: The number of entries the block map will hold.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int make_forest(struct block_map *map, block_count_t entries)
{
	struct forest *forest, *old_forest = map->forest;
	struct boundary new_boundary, *old_boundary = NULL;
	block_count_t new_pages;
	int result;

	if (old_forest != NULL)
		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);

	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
						 entries, &new_boundary);
	if (new_pages == 0) {
		map->next_entry_count = entries;
		return VDO_SUCCESS;
	}

	result = vdo_allocate_extended(struct forest, map->root_count,
				       struct block_map_tree, __func__,
				       &forest);
	if (result != VDO_SUCCESS)
		return result;

	forest->map = map;
	result = make_segment(old_forest, new_pages, &new_boundary, forest);
	if (result != VDO_SUCCESS) {
		deforest(forest, forest->segments - 1);
		return result;
	}

	map->next_forest = forest;
	map->next_entry_count = entries;
	return VDO_SUCCESS;
}
/**
 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
 */
static void replace_forest(struct block_map *map)
{
	if (map->next_forest != NULL) {
		if (map->forest != NULL)
			deforest(map->forest, map->forest->segments);
		map->forest = vdo_forget(map->next_forest);
	}

	map->entry_count = map->next_entry_count;
	map->next_entry_count = 0;
}
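/*
 * Forest growth is two-phase: make_forest() builds the larger forest while the old one remains
 * in use (the new forest shares the old segments' pages), and replace_forest() later swaps it
 * in under a suspended operation, so lookups never observe a partially built forest.
 */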
/**
 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
 *                   traversal.
 */
static void finish_cursor(struct cursor *cursor)
{
	struct cursors *cursors = cursor->parent;
	struct vdo_completion *completion = cursors->completion;

	return_vio_to_pool(cursors->pool, vdo_forget(cursor->vio));
	if (--cursors->active_roots > 0)
		return;

	vdo_free(cursors);

	vdo_finish_completion(completion);
}
2513 static void traverse(struct cursor *cursor);
2516 * continue_traversal() - Continue traversing a block map tree.
2517 * @completion: The VIO doing a read or write.
2519 static void continue_traversal(struct vdo_completion *completion)
2521 vio_record_metadata_io_error(as_vio(completion));
2522 traverse(completion->parent);
2526 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
2527 * @completion: The VIO doing the read.
static void finish_traversal_load(struct vdo_completion *completion)
{
	struct cursor *cursor = completion->parent;
	height_t height = cursor->height;
	struct cursor_level *level = &cursor->levels[height];
	struct tree_page *tree_page =
		&(cursor->tree->segments[0].levels[height][level->page_index]);
	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;

	vdo_copy_valid_page(cursor->vio->vio.data,
			    cursor->parent->zone->block_map->nonce,
			    pbn_from_vio_bio(cursor->vio->vio.bio), page);
	traverse(cursor);
}
2544 static void traversal_endio(struct bio *bio)
2546 struct vio *vio = bio->bi_private;
2547 struct cursor *cursor = vio->completion.parent;
2549 continue_vio_after_io(vio, finish_traversal_load,
2550 cursor->parent->zone->thread_id);
2554 * traverse() - Traverse a single block map tree.
2556 * This is the recursive heart of the traversal process.
static void traverse(struct cursor *cursor)
{
	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
		height_t height = cursor->height;
		struct cursor_level *level = &cursor->levels[height];
		struct tree_page *tree_page =
			&(cursor->tree->segments[0].levels[height][level->page_index]);
		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;

		if (!page->header.initialized)
			continue;

		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
			struct cursor_level *next_level;
			page_number_t entry_index =
				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
			struct data_location location =
				vdo_unpack_block_map_entry(&page->entries[level->slot]);

			if (!vdo_is_valid_location(&location)) {
				/* This entry is invalid, so remove it from the page. */
				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
				vdo_write_tree_page(tree_page, cursor->parent->zone);
				continue;
			}

			if (!vdo_is_mapped_location(&location))
				continue;

			/* Erase mapped entries past the end of the logical space. */
			if (entry_index >= cursor->boundary.levels[height]) {
				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
				vdo_write_tree_page(tree_page, cursor->parent->zone);
				continue;
			}

			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
				int result = cursor->parent->entry_callback(location.pbn,
									    cursor->parent->completion);
				if (result != VDO_SUCCESS) {
					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
					vdo_write_tree_page(tree_page, cursor->parent->zone);
					continue;
				}
			}

			if (cursor->height == 0)
				continue;

			cursor->height--;
			next_level = &cursor->levels[cursor->height];
			next_level->page_index = entry_index;
			next_level->slot = 0;
			level->slot++;
			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
						traversal_endio, continue_traversal,
						REQ_OP_READ | REQ_PRIO);
			return;
		}
	}

	finish_cursor(cursor);
}
2623 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2624 * which to load pages.
2625 * @context: The pooled_vio just acquired.
2627 * Implements waiter_callback_fn.
static void launch_cursor(struct vdo_waiter *waiter, void *context)
{
	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
	struct pooled_vio *pooled = context;

	cursor->vio = pooled;
	pooled->vio.completion.parent = cursor;
	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
	traverse(cursor);
}
2641 * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2643 * Return: The list of page counts as a boundary structure.
static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
{
	struct boundary boundary;
	height_t height;
	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
	/*
	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
	 * roots starting from tree 0.
	 */
	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
	page_count_t level_pages = leaf_pages / map->root_count;

	if (root_index <= last_tree_root)
		level_pages++;

	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
		boundary.levels[height] = level_pages;
		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
	}

	/* The root node always exists, even if the root is otherwise unused. */
	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;

	return boundary;
}
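/*
 * For example, with 10 leaf pages and root_count == 3: last_tree_root is (10 - 1) % 3 == 0, so
 * root 0 gets 10 / 3 + 1 == 4 leaves while roots 1 and 2 get 3 each, covering all 10 pages.
 * Assuming 812 entries per page, each interior level then needs DIV_ROUND_UP(pages, 812) pages.
 */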
2673 * vdo_traverse_forest() - Walk the entire forest of a block map.
2674 * @callback: A function to call with the pbn of each allocated node in the forest.
2675 * @completion: The completion to notify on each traversed PBN, and when traversal completes.
void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
			 struct vdo_completion *completion)
{
	root_count_t root;
	struct cursors *cursors;
	int result;

	result = vdo_allocate_extended(struct cursors, map->root_count,
				       struct cursor, __func__, &cursors);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(completion, result);
		return;
	}

	cursors->zone = &map->zones[0];
	cursors->pool = cursors->zone->vio_pool;
	cursors->entry_callback = callback;
	cursors->completion = completion;
	cursors->active_roots = map->root_count;
	for (root = 0; root < map->root_count; root++) {
		struct cursor *cursor = &cursors->cursors[root];

		*cursor = (struct cursor) {
			.tree = &map->forest->trees[root],
			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
			.parent = cursors,
			.boundary = compute_boundary(map, root),
		};

		cursor->waiter.callback = launch_cursor;
		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
	}
}
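/*
 * This traversal is used, for example, during read-only rebuild to visit every allocated tree
 * page; invalid or out-of-range entries found along the way are removed and the corrected pages
 * are rewritten via vdo_write_tree_page().
 */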
/**
 * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
 *               written out.
 */
2716 static int __must_check initialize_block_map_zone(struct block_map *map,
2717 zone_count_t zone_number,
2718 page_count_t cache_size,
2719 block_count_t maximum_age)
{
	int result;
	block_count_t i;
	struct vdo *vdo = map->vdo;
2724 struct block_map_zone *zone = &map->zones[zone_number];
2726 BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));
2728 zone->zone_number = zone_number;
2729 zone->thread_id = vdo->thread_config.logical_threads[zone_number];
2730 zone->block_map = map;
2732 result = vdo_allocate_extended(struct dirty_lists, maximum_age,
2733 dirty_era_t, __func__,
2734 &zone->dirty_lists);
	if (result != VDO_SUCCESS)
		return result;
2738 zone->dirty_lists->maximum_age = maximum_age;
2739 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
2740 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);
2742 for (i = 0; i < maximum_age; i++) {
2743 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
2744 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
2747 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
	if (result != VDO_SUCCESS)
		return result;
2751 result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE,
2752 zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
2753 VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
	if (result != VDO_SUCCESS)
		return result;
2757 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2759 zone->page_cache.zone = zone;
2760 zone->page_cache.vdo = vdo;
2761 zone->page_cache.page_count = cache_size / map->zone_count;
2762 zone->page_cache.stats.free_pages = zone->page_cache.page_count;
2764 result = allocate_cache_components(&zone->page_cache);
2765 if (result != VDO_SUCCESS)
2768 /* initialize empty circular queues */
2769 INIT_LIST_HEAD(&zone->page_cache.lru_list);
	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);

	return VDO_SUCCESS;
}
2775 /* Implements vdo_zone_thread_getter_fn */
2776 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
2778 struct block_map *map = context;
2780 return map->zones[zone_number].thread_id;
2783 /* Implements vdo_action_preamble_fn */
2784 static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
2786 struct block_map *map = context;
2788 map->current_era_point = map->pending_era_point;
2789 vdo_finish_completion(parent);
2792 /* Implements vdo_zone_action_fn */
2793 static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
2794 struct vdo_completion *parent)
2796 struct block_map *map = context;
2797 struct block_map_zone *zone = &map->zones[zone_number];
2799 update_period(zone->dirty_lists, map->current_era_point);
2800 write_expired_elements(zone);
2801 vdo_finish_completion(parent);
2805 * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2806 * vdo_schedule_default_action() on the block map's action manager.
2808 * Implements vdo_action_scheduler_fn.
static bool schedule_era_advance(void *context)
{
	struct block_map *map = context;

	if (map->current_era_point == map->pending_era_point)
		return false;

	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
				   advance_block_map_zone_era, NULL, NULL);
}
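/*
 * Era advance sequence: vdo_advance_block_map_era() (below) records the journal's new block
 * number as the pending era point; the action manager then runs prepare_for_era_advance() on
 * the journal thread and advance_block_map_zone_era() in each logical zone, expiring and
 * writing out pages which have become too old.
 */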
2821 static void uninitialize_block_map_zone(struct block_map_zone *zone)
2823 struct vdo_page_cache *cache = &zone->page_cache;
2825 vdo_free(vdo_forget(zone->dirty_lists));
2826 free_vio_pool(vdo_forget(zone->vio_pool));
2827 vdo_int_map_free(vdo_forget(zone->loading_pages));
2828 if (cache->infos != NULL) {
2829 struct page_info *info;
2831 for (info = cache->infos; info < cache->infos + cache->page_count; info++)
2832 free_vio(vdo_forget(info->vio));
2835 vdo_int_map_free(vdo_forget(cache->page_map));
2836 vdo_free(vdo_forget(cache->infos));
2837 vdo_free(vdo_forget(cache->pages));
void vdo_free_block_map(struct block_map *map)
{
	zone_count_t zone;

	if (map == NULL)
		return;

	for (zone = 0; zone < map->zone_count; zone++)
		uninitialize_block_map_zone(&map->zones[zone]);

	vdo_abandon_block_map_growth(map);
	if (map->forest != NULL)
		deforest(vdo_forget(map->forest), 0);
	vdo_free(vdo_forget(map->action_manager));

	vdo_free(map);
}
2857 /* @journal may be NULL. */
2858 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
2859 struct vdo *vdo, struct recovery_journal *journal,
2860 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
2861 struct block_map **map_ptr)
{
	struct block_map *map;
	int result;
	zone_count_t zone = 0;
2867 BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
2868 ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
2869 sizeof(struct block_map_entry)));
2870 result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
	if (result != VDO_SUCCESS)
		return result;
2874 result = vdo_allocate_extended(struct block_map,
2875 vdo->thread_config.logical_zone_count,
2876 struct block_map_zone, __func__, &map);
	if (result != VDO_SUCCESS)
		return result;

	map->vdo = vdo;
	map->root_origin = state.root_origin;
2882 map->root_count = state.root_count;
2883 map->entry_count = logical_blocks;
	map->journal = journal;
	map->nonce = nonce;
2887 result = make_forest(map, map->entry_count);
	if (result != VDO_SUCCESS) {
		vdo_free_block_map(map);
		return result;
	}
2893 replace_forest(map);
2895 map->zone_count = vdo->thread_config.logical_zone_count;
2896 for (zone = 0; zone < map->zone_count; zone++) {
2897 result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
		if (result != VDO_SUCCESS) {
			vdo_free_block_map(map);
			return result;
		}
	}
2904 result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
2905 vdo_get_recovery_journal_thread_id(journal),
2906 map, schedule_era_advance, vdo,
2907 &map->action_manager);
	if (result != VDO_SUCCESS) {
		vdo_free_block_map(map);
		return result;
	}

	*map_ptr = map;
	return VDO_SUCCESS;
}
2917 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
2919 return (struct block_map_state_2_0) {
2920 .flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
2921 /* This is the flat page count, which has turned out to always be 0. */
2922 .flat_page_count = 0,
2923 .root_origin = map->root_origin,
		.root_count = map->root_count,
	};
}
2928 /* The block map needs to know the journals' sequence number to initialize the eras. */
2929 void vdo_initialize_block_map_from_journal(struct block_map *map,
2930 struct recovery_journal *journal)
{
	zone_count_t z;

	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
2935 map->pending_era_point = map->current_era_point;
2937 for (z = 0; z < map->zone_count; z++) {
2938 struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;
2940 VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
2941 dirty_lists->oldest_period = map->current_era_point;
2942 dirty_lists->next_period = map->current_era_point + 1;
		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
	}
}
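/*
 * For example, if the journal's current sequence number is 1005 and maximum_age is 5, each zone
 * starts with oldest_period == 1005, next_period == 1006, and offset == 1005 % 5 == 0.
 */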
2947 /* Compute the logical zone for the LBN of a data vio. */
zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
{
	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;

	tree_lock->tree_slots[0].page_index = page_number;
	tree_lock->root_index = page_number % map->root_count;
	return (tree_lock->root_index % map->zone_count);
}
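/*
 * For example, assuming 812 entries per page: LBN 1,000,000 is on block map page 1231; with
 * root_count == 4 and zone_count == 2, it belongs to tree 1231 % 4 == 3, which is handled by
 * logical zone 3 % 2 == 1.
 */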
2959 void vdo_advance_block_map_era(struct block_map *map,
2960 sequence_number_t recovery_block_number)
{
	if (map == NULL)
		return;

	map->pending_era_point = recovery_block_number;
2966 vdo_schedule_default_action(map->action_manager);
2969 /* Implements vdo_admin_initiator_fn */
2970 static void initiate_drain(struct admin_state *state)
2972 struct block_map_zone *zone = container_of(state, struct block_map_zone, state);
2974 VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
2975 "%s() called with no active lookups", __func__);
2977 if (!vdo_is_state_suspending(state)) {
2978 while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
2979 expire_oldest_list(zone->dirty_lists);
2980 write_expired_elements(zone);
2983 check_for_drain_complete(zone);
2986 /* Implements vdo_zone_action_fn. */
2987 static void drain_zone(void *context, zone_count_t zone_number,
2988 struct vdo_completion *parent)
2990 struct block_map *map = context;
2991 struct block_map_zone *zone = &map->zones[zone_number];
2993 vdo_start_draining(&zone->state,
2994 vdo_get_current_manager_operation(map->action_manager),
2995 parent, initiate_drain);
2998 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
2999 struct vdo_completion *parent)
	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
			       parent);
}
3005 /* Implements vdo_zone_action_fn. */
3006 static void resume_block_map_zone(void *context, zone_count_t zone_number,
3007 struct vdo_completion *parent)
3009 struct block_map *map = context;
3010 struct block_map_zone *zone = &map->zones[zone_number];
3012 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
3015 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
3017 vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
3018 NULL, resume_block_map_zone, NULL, parent);
3021 /* Allocate an expanded collection of trees, for a future growth. */
int vdo_prepare_to_grow_block_map(struct block_map *map,
				  block_count_t new_logical_blocks)
{
	if (map->next_entry_count == new_logical_blocks)
		return VDO_SUCCESS;

	if (map->next_entry_count > 0)
		vdo_abandon_block_map_growth(map);

	if (new_logical_blocks < map->entry_count) {
		map->next_entry_count = map->entry_count;
		return VDO_SUCCESS;
	}

	return make_forest(map, new_logical_blocks);
}
3039 /* Implements vdo_action_preamble_fn */
3040 static void grow_forest(void *context, struct vdo_completion *completion)
3042 replace_forest(context);
3043 vdo_finish_completion(completion);
3046 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
3047 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
3049 vdo_schedule_operation(map->action_manager,
3050 VDO_ADMIN_STATE_SUSPENDED_OPERATION,
3051 grow_forest, NULL, NULL, parent);
void vdo_abandon_block_map_growth(struct block_map *map)
{
	struct forest *forest = vdo_forget(map->next_forest);

	if (forest != NULL)
		deforest(forest, forest->segments - 1);

	map->next_entry_count = 0;
}
3064 /* Release the page completion and then continue the requester. */
3065 static inline void finish_processing_page(struct vdo_completion *completion, int result)
3067 struct vdo_completion *parent = completion->parent;
3069 vdo_release_page_completion(completion);
3070 vdo_continue_completion(parent, result);
3073 static void handle_page_error(struct vdo_completion *completion)
3075 finish_processing_page(completion, completion->result);
3078 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
3079 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
3080 vdo_action_fn action)
3082 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	if (vdo_is_state_draining(&zone->state)) {
		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
		return;
	}
3089 vdo_get_page(&data_vio->page_completion, zone,
3090 data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
3091 modifiable, &data_vio->vio.completion,
3092 action, handle_page_error, false);
3096 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
3098 * This indicates the block map entry for the logical block is either unmapped or corrupted.
3100 static void clear_mapped_location(struct data_vio *data_vio)
3102 data_vio->mapped = (struct zoned_pbn) {
3103 .state = VDO_MAPPING_STATE_UNMAPPED,
/**
 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
 *                         data_vio.
 *
 * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any
 *         other failure.
 */
3114 static int __must_check set_mapped_location(struct data_vio *data_vio,
3115 const struct block_map_entry *entry)
3117 /* Unpack the PBN for logging purposes even if the entry is invalid. */
3118 struct data_location mapped = vdo_unpack_block_map_entry(entry);
	if (vdo_is_valid_location(&mapped)) {
		int result;

		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
					       mapped.pbn, &data_vio->mapped.zone);
		if (result == VDO_SUCCESS) {
			data_vio->mapped.pbn = mapped.pbn;
			data_vio->mapped.state = mapped.state;
			return VDO_SUCCESS;
		}

		/*
		 * Return all errors not specifically known to be errors from validating the
		 * page.
		 */
		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
			return result;
	}

	/*
	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
	 * to VDO_BAD_MAPPING.
	 */
	vdo_log_error_strerror(VDO_BAD_MAPPING,
			       "PBN %llu with state %u read from the block map was invalid",
			       (unsigned long long) mapped.pbn, mapped.state);

	/*
	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
	 * known data loss.
	 */
	if (!data_vio->write)
		return VDO_BAD_MAPPING;

	/*
	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
	 * entry rather than fail the write.
	 */
	clear_mapped_location(data_vio);
	return VDO_SUCCESS;
}
3162 /* This callback is registered in vdo_get_mapped_block(). */
3163 static void get_mapping_from_fetched_page(struct vdo_completion *completion)
{
	int result;
	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
3167 const struct block_map_page *page;
3168 const struct block_map_entry *entry;
3169 struct data_vio *data_vio = as_data_vio(completion->parent);
3170 struct block_map_tree_slot *tree_slot;
	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}
3177 result = validate_completed_page(vpc, false);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}
3183 page = (const struct block_map_page *) get_page_buffer(vpc->info);
3184 tree_slot = &data_vio->tree_lock.tree_slots[0];
3185 entry = &page->entries[tree_slot->block_map_slot.slot];
3187 result = set_mapped_location(data_vio, entry);
3188 finish_processing_page(completion, result);
3191 void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
3192 physical_block_number_t pbn,
3193 enum block_mapping_state mapping_state,
3194 sequence_number_t *recovery_lock)
3196 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3197 struct block_map *block_map = zone->block_map;
3198 struct recovery_journal *journal = block_map->journal;
3199 sequence_number_t old_locked, new_locked;
3200 struct tree_lock *tree_lock = &data_vio->tree_lock;
3202 /* Encode the new mapping. */
3203 page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
3204 vdo_pack_block_map_entry(pbn, mapping_state);
3206 /* Adjust references on the recovery journal blocks. */
3207 old_locked = *recovery_lock;
3208 new_locked = data_vio->recovery_sequence_number;
	if ((old_locked == 0) || (old_locked > new_locked)) {
		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
							     VDO_ZONE_TYPE_LOGICAL,
							     zone->zone_number);

		if (old_locked > 0) {
			vdo_release_recovery_journal_block_reference(journal, old_locked,
								     VDO_ZONE_TYPE_LOGICAL,
								     zone->zone_number);
		}

		*recovery_lock = new_locked;
	}
	/*
	 * The journal block is now locked on behalf of the page (via the reference adjustments
	 * above), so the data_vio's own entry lock on that block is no longer needed and can be
	 * released.
	 */
	vdo_release_journal_entry_lock(journal, new_locked);
	data_vio->recovery_sequence_number = 0;
}
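/*
 * For example, if the page previously held its lock on journal block 12 and this update was
 * journaled in block 10, the page acquires a reference on block 10 and releases the one on
 * block 12, since replay must begin from the earliest journal block covering an unwritten page.
 */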
3232 static void put_mapping_in_fetched_page(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion->parent);
	sequence_number_t old_lock;
	struct vdo_page_completion *vpc;
	struct page_info *info;
	int result;
	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}
3245 vpc = as_vdo_page_completion(completion);
3246 result = validate_completed_page(vpc, true);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}
	info = vpc->info;
	old_lock = info->recovery_lock;
3254 vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
3255 data_vio, data_vio->new_mapped.pbn,
3256 data_vio->new_mapped.state, &info->recovery_lock);
3257 set_info_state(info, PS_DIRTY);
3258 add_to_dirty_lists(info->cache->zone, &info->state_entry,
3259 VDO_CACHE_PAGE, old_lock, info->recovery_lock);
3260 finish_processing_page(completion, VDO_SUCCESS);
3263 /* Read a stored block mapping into a data_vio. */
3264 void vdo_get_mapped_block(struct data_vio *data_vio)
3266 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
3268 * We know that the block map page for this LBN has not been allocated, so the
3269 * block must be unmapped.
		clear_mapped_location(data_vio);
		continue_data_vio(data_vio);
		return;
	}
3276 fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
3279 /* Update a stored block mapping to reflect a data_vio's new mapping. */
3280 void vdo_put_mapped_block(struct data_vio *data_vio)
3282 fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
3285 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
3287 zone_count_t zone = 0;
3288 struct block_map_statistics totals;
3290 memset(&totals, 0, sizeof(struct block_map_statistics));
3291 for (zone = 0; zone < map->zone_count; zone++) {
3292 const struct block_map_statistics *stats =
3293 &(map->zones[zone].page_cache.stats);
3295 totals.dirty_pages += READ_ONCE(stats->dirty_pages);
3296 totals.clean_pages += READ_ONCE(stats->clean_pages);
3297 totals.free_pages += READ_ONCE(stats->free_pages);
3298 totals.failed_pages += READ_ONCE(stats->failed_pages);
3299 totals.incoming_pages += READ_ONCE(stats->incoming_pages);
3300 totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
3301 totals.cache_pressure += READ_ONCE(stats->cache_pressure);
3302 totals.read_count += READ_ONCE(stats->read_count);
3303 totals.write_count += READ_ONCE(stats->write_count);
3304 totals.failed_reads += READ_ONCE(stats->failed_reads);
3305 totals.failed_writes += READ_ONCE(stats->failed_writes);
3306 totals.reclaimed += READ_ONCE(stats->reclaimed);
3307 totals.read_outgoing += READ_ONCE(stats->read_outgoing);
3308 totals.found_in_cache += READ_ONCE(stats->found_in_cache);
3309 totals.discard_required += READ_ONCE(stats->discard_required);
3310 totals.wait_for_page += READ_ONCE(stats->wait_for_page);
3311 totals.fetch_required += READ_ONCE(stats->fetch_required);
3312 totals.pages_loaded += READ_ONCE(stats->pages_loaded);
3313 totals.pages_saved += READ_ONCE(stats->pages_saved);
3314 totals.flush_count += READ_ONCE(stats->flush_count);