md: Make flush bios explicitely sync
[sfrench/cifs-2.6.git] / drivers / md / raid5-ppl.c
1 /*
2  * Partial Parity Log for closing the RAID5 write hole
3  * Copyright (c) 2017, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU General Public License,
7  * version 2, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
12  * more details.
13  */
14
15 #include <linux/kernel.h>
16 #include <linux/blkdev.h>
17 #include <linux/slab.h>
18 #include <linux/crc32c.h>
19 #include <linux/flex_array.h>
20 #include <linux/async_tx.h>
21 #include <linux/raid/md_p.h>
22 #include "md.h"
23 #include "raid5.h"
24
25 /*
26  * PPL consists of a 4KB header (struct ppl_header) and at least 128KB for
27  * partial parity data. The header contains an array of entries
28  * (struct ppl_header_entry) which describe the logged write requests.
29  * Partial parity for the entries comes after the header, written in the same
30  * sequence as the entries:
31  *
32  * Header
33  *   entry0
34  *   ...
35  *   entryN
36  * PP data
37  *   PP for entry0
38  *   ...
39  *   PP for entryN
40  *
41  * An entry describes one or more consecutive stripe_heads, up to a full
42  * stripe. The modifed raid data chunks form an m-by-n matrix, where m is the
43  * number of stripe_heads in the entry and n is the number of modified data
44  * disks. Every stripe_head in the entry must write to the same data disks.
45  * An example of a valid case described by a single entry (writes to the first
46  * stripe of a 4 disk array, 16k chunk size):
47  *
48  * sh->sector   dd0   dd1   dd2    ppl
49  *            +-----+-----+-----+
50  * 0          | --- | --- | --- | +----+
51  * 8          | -W- | -W- | --- | | pp |   data_sector = 8
52  * 16         | -W- | -W- | --- | | pp |   data_size = 3 * 2 * 4k
53  * 24         | -W- | -W- | --- | | pp |   pp_size = 3 * 4k
54  *            +-----+-----+-----+ +----+
55  *
56  * data_sector is the first raid sector of the modified data, data_size is the
57  * total size of modified data and pp_size is the size of partial parity for
58  * this entry. Entries for full stripe writes contain no partial parity
59  * (pp_size = 0), they only mark the stripes for which parity should be
60  * recalculated after an unclean shutdown. Every entry holds a checksum of its
61  * partial parity, the header also has a checksum of the header itself.
62  *
63  * A write request is always logged to the PPL instance stored on the parity
64  * disk of the corresponding stripe. For each member disk there is one ppl_log
65  * used to handle logging for this disk, independently from others. They are
66  * grouped in child_logs array in struct ppl_conf, which is assigned to
67  * r5conf->log_private.
68  *
69  * ppl_io_unit represents a full PPL write, header_page contains the ppl_header.
70  * PPL entries for logged stripes are added in ppl_log_stripe(). A stripe_head
71  * can be appended to the last entry if it meets the conditions for a valid
72  * entry described above, otherwise a new entry is added. Checksums of entries
73  * are calculated incrementally as stripes containing partial parity are being
74  * added. ppl_submit_iounit() calculates the checksum of the header and submits
75  * a bio containing the header page and partial parity pages (sh->ppl_page) for
76  * all stripes of the io_unit. When the PPL write completes, the stripes
77  * associated with the io_unit are released and raid5d starts writing their data
78  * and parity. When all stripes are written, the io_unit is freed and the next
79  * can be submitted.
80  *
81  * An io_unit is used to gather stripes until it is submitted or becomes full
82  * (if the maximum number of entries or size of PPL is reached). Another io_unit
83  * can't be submitted until the previous has completed (PPL and stripe
84  * data+parity is written). The log->io_list tracks all io_units of a log
85  * (for a single member disk). New io_units are added to the end of the list
86  * and the first io_unit is submitted, if it is not submitted already.
87  * The current io_unit accepting new stripes is always at the end of the list.
88  */
89
90 struct ppl_conf {
91         struct mddev *mddev;
92
93         /* array of child logs, one for each raid disk */
94         struct ppl_log *child_logs;
95         int count;
96
97         int block_size;         /* the logical block size used for data_sector
98                                  * in ppl_header_entry */
99         u32 signature;          /* raid array identifier */
100         atomic64_t seq;         /* current log write sequence number */
101
102         struct kmem_cache *io_kc;
103         mempool_t *io_pool;
104         struct bio_set *bs;
105
106         /* used only for recovery */
107         int recovered_entries;
108         int mismatch_count;
109
110         /* stripes to retry if failed to allocate io_unit */
111         struct list_head no_mem_stripes;
112         spinlock_t no_mem_stripes_lock;
113 };
114
115 struct ppl_log {
116         struct ppl_conf *ppl_conf;      /* shared between all log instances */
117
118         struct md_rdev *rdev;           /* array member disk associated with
119                                          * this log instance */
120         struct mutex io_mutex;
121         struct ppl_io_unit *current_io; /* current io_unit accepting new data
122                                          * always at the end of io_list */
123         spinlock_t io_list_lock;
124         struct list_head io_list;       /* all io_units of this log */
125 };
126
127 #define PPL_IO_INLINE_BVECS 32
128
129 struct ppl_io_unit {
130         struct ppl_log *log;
131
132         struct page *header_page;       /* for ppl_header */
133
134         unsigned int entries_count;     /* number of entries in ppl_header */
135         unsigned int pp_size;           /* total size current of partial parity */
136
137         u64 seq;                        /* sequence number of this log write */
138         struct list_head log_sibling;   /* log->io_list */
139
140         struct list_head stripe_list;   /* stripes added to the io_unit */
141         atomic_t pending_stripes;       /* how many stripes not written to raid */
142
143         bool submitted;                 /* true if write to log started */
144
145         /* inline bio and its biovec for submitting the iounit */
146         struct bio bio;
147         struct bio_vec biovec[PPL_IO_INLINE_BVECS];
148 };
149
150 struct dma_async_tx_descriptor *
151 ops_run_partial_parity(struct stripe_head *sh, struct raid5_percpu *percpu,
152                        struct dma_async_tx_descriptor *tx)
153 {
154         int disks = sh->disks;
155         struct page **srcs = flex_array_get(percpu->scribble, 0);
156         int count = 0, pd_idx = sh->pd_idx, i;
157         struct async_submit_ctl submit;
158
159         pr_debug("%s: stripe %llu\n", __func__, (unsigned long long)sh->sector);
160
161         /*
162          * Partial parity is the XOR of stripe data chunks that are not changed
163          * during the write request. Depending on available data
164          * (read-modify-write vs. reconstruct-write case) we calculate it
165          * differently.
166          */
167         if (sh->reconstruct_state == reconstruct_state_prexor_drain_run) {
168                 /*
169                  * rmw: xor old data and parity from updated disks
170                  * This is calculated earlier by ops_run_prexor5() so just copy
171                  * the parity dev page.
172                  */
173                 srcs[count++] = sh->dev[pd_idx].page;
174         } else if (sh->reconstruct_state == reconstruct_state_drain_run) {
175                 /* rcw: xor data from all not updated disks */
176                 for (i = disks; i--;) {
177                         struct r5dev *dev = &sh->dev[i];
178                         if (test_bit(R5_UPTODATE, &dev->flags))
179                                 srcs[count++] = dev->page;
180                 }
181         } else {
182                 return tx;
183         }
184
185         init_async_submit(&submit, ASYNC_TX_FENCE|ASYNC_TX_XOR_ZERO_DST, tx,
186                           NULL, sh, flex_array_get(percpu->scribble, 0)
187                           + sizeof(struct page *) * (sh->disks + 2));
188
189         if (count == 1)
190                 tx = async_memcpy(sh->ppl_page, srcs[0], 0, 0, PAGE_SIZE,
191                                   &submit);
192         else
193                 tx = async_xor(sh->ppl_page, srcs, 0, count, PAGE_SIZE,
194                                &submit);
195
196         return tx;
197 }
198
199 static void *ppl_io_pool_alloc(gfp_t gfp_mask, void *pool_data)
200 {
201         struct kmem_cache *kc = pool_data;
202         struct ppl_io_unit *io;
203
204         io = kmem_cache_alloc(kc, gfp_mask);
205         if (!io)
206                 return NULL;
207
208         io->header_page = alloc_page(gfp_mask);
209         if (!io->header_page) {
210                 kmem_cache_free(kc, io);
211                 return NULL;
212         }
213
214         return io;
215 }
216
217 static void ppl_io_pool_free(void *element, void *pool_data)
218 {
219         struct kmem_cache *kc = pool_data;
220         struct ppl_io_unit *io = element;
221
222         __free_page(io->header_page);
223         kmem_cache_free(kc, io);
224 }
225
226 static struct ppl_io_unit *ppl_new_iounit(struct ppl_log *log,
227                                           struct stripe_head *sh)
228 {
229         struct ppl_conf *ppl_conf = log->ppl_conf;
230         struct ppl_io_unit *io;
231         struct ppl_header *pplhdr;
232         struct page *header_page;
233
234         io = mempool_alloc(ppl_conf->io_pool, GFP_NOWAIT);
235         if (!io)
236                 return NULL;
237
238         header_page = io->header_page;
239         memset(io, 0, sizeof(*io));
240         io->header_page = header_page;
241
242         io->log = log;
243         INIT_LIST_HEAD(&io->log_sibling);
244         INIT_LIST_HEAD(&io->stripe_list);
245         atomic_set(&io->pending_stripes, 0);
246         bio_init(&io->bio, io->biovec, PPL_IO_INLINE_BVECS);
247
248         pplhdr = page_address(io->header_page);
249         clear_page(pplhdr);
250         memset(pplhdr->reserved, 0xff, PPL_HDR_RESERVED);
251         pplhdr->signature = cpu_to_le32(ppl_conf->signature);
252
253         io->seq = atomic64_add_return(1, &ppl_conf->seq);
254         pplhdr->generation = cpu_to_le64(io->seq);
255
256         return io;
257 }
258
259 static int ppl_log_stripe(struct ppl_log *log, struct stripe_head *sh)
260 {
261         struct ppl_io_unit *io = log->current_io;
262         struct ppl_header_entry *e = NULL;
263         struct ppl_header *pplhdr;
264         int i;
265         sector_t data_sector = 0;
266         int data_disks = 0;
267         unsigned int entry_space = (log->rdev->ppl.size << 9) - PPL_HEADER_SIZE;
268         struct r5conf *conf = sh->raid_conf;
269
270         pr_debug("%s: stripe: %llu\n", __func__, (unsigned long long)sh->sector);
271
272         /* check if current io_unit is full */
273         if (io && (io->pp_size == entry_space ||
274                    io->entries_count == PPL_HDR_MAX_ENTRIES)) {
275                 pr_debug("%s: add io_unit blocked by seq: %llu\n",
276                          __func__, io->seq);
277                 io = NULL;
278         }
279
280         /* add a new unit if there is none or the current is full */
281         if (!io) {
282                 io = ppl_new_iounit(log, sh);
283                 if (!io)
284                         return -ENOMEM;
285                 spin_lock_irq(&log->io_list_lock);
286                 list_add_tail(&io->log_sibling, &log->io_list);
287                 spin_unlock_irq(&log->io_list_lock);
288
289                 log->current_io = io;
290         }
291
292         for (i = 0; i < sh->disks; i++) {
293                 struct r5dev *dev = &sh->dev[i];
294
295                 if (i != sh->pd_idx && test_bit(R5_Wantwrite, &dev->flags)) {
296                         if (!data_disks || dev->sector < data_sector)
297                                 data_sector = dev->sector;
298                         data_disks++;
299                 }
300         }
301         BUG_ON(!data_disks);
302
303         pr_debug("%s: seq: %llu data_sector: %llu data_disks: %d\n", __func__,
304                  io->seq, (unsigned long long)data_sector, data_disks);
305
306         pplhdr = page_address(io->header_page);
307
308         if (io->entries_count > 0) {
309                 struct ppl_header_entry *last =
310                                 &pplhdr->entries[io->entries_count - 1];
311                 struct stripe_head *sh_last = list_last_entry(
312                                 &io->stripe_list, struct stripe_head, log_list);
313                 u64 data_sector_last = le64_to_cpu(last->data_sector);
314                 u32 data_size_last = le32_to_cpu(last->data_size);
315
316                 /*
317                  * Check if we can append the stripe to the last entry. It must
318                  * be just after the last logged stripe and write to the same
319                  * disks. Use bit shift and logarithm to avoid 64-bit division.
320                  */
321                 if ((sh->sector == sh_last->sector + STRIPE_SECTORS) &&
322                     (data_sector >> ilog2(conf->chunk_sectors) ==
323                      data_sector_last >> ilog2(conf->chunk_sectors)) &&
324                     ((data_sector - data_sector_last) * data_disks ==
325                      data_size_last >> 9))
326                         e = last;
327         }
328
329         if (!e) {
330                 e = &pplhdr->entries[io->entries_count++];
331                 e->data_sector = cpu_to_le64(data_sector);
332                 e->parity_disk = cpu_to_le32(sh->pd_idx);
333                 e->checksum = cpu_to_le32(~0);
334         }
335
336         le32_add_cpu(&e->data_size, data_disks << PAGE_SHIFT);
337
338         /* don't write any PP if full stripe write */
339         if (!test_bit(STRIPE_FULL_WRITE, &sh->state)) {
340                 le32_add_cpu(&e->pp_size, PAGE_SIZE);
341                 io->pp_size += PAGE_SIZE;
342                 e->checksum = cpu_to_le32(crc32c_le(le32_to_cpu(e->checksum),
343                                                     page_address(sh->ppl_page),
344                                                     PAGE_SIZE));
345         }
346
347         list_add_tail(&sh->log_list, &io->stripe_list);
348         atomic_inc(&io->pending_stripes);
349         sh->ppl_io = io;
350
351         return 0;
352 }
353
354 int ppl_write_stripe(struct r5conf *conf, struct stripe_head *sh)
355 {
356         struct ppl_conf *ppl_conf = conf->log_private;
357         struct ppl_io_unit *io = sh->ppl_io;
358         struct ppl_log *log;
359
360         if (io || test_bit(STRIPE_SYNCING, &sh->state) || !sh->ppl_page ||
361             !test_bit(R5_Wantwrite, &sh->dev[sh->pd_idx].flags) ||
362             !test_bit(R5_Insync, &sh->dev[sh->pd_idx].flags)) {
363                 clear_bit(STRIPE_LOG_TRAPPED, &sh->state);
364                 return -EAGAIN;
365         }
366
367         log = &ppl_conf->child_logs[sh->pd_idx];
368
369         mutex_lock(&log->io_mutex);
370
371         if (!log->rdev || test_bit(Faulty, &log->rdev->flags)) {
372                 mutex_unlock(&log->io_mutex);
373                 return -EAGAIN;
374         }
375
376         set_bit(STRIPE_LOG_TRAPPED, &sh->state);
377         clear_bit(STRIPE_DELAYED, &sh->state);
378         atomic_inc(&sh->count);
379
380         if (ppl_log_stripe(log, sh)) {
381                 spin_lock_irq(&ppl_conf->no_mem_stripes_lock);
382                 list_add_tail(&sh->log_list, &ppl_conf->no_mem_stripes);
383                 spin_unlock_irq(&ppl_conf->no_mem_stripes_lock);
384         }
385
386         mutex_unlock(&log->io_mutex);
387
388         return 0;
389 }
390
391 static void ppl_log_endio(struct bio *bio)
392 {
393         struct ppl_io_unit *io = bio->bi_private;
394         struct ppl_log *log = io->log;
395         struct ppl_conf *ppl_conf = log->ppl_conf;
396         struct stripe_head *sh, *next;
397
398         pr_debug("%s: seq: %llu\n", __func__, io->seq);
399
400         if (bio->bi_error)
401                 md_error(ppl_conf->mddev, log->rdev);
402
403         list_for_each_entry_safe(sh, next, &io->stripe_list, log_list) {
404                 list_del_init(&sh->log_list);
405
406                 set_bit(STRIPE_HANDLE, &sh->state);
407                 raid5_release_stripe(sh);
408         }
409 }
410
411 static void ppl_submit_iounit_bio(struct ppl_io_unit *io, struct bio *bio)
412 {
413         char b[BDEVNAME_SIZE];
414
415         pr_debug("%s: seq: %llu size: %u sector: %llu dev: %s\n",
416                  __func__, io->seq, bio->bi_iter.bi_size,
417                  (unsigned long long)bio->bi_iter.bi_sector,
418                  bdevname(bio->bi_bdev, b));
419
420         submit_bio(bio);
421 }
422
423 static void ppl_submit_iounit(struct ppl_io_unit *io)
424 {
425         struct ppl_log *log = io->log;
426         struct ppl_conf *ppl_conf = log->ppl_conf;
427         struct ppl_header *pplhdr = page_address(io->header_page);
428         struct bio *bio = &io->bio;
429         struct stripe_head *sh;
430         int i;
431
432         bio->bi_private = io;
433
434         if (!log->rdev || test_bit(Faulty, &log->rdev->flags)) {
435                 ppl_log_endio(bio);
436                 return;
437         }
438
439         for (i = 0; i < io->entries_count; i++) {
440                 struct ppl_header_entry *e = &pplhdr->entries[i];
441
442                 pr_debug("%s: seq: %llu entry: %d data_sector: %llu pp_size: %u data_size: %u\n",
443                          __func__, io->seq, i, le64_to_cpu(e->data_sector),
444                          le32_to_cpu(e->pp_size), le32_to_cpu(e->data_size));
445
446                 e->data_sector = cpu_to_le64(le64_to_cpu(e->data_sector) >>
447                                              ilog2(ppl_conf->block_size >> 9));
448                 e->checksum = cpu_to_le32(~le32_to_cpu(e->checksum));
449         }
450
451         pplhdr->entries_count = cpu_to_le32(io->entries_count);
452         pplhdr->checksum = cpu_to_le32(~crc32c_le(~0, pplhdr, PPL_HEADER_SIZE));
453
454         bio->bi_end_io = ppl_log_endio;
455         bio->bi_opf = REQ_OP_WRITE | REQ_FUA;
456         bio->bi_bdev = log->rdev->bdev;
457         bio->bi_iter.bi_sector = log->rdev->ppl.sector;
458         bio_add_page(bio, io->header_page, PAGE_SIZE, 0);
459
460         list_for_each_entry(sh, &io->stripe_list, log_list) {
461                 /* entries for full stripe writes have no partial parity */
462                 if (test_bit(STRIPE_FULL_WRITE, &sh->state))
463                         continue;
464
465                 if (!bio_add_page(bio, sh->ppl_page, PAGE_SIZE, 0)) {
466                         struct bio *prev = bio;
467
468                         bio = bio_alloc_bioset(GFP_NOIO, BIO_MAX_PAGES,
469                                                ppl_conf->bs);
470                         bio->bi_opf = prev->bi_opf;
471                         bio->bi_bdev = prev->bi_bdev;
472                         bio->bi_iter.bi_sector = bio_end_sector(prev);
473                         bio_add_page(bio, sh->ppl_page, PAGE_SIZE, 0);
474
475                         bio_chain(bio, prev);
476                         ppl_submit_iounit_bio(io, prev);
477                 }
478         }
479
480         ppl_submit_iounit_bio(io, bio);
481 }
482
483 static void ppl_submit_current_io(struct ppl_log *log)
484 {
485         struct ppl_io_unit *io;
486
487         spin_lock_irq(&log->io_list_lock);
488
489         io = list_first_entry_or_null(&log->io_list, struct ppl_io_unit,
490                                       log_sibling);
491         if (io && io->submitted)
492                 io = NULL;
493
494         spin_unlock_irq(&log->io_list_lock);
495
496         if (io) {
497                 io->submitted = true;
498
499                 if (io == log->current_io)
500                         log->current_io = NULL;
501
502                 ppl_submit_iounit(io);
503         }
504 }
505
506 void ppl_write_stripe_run(struct r5conf *conf)
507 {
508         struct ppl_conf *ppl_conf = conf->log_private;
509         struct ppl_log *log;
510         int i;
511
512         for (i = 0; i < ppl_conf->count; i++) {
513                 log = &ppl_conf->child_logs[i];
514
515                 mutex_lock(&log->io_mutex);
516                 ppl_submit_current_io(log);
517                 mutex_unlock(&log->io_mutex);
518         }
519 }
520
521 static void ppl_io_unit_finished(struct ppl_io_unit *io)
522 {
523         struct ppl_log *log = io->log;
524         struct ppl_conf *ppl_conf = log->ppl_conf;
525         unsigned long flags;
526
527         pr_debug("%s: seq: %llu\n", __func__, io->seq);
528
529         local_irq_save(flags);
530
531         spin_lock(&log->io_list_lock);
532         list_del(&io->log_sibling);
533         spin_unlock(&log->io_list_lock);
534
535         mempool_free(io, ppl_conf->io_pool);
536
537         spin_lock(&ppl_conf->no_mem_stripes_lock);
538         if (!list_empty(&ppl_conf->no_mem_stripes)) {
539                 struct stripe_head *sh;
540
541                 sh = list_first_entry(&ppl_conf->no_mem_stripes,
542                                       struct stripe_head, log_list);
543                 list_del_init(&sh->log_list);
544                 set_bit(STRIPE_HANDLE, &sh->state);
545                 raid5_release_stripe(sh);
546         }
547         spin_unlock(&ppl_conf->no_mem_stripes_lock);
548
549         local_irq_restore(flags);
550 }
551
552 void ppl_stripe_write_finished(struct stripe_head *sh)
553 {
554         struct ppl_io_unit *io;
555
556         io = sh->ppl_io;
557         sh->ppl_io = NULL;
558
559         if (io && atomic_dec_and_test(&io->pending_stripes))
560                 ppl_io_unit_finished(io);
561 }
562
563 static void ppl_xor(int size, struct page *page1, struct page *page2)
564 {
565         struct async_submit_ctl submit;
566         struct dma_async_tx_descriptor *tx;
567         struct page *xor_srcs[] = { page1, page2 };
568
569         init_async_submit(&submit, ASYNC_TX_ACK|ASYNC_TX_XOR_DROP_DST,
570                           NULL, NULL, NULL, NULL);
571         tx = async_xor(page1, xor_srcs, 0, 2, size, &submit);
572
573         async_tx_quiesce(&tx);
574 }
575
576 /*
577  * PPL recovery strategy: xor partial parity and data from all modified data
578  * disks within a stripe and write the result as the new stripe parity. If all
579  * stripe data disks are modified (full stripe write), no partial parity is
580  * available, so just xor the data disks.
581  *
582  * Recovery of a PPL entry shall occur only if all modified data disks are
583  * available and read from all of them succeeds.
584  *
585  * A PPL entry applies to a stripe, partial parity size for an entry is at most
586  * the size of the chunk. Examples of possible cases for a single entry:
587  *
588  * case 0: single data disk write:
589  *   data0    data1    data2     ppl        parity
590  * +--------+--------+--------+           +--------------------+
591  * | ------ | ------ | ------ | +----+    | (no change)        |
592  * | ------ | -data- | ------ | | pp | -> | data1 ^ pp         |
593  * | ------ | -data- | ------ | | pp | -> | data1 ^ pp         |
594  * | ------ | ------ | ------ | +----+    | (no change)        |
595  * +--------+--------+--------+           +--------------------+
596  * pp_size = data_size
597  *
598  * case 1: more than one data disk write:
599  *   data0    data1    data2     ppl        parity
600  * +--------+--------+--------+           +--------------------+
601  * | ------ | ------ | ------ | +----+    | (no change)        |
602  * | -data- | -data- | ------ | | pp | -> | data0 ^ data1 ^ pp |
603  * | -data- | -data- | ------ | | pp | -> | data0 ^ data1 ^ pp |
604  * | ------ | ------ | ------ | +----+    | (no change)        |
605  * +--------+--------+--------+           +--------------------+
606  * pp_size = data_size / modified_data_disks
607  *
608  * case 2: write to all data disks (also full stripe write):
609  *   data0    data1    data2                parity
610  * +--------+--------+--------+           +--------------------+
611  * | ------ | ------ | ------ |           | (no change)        |
612  * | -data- | -data- | -data- | --------> | xor all data       |
613  * | ------ | ------ | ------ | --------> | (no change)        |
614  * | ------ | ------ | ------ |           | (no change)        |
615  * +--------+--------+--------+           +--------------------+
616  * pp_size = 0
617  *
618  * The following cases are possible only in other implementations. The recovery
619  * code can handle them, but they are not generated at runtime because they can
620  * be reduced to cases 0, 1 and 2:
621  *
622  * case 3:
623  *   data0    data1    data2     ppl        parity
624  * +--------+--------+--------+ +----+    +--------------------+
625  * | ------ | -data- | -data- | | pp |    | data1 ^ data2 ^ pp |
626  * | ------ | -data- | -data- | | pp | -> | data1 ^ data2 ^ pp |
627  * | -data- | -data- | -data- | | -- | -> | xor all data       |
628  * | -data- | -data- | ------ | | pp |    | data0 ^ data1 ^ pp |
629  * +--------+--------+--------+ +----+    +--------------------+
630  * pp_size = chunk_size
631  *
632  * case 4:
633  *   data0    data1    data2     ppl        parity
634  * +--------+--------+--------+ +----+    +--------------------+
635  * | ------ | -data- | ------ | | pp |    | data1 ^ pp         |
636  * | ------ | ------ | ------ | | -- | -> | (no change)        |
637  * | ------ | ------ | ------ | | -- | -> | (no change)        |
638  * | -data- | ------ | ------ | | pp |    | data0 ^ pp         |
639  * +--------+--------+--------+ +----+    +--------------------+
640  * pp_size = chunk_size
641  */
642 static int ppl_recover_entry(struct ppl_log *log, struct ppl_header_entry *e,
643                              sector_t ppl_sector)
644 {
645         struct ppl_conf *ppl_conf = log->ppl_conf;
646         struct mddev *mddev = ppl_conf->mddev;
647         struct r5conf *conf = mddev->private;
648         int block_size = ppl_conf->block_size;
649         struct page *page1;
650         struct page *page2;
651         sector_t r_sector_first;
652         sector_t r_sector_last;
653         int strip_sectors;
654         int data_disks;
655         int i;
656         int ret = 0;
657         char b[BDEVNAME_SIZE];
658         unsigned int pp_size = le32_to_cpu(e->pp_size);
659         unsigned int data_size = le32_to_cpu(e->data_size);
660
661         page1 = alloc_page(GFP_KERNEL);
662         page2 = alloc_page(GFP_KERNEL);
663
664         if (!page1 || !page2) {
665                 ret = -ENOMEM;
666                 goto out;
667         }
668
669         r_sector_first = le64_to_cpu(e->data_sector) * (block_size >> 9);
670
671         if ((pp_size >> 9) < conf->chunk_sectors) {
672                 if (pp_size > 0) {
673                         data_disks = data_size / pp_size;
674                         strip_sectors = pp_size >> 9;
675                 } else {
676                         data_disks = conf->raid_disks - conf->max_degraded;
677                         strip_sectors = (data_size >> 9) / data_disks;
678                 }
679                 r_sector_last = r_sector_first +
680                                 (data_disks - 1) * conf->chunk_sectors +
681                                 strip_sectors;
682         } else {
683                 data_disks = conf->raid_disks - conf->max_degraded;
684                 strip_sectors = conf->chunk_sectors;
685                 r_sector_last = r_sector_first + (data_size >> 9);
686         }
687
688         pr_debug("%s: array sector first: %llu last: %llu\n", __func__,
689                  (unsigned long long)r_sector_first,
690                  (unsigned long long)r_sector_last);
691
692         /* if start and end is 4k aligned, use a 4k block */
693         if (block_size == 512 &&
694             (r_sector_first & (STRIPE_SECTORS - 1)) == 0 &&
695             (r_sector_last & (STRIPE_SECTORS - 1)) == 0)
696                 block_size = STRIPE_SIZE;
697
698         /* iterate through blocks in strip */
699         for (i = 0; i < strip_sectors; i += (block_size >> 9)) {
700                 bool update_parity = false;
701                 sector_t parity_sector;
702                 struct md_rdev *parity_rdev;
703                 struct stripe_head sh;
704                 int disk;
705                 int indent = 0;
706
707                 pr_debug("%s:%*s iter %d start\n", __func__, indent, "", i);
708                 indent += 2;
709
710                 memset(page_address(page1), 0, PAGE_SIZE);
711
712                 /* iterate through data member disks */
713                 for (disk = 0; disk < data_disks; disk++) {
714                         int dd_idx;
715                         struct md_rdev *rdev;
716                         sector_t sector;
717                         sector_t r_sector = r_sector_first + i +
718                                             (disk * conf->chunk_sectors);
719
720                         pr_debug("%s:%*s data member disk %d start\n",
721                                  __func__, indent, "", disk);
722                         indent += 2;
723
724                         if (r_sector >= r_sector_last) {
725                                 pr_debug("%s:%*s array sector %llu doesn't need parity update\n",
726                                          __func__, indent, "",
727                                          (unsigned long long)r_sector);
728                                 indent -= 2;
729                                 continue;
730                         }
731
732                         update_parity = true;
733
734                         /* map raid sector to member disk */
735                         sector = raid5_compute_sector(conf, r_sector, 0,
736                                                       &dd_idx, NULL);
737                         pr_debug("%s:%*s processing array sector %llu => data member disk %d, sector %llu\n",
738                                  __func__, indent, "",
739                                  (unsigned long long)r_sector, dd_idx,
740                                  (unsigned long long)sector);
741
742                         rdev = conf->disks[dd_idx].rdev;
743                         if (!rdev) {
744                                 pr_debug("%s:%*s data member disk %d missing\n",
745                                          __func__, indent, "", dd_idx);
746                                 update_parity = false;
747                                 break;
748                         }
749
750                         pr_debug("%s:%*s reading data member disk %s sector %llu\n",
751                                  __func__, indent, "", bdevname(rdev->bdev, b),
752                                  (unsigned long long)sector);
753                         if (!sync_page_io(rdev, sector, block_size, page2,
754                                         REQ_OP_READ, 0, false)) {
755                                 md_error(mddev, rdev);
756                                 pr_debug("%s:%*s read failed!\n", __func__,
757                                          indent, "");
758                                 ret = -EIO;
759                                 goto out;
760                         }
761
762                         ppl_xor(block_size, page1, page2);
763
764                         indent -= 2;
765                 }
766
767                 if (!update_parity)
768                         continue;
769
770                 if (pp_size > 0) {
771                         pr_debug("%s:%*s reading pp disk sector %llu\n",
772                                  __func__, indent, "",
773                                  (unsigned long long)(ppl_sector + i));
774                         if (!sync_page_io(log->rdev,
775                                         ppl_sector - log->rdev->data_offset + i,
776                                         block_size, page2, REQ_OP_READ, 0,
777                                         false)) {
778                                 pr_debug("%s:%*s read failed!\n", __func__,
779                                          indent, "");
780                                 md_error(mddev, log->rdev);
781                                 ret = -EIO;
782                                 goto out;
783                         }
784
785                         ppl_xor(block_size, page1, page2);
786                 }
787
788                 /* map raid sector to parity disk */
789                 parity_sector = raid5_compute_sector(conf, r_sector_first + i,
790                                 0, &disk, &sh);
791                 BUG_ON(sh.pd_idx != le32_to_cpu(e->parity_disk));
792                 parity_rdev = conf->disks[sh.pd_idx].rdev;
793
794                 BUG_ON(parity_rdev->bdev->bd_dev != log->rdev->bdev->bd_dev);
795                 pr_debug("%s:%*s write parity at sector %llu, disk %s\n",
796                          __func__, indent, "",
797                          (unsigned long long)parity_sector,
798                          bdevname(parity_rdev->bdev, b));
799                 if (!sync_page_io(parity_rdev, parity_sector, block_size,
800                                 page1, REQ_OP_WRITE, 0, false)) {
801                         pr_debug("%s:%*s parity write error!\n", __func__,
802                                  indent, "");
803                         md_error(mddev, parity_rdev);
804                         ret = -EIO;
805                         goto out;
806                 }
807         }
808 out:
809         if (page1)
810                 __free_page(page1);
811         if (page2)
812                 __free_page(page2);
813         return ret;
814 }
815
816 static int ppl_recover(struct ppl_log *log, struct ppl_header *pplhdr)
817 {
818         struct ppl_conf *ppl_conf = log->ppl_conf;
819         struct md_rdev *rdev = log->rdev;
820         struct mddev *mddev = rdev->mddev;
821         sector_t ppl_sector = rdev->ppl.sector + (PPL_HEADER_SIZE >> 9);
822         struct page *page;
823         int i;
824         int ret = 0;
825
826         page = alloc_page(GFP_KERNEL);
827         if (!page)
828                 return -ENOMEM;
829
830         /* iterate through all PPL entries saved */
831         for (i = 0; i < le32_to_cpu(pplhdr->entries_count); i++) {
832                 struct ppl_header_entry *e = &pplhdr->entries[i];
833                 u32 pp_size = le32_to_cpu(e->pp_size);
834                 sector_t sector = ppl_sector;
835                 int ppl_entry_sectors = pp_size >> 9;
836                 u32 crc, crc_stored;
837
838                 pr_debug("%s: disk: %d entry: %d ppl_sector: %llu pp_size: %u\n",
839                          __func__, rdev->raid_disk, i,
840                          (unsigned long long)ppl_sector, pp_size);
841
842                 crc = ~0;
843                 crc_stored = le32_to_cpu(e->checksum);
844
845                 /* read parial parity for this entry and calculate its checksum */
846                 while (pp_size) {
847                         int s = pp_size > PAGE_SIZE ? PAGE_SIZE : pp_size;
848
849                         if (!sync_page_io(rdev, sector - rdev->data_offset,
850                                         s, page, REQ_OP_READ, 0, false)) {
851                                 md_error(mddev, rdev);
852                                 ret = -EIO;
853                                 goto out;
854                         }
855
856                         crc = crc32c_le(crc, page_address(page), s);
857
858                         pp_size -= s;
859                         sector += s >> 9;
860                 }
861
862                 crc = ~crc;
863
864                 if (crc != crc_stored) {
865                         /*
866                          * Don't recover this entry if the checksum does not
867                          * match, but keep going and try to recover other
868                          * entries.
869                          */
870                         pr_debug("%s: ppl entry crc does not match: stored: 0x%x calculated: 0x%x\n",
871                                  __func__, crc_stored, crc);
872                         ppl_conf->mismatch_count++;
873                 } else {
874                         ret = ppl_recover_entry(log, e, ppl_sector);
875                         if (ret)
876                                 goto out;
877                         ppl_conf->recovered_entries++;
878                 }
879
880                 ppl_sector += ppl_entry_sectors;
881         }
882
883         /* flush the disk cache after recovery if necessary */
884         ret = blkdev_issue_flush(rdev->bdev, GFP_KERNEL, NULL);
885 out:
886         __free_page(page);
887         return ret;
888 }
889
890 static int ppl_write_empty_header(struct ppl_log *log)
891 {
892         struct page *page;
893         struct ppl_header *pplhdr;
894         struct md_rdev *rdev = log->rdev;
895         int ret = 0;
896
897         pr_debug("%s: disk: %d ppl_sector: %llu\n", __func__,
898                  rdev->raid_disk, (unsigned long long)rdev->ppl.sector);
899
900         page = alloc_page(GFP_NOIO | __GFP_ZERO);
901         if (!page)
902                 return -ENOMEM;
903
904         pplhdr = page_address(page);
905         memset(pplhdr->reserved, 0xff, PPL_HDR_RESERVED);
906         pplhdr->signature = cpu_to_le32(log->ppl_conf->signature);
907         pplhdr->checksum = cpu_to_le32(~crc32c_le(~0, pplhdr, PAGE_SIZE));
908
909         if (!sync_page_io(rdev, rdev->ppl.sector - rdev->data_offset,
910                           PPL_HEADER_SIZE, page, REQ_OP_WRITE | REQ_SYNC |
911                           REQ_FUA, 0, false)) {
912                 md_error(rdev->mddev, rdev);
913                 ret = -EIO;
914         }
915
916         __free_page(page);
917         return ret;
918 }
919
920 static int ppl_load_distributed(struct ppl_log *log)
921 {
922         struct ppl_conf *ppl_conf = log->ppl_conf;
923         struct md_rdev *rdev = log->rdev;
924         struct mddev *mddev = rdev->mddev;
925         struct page *page;
926         struct ppl_header *pplhdr;
927         u32 crc, crc_stored;
928         u32 signature;
929         int ret = 0;
930
931         pr_debug("%s: disk: %d\n", __func__, rdev->raid_disk);
932
933         /* read PPL header */
934         page = alloc_page(GFP_KERNEL);
935         if (!page)
936                 return -ENOMEM;
937
938         if (!sync_page_io(rdev, rdev->ppl.sector - rdev->data_offset,
939                           PAGE_SIZE, page, REQ_OP_READ, 0, false)) {
940                 md_error(mddev, rdev);
941                 ret = -EIO;
942                 goto out;
943         }
944         pplhdr = page_address(page);
945
946         /* check header validity */
947         crc_stored = le32_to_cpu(pplhdr->checksum);
948         pplhdr->checksum = 0;
949         crc = ~crc32c_le(~0, pplhdr, PAGE_SIZE);
950
951         if (crc_stored != crc) {
952                 pr_debug("%s: ppl header crc does not match: stored: 0x%x calculated: 0x%x\n",
953                          __func__, crc_stored, crc);
954                 ppl_conf->mismatch_count++;
955                 goto out;
956         }
957
958         signature = le32_to_cpu(pplhdr->signature);
959
960         if (mddev->external) {
961                 /*
962                  * For external metadata the header signature is set and
963                  * validated in userspace.
964                  */
965                 ppl_conf->signature = signature;
966         } else if (ppl_conf->signature != signature) {
967                 pr_debug("%s: ppl header signature does not match: stored: 0x%x configured: 0x%x\n",
968                          __func__, signature, ppl_conf->signature);
969                 ppl_conf->mismatch_count++;
970                 goto out;
971         }
972
973         /* attempt to recover from log if we are starting a dirty array */
974         if (!mddev->pers && mddev->recovery_cp != MaxSector)
975                 ret = ppl_recover(log, pplhdr);
976 out:
977         /* write empty header if we are starting the array */
978         if (!ret && !mddev->pers)
979                 ret = ppl_write_empty_header(log);
980
981         __free_page(page);
982
983         pr_debug("%s: return: %d mismatch_count: %d recovered_entries: %d\n",
984                  __func__, ret, ppl_conf->mismatch_count,
985                  ppl_conf->recovered_entries);
986         return ret;
987 }
988
989 static int ppl_load(struct ppl_conf *ppl_conf)
990 {
991         int ret = 0;
992         u32 signature = 0;
993         bool signature_set = false;
994         int i;
995
996         for (i = 0; i < ppl_conf->count; i++) {
997                 struct ppl_log *log = &ppl_conf->child_logs[i];
998
999                 /* skip missing drive */
1000                 if (!log->rdev)
1001                         continue;
1002
1003                 ret = ppl_load_distributed(log);
1004                 if (ret)
1005                         break;
1006
1007                 /*
1008                  * For external metadata we can't check if the signature is
1009                  * correct on a single drive, but we can check if it is the same
1010                  * on all drives.
1011                  */
1012                 if (ppl_conf->mddev->external) {
1013                         if (!signature_set) {
1014                                 signature = ppl_conf->signature;
1015                                 signature_set = true;
1016                         } else if (signature != ppl_conf->signature) {
1017                                 pr_warn("md/raid:%s: PPL header signature does not match on all member drives\n",
1018                                         mdname(ppl_conf->mddev));
1019                                 ret = -EINVAL;
1020                                 break;
1021                         }
1022                 }
1023         }
1024
1025         pr_debug("%s: return: %d mismatch_count: %d recovered_entries: %d\n",
1026                  __func__, ret, ppl_conf->mismatch_count,
1027                  ppl_conf->recovered_entries);
1028         return ret;
1029 }
1030
1031 static void __ppl_exit_log(struct ppl_conf *ppl_conf)
1032 {
1033         clear_bit(MD_HAS_PPL, &ppl_conf->mddev->flags);
1034
1035         kfree(ppl_conf->child_logs);
1036
1037         if (ppl_conf->bs)
1038                 bioset_free(ppl_conf->bs);
1039         mempool_destroy(ppl_conf->io_pool);
1040         kmem_cache_destroy(ppl_conf->io_kc);
1041
1042         kfree(ppl_conf);
1043 }
1044
1045 void ppl_exit_log(struct r5conf *conf)
1046 {
1047         struct ppl_conf *ppl_conf = conf->log_private;
1048
1049         if (ppl_conf) {
1050                 __ppl_exit_log(ppl_conf);
1051                 conf->log_private = NULL;
1052         }
1053 }
1054
1055 static int ppl_validate_rdev(struct md_rdev *rdev)
1056 {
1057         char b[BDEVNAME_SIZE];
1058         int ppl_data_sectors;
1059         int ppl_size_new;
1060
1061         /*
1062          * The configured PPL size must be enough to store
1063          * the header and (at the very least) partial parity
1064          * for one stripe. Round it down to ensure the data
1065          * space is cleanly divisible by stripe size.
1066          */
1067         ppl_data_sectors = rdev->ppl.size - (PPL_HEADER_SIZE >> 9);
1068
1069         if (ppl_data_sectors > 0)
1070                 ppl_data_sectors = rounddown(ppl_data_sectors, STRIPE_SECTORS);
1071
1072         if (ppl_data_sectors <= 0) {
1073                 pr_warn("md/raid:%s: PPL space too small on %s\n",
1074                         mdname(rdev->mddev), bdevname(rdev->bdev, b));
1075                 return -ENOSPC;
1076         }
1077
1078         ppl_size_new = ppl_data_sectors + (PPL_HEADER_SIZE >> 9);
1079
1080         if ((rdev->ppl.sector < rdev->data_offset &&
1081              rdev->ppl.sector + ppl_size_new > rdev->data_offset) ||
1082             (rdev->ppl.sector >= rdev->data_offset &&
1083              rdev->data_offset + rdev->sectors > rdev->ppl.sector)) {
1084                 pr_warn("md/raid:%s: PPL space overlaps with data on %s\n",
1085                         mdname(rdev->mddev), bdevname(rdev->bdev, b));
1086                 return -EINVAL;
1087         }
1088
1089         if (!rdev->mddev->external &&
1090             ((rdev->ppl.offset > 0 && rdev->ppl.offset < (rdev->sb_size >> 9)) ||
1091              (rdev->ppl.offset <= 0 && rdev->ppl.offset + ppl_size_new > 0))) {
1092                 pr_warn("md/raid:%s: PPL space overlaps with superblock on %s\n",
1093                         mdname(rdev->mddev), bdevname(rdev->bdev, b));
1094                 return -EINVAL;
1095         }
1096
1097         rdev->ppl.size = ppl_size_new;
1098
1099         return 0;
1100 }
1101
1102 int ppl_init_log(struct r5conf *conf)
1103 {
1104         struct ppl_conf *ppl_conf;
1105         struct mddev *mddev = conf->mddev;
1106         int ret = 0;
1107         int i;
1108         bool need_cache_flush = false;
1109
1110         pr_debug("md/raid:%s: enabling distributed Partial Parity Log\n",
1111                  mdname(conf->mddev));
1112
1113         if (PAGE_SIZE != 4096)
1114                 return -EINVAL;
1115
1116         if (mddev->level != 5) {
1117                 pr_warn("md/raid:%s PPL is not compatible with raid level %d\n",
1118                         mdname(mddev), mddev->level);
1119                 return -EINVAL;
1120         }
1121
1122         if (mddev->bitmap_info.file || mddev->bitmap_info.offset) {
1123                 pr_warn("md/raid:%s PPL is not compatible with bitmap\n",
1124                         mdname(mddev));
1125                 return -EINVAL;
1126         }
1127
1128         if (test_bit(MD_HAS_JOURNAL, &mddev->flags)) {
1129                 pr_warn("md/raid:%s PPL is not compatible with journal\n",
1130                         mdname(mddev));
1131                 return -EINVAL;
1132         }
1133
1134         ppl_conf = kzalloc(sizeof(struct ppl_conf), GFP_KERNEL);
1135         if (!ppl_conf)
1136                 return -ENOMEM;
1137
1138         ppl_conf->mddev = mddev;
1139
1140         ppl_conf->io_kc = KMEM_CACHE(ppl_io_unit, 0);
1141         if (!ppl_conf->io_kc) {
1142                 ret = -ENOMEM;
1143                 goto err;
1144         }
1145
1146         ppl_conf->io_pool = mempool_create(conf->raid_disks, ppl_io_pool_alloc,
1147                                            ppl_io_pool_free, ppl_conf->io_kc);
1148         if (!ppl_conf->io_pool) {
1149                 ret = -ENOMEM;
1150                 goto err;
1151         }
1152
1153         ppl_conf->bs = bioset_create(conf->raid_disks, 0);
1154         if (!ppl_conf->bs) {
1155                 ret = -ENOMEM;
1156                 goto err;
1157         }
1158
1159         ppl_conf->count = conf->raid_disks;
1160         ppl_conf->child_logs = kcalloc(ppl_conf->count, sizeof(struct ppl_log),
1161                                        GFP_KERNEL);
1162         if (!ppl_conf->child_logs) {
1163                 ret = -ENOMEM;
1164                 goto err;
1165         }
1166
1167         atomic64_set(&ppl_conf->seq, 0);
1168         INIT_LIST_HEAD(&ppl_conf->no_mem_stripes);
1169         spin_lock_init(&ppl_conf->no_mem_stripes_lock);
1170
1171         if (!mddev->external) {
1172                 ppl_conf->signature = ~crc32c_le(~0, mddev->uuid, sizeof(mddev->uuid));
1173                 ppl_conf->block_size = 512;
1174         } else {
1175                 ppl_conf->block_size = queue_logical_block_size(mddev->queue);
1176         }
1177
1178         for (i = 0; i < ppl_conf->count; i++) {
1179                 struct ppl_log *log = &ppl_conf->child_logs[i];
1180                 struct md_rdev *rdev = conf->disks[i].rdev;
1181
1182                 mutex_init(&log->io_mutex);
1183                 spin_lock_init(&log->io_list_lock);
1184                 INIT_LIST_HEAD(&log->io_list);
1185
1186                 log->ppl_conf = ppl_conf;
1187                 log->rdev = rdev;
1188
1189                 if (rdev) {
1190                         struct request_queue *q;
1191
1192                         ret = ppl_validate_rdev(rdev);
1193                         if (ret)
1194                                 goto err;
1195
1196                         q = bdev_get_queue(rdev->bdev);
1197                         if (test_bit(QUEUE_FLAG_WC, &q->queue_flags))
1198                                 need_cache_flush = true;
1199                 }
1200         }
1201
1202         if (need_cache_flush)
1203                 pr_warn("md/raid:%s: Volatile write-back cache should be disabled on all member drives when using PPL!\n",
1204                         mdname(mddev));
1205
1206         /* load and possibly recover the logs from the member disks */
1207         ret = ppl_load(ppl_conf);
1208
1209         if (ret) {
1210                 goto err;
1211         } else if (!mddev->pers &&
1212                    mddev->recovery_cp == 0 && !mddev->degraded &&
1213                    ppl_conf->recovered_entries > 0 &&
1214                    ppl_conf->mismatch_count == 0) {
1215                 /*
1216                  * If we are starting a dirty array and the recovery succeeds
1217                  * without any issues, set the array as clean.
1218                  */
1219                 mddev->recovery_cp = MaxSector;
1220                 set_bit(MD_SB_CHANGE_CLEAN, &mddev->sb_flags);
1221         } else if (mddev->pers && ppl_conf->mismatch_count > 0) {
1222                 /* no mismatch allowed when enabling PPL for a running array */
1223                 ret = -EINVAL;
1224                 goto err;
1225         }
1226
1227         conf->log_private = ppl_conf;
1228         set_bit(MD_HAS_PPL, &ppl_conf->mddev->flags);
1229
1230         return 0;
1231 err:
1232         __ppl_exit_log(ppl_conf);
1233         return ret;
1234 }
1235
1236 int ppl_modify_log(struct r5conf *conf, struct md_rdev *rdev, bool add)
1237 {
1238         struct ppl_conf *ppl_conf = conf->log_private;
1239         struct ppl_log *log;
1240         int ret = 0;
1241         char b[BDEVNAME_SIZE];
1242
1243         if (!rdev)
1244                 return -EINVAL;
1245
1246         pr_debug("%s: disk: %d operation: %s dev: %s\n",
1247                  __func__, rdev->raid_disk, add ? "add" : "remove",
1248                  bdevname(rdev->bdev, b));
1249
1250         if (rdev->raid_disk < 0)
1251                 return 0;
1252
1253         if (rdev->raid_disk >= ppl_conf->count)
1254                 return -ENODEV;
1255
1256         log = &ppl_conf->child_logs[rdev->raid_disk];
1257
1258         mutex_lock(&log->io_mutex);
1259         if (add) {
1260                 ret = ppl_validate_rdev(rdev);
1261                 if (!ret) {
1262                         log->rdev = rdev;
1263                         ret = ppl_write_empty_header(log);
1264                 }
1265         } else {
1266                 log->rdev = NULL;
1267         }
1268         mutex_unlock(&log->io_mutex);
1269
1270         return ret;
1271 }