drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
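
/*
 * A minimal usage sketch (illustrative only, not part of this file):
 * a client is created once, copies are submitted with dm_kcopyd_copy(),
 * and the notify callback
 *
 *      static void copy_done(int read_err, unsigned long write_err,
 *                            void *context);
 *
 * is invoked asynchronously from the kcopyd workqueue once the whole
 * copy has finished (read_err is set on a read failure, write_err
 * carries error bits for the destinations).  src_bdev, dst_bdev and
 * my_context are placeholders supplied by the caller; region counts
 * are in 512-byte sectors.  dm_kcopyd_client_destroy() waits for all
 * outstanding jobs before freeing the client.
 *
 *      struct dm_kcopyd_client *kc = dm_kcopyd_client_create(NULL);
 *      struct dm_io_region from = { .bdev = src_bdev, .sector = 0, .count = 128 };
 *      struct dm_io_region to   = { .bdev = dst_bdev, .sector = 0, .count = 128 };
 *
 *      if (!IS_ERR(kc)) {
 *              dm_kcopyd_copy(kc, &from, 1, &to, 0, copy_done, my_context);
 *              ...
 *              dm_kcopyd_client_destroy(kc);
 *      }
 */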

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

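/*
 * Copies larger than SUB_JOB_SIZE sectors are carved into SUB_JOB_SIZE
 * chunks handled by SPLIT_COUNT sub-jobs working in parallel (see
 * split_job() and segment_complete()).  MIN_JOBS is the minimum number
 * of preallocated elements in each client's job mempool, and
 * RESERVE_PAGES is the number of pages a client keeps in reserve:
 * enough to hold the data of a single sub-job.
 */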
#define SUB_JOB_SIZE    128
#define SPLIT_COUNT     8
#define MIN_JOBS        8
#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        struct page_list *pages;
        unsigned nr_reserved_pages;
        unsigned nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;

        mempool_t job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

        struct dm_kcopyd_throttle *throttle;

        atomic_t nr_jobs;

/*
 * We maintain four lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that don't need to do any IO and just run a callback
 * iv)  jobs that have completed.
 *
 * All four of these are protected by job_lock.
 */
        spinlock_t job_lock;
        struct list_head callback_jobs;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC                      100
/*
 * Maximum number of sleep events. There is a theoretical livelock if
 * multiple kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS                      10

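/*
 * Account the start of an I/O job against the (optional) throttle.
 * t->throttle is a percentage: while the fraction of recent time spent
 * on kcopyd I/O (io_period / total_period) exceeds it, sleep in
 * SLEEP_MSEC steps, at most MAX_SLEEPS times, before letting the job
 * proceed.  A throttle value of 100 or more disables throttling.
 */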
static void io_job_start(struct dm_kcopyd_throttle *t)
{
        unsigned throttle, now, difference;
        int slept = 0, skew;

        if (unlikely(!t))
                return;

try_again:
        spin_lock_irq(&throttle_spinlock);

        throttle = READ_ONCE(t->throttle);

        if (likely(throttle >= 100))
                goto skip_limit;

        now = jiffies;
        difference = now - t->last_jiffies;
        t->last_jiffies = now;
        if (t->num_io_jobs)
                t->io_period += difference;
        t->total_period += difference;

        /*
         * Maintain sane values if we got a temporary overflow.
         */
        if (unlikely(t->io_period > t->total_period))
                t->io_period = t->total_period;

        if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
                int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
                t->total_period >>= shift;
                t->io_period >>= shift;
        }

        skew = t->io_period - throttle * t->total_period / 100;

        if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
                slept++;
                spin_unlock_irq(&throttle_spinlock);
                msleep(SLEEP_MSEC);
                goto try_again;
        }

skip_limit:
        t->num_io_jobs++;

        spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
        unsigned long flags;

        if (unlikely(!t))
                return;

        spin_lock_irqsave(&throttle_spinlock, flags);

        t->num_io_jobs--;

        if (likely(READ_ONCE(t->throttle) >= 100))
                goto skip_limit;

        if (!t->num_io_jobs) {
                unsigned now, difference;

                now = jiffies;
                difference = now - t->last_jiffies;
                t->last_jiffies = now;

                t->io_period += difference;
                t->total_period += difference;

                /*
                 * Maintain sane values if we got a temporary overflow.
                 */
                if (unlikely(t->io_period > t->total_period))
                        t->io_period = t->total_period;
        }

skip_limit:
        spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), gfp);
        if (!pl)
                return NULL;

        pl->page = alloc_page(gfp);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *next;

        do {
                next = pl->next;

                if (kc->nr_free_pages >= kc->nr_reserved_pages)
                        free_pl(pl);
                else {
                        pl->next = kc->pages;
                        kc->pages = pl;
                        kc->nr_free_pages++;
                }

                pl = next;
        } while (pl);
}

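/*
 * Obtain nr pages for a job.  Pages are allocated opportunistically
 * (no retry, no warning); on failure we fall back to the client's
 * reserved pool.  Only when both are exhausted does the caller see
 * -ENOMEM, which run_pages_job() treats as "try again later".
 */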
static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        *pages = NULL;

        do {
                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
                if (unlikely(!pl)) {
                        /* Use reserved pages */
                        pl = kc->pages;
                        if (unlikely(!pl))
                                goto out_of_memory;
                        kc->pages = pl->next;
                        kc->nr_free_pages--;
                }
                pl->next = *pages;
                *pages = pl;
        } while (--nr);

        return 0;

out_of_memory:
        if (*pages)
                kcopyd_put_pages(kc, *pages);
        return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
        unsigned i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr_pages; i++) {
                next = alloc_pl(GFP_KERNEL);
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kc->nr_reserved_pages += nr_pages;
        kcopyd_put_pages(kc, pl);

        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * Either READ, WRITE or REQ_OP_WRITE_ZEROES.
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;
        sector_t write_offset;

        struct kcopyd_job *master_job;
};

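/*
 * Each element of the job slab/mempool is an array of SPLIT_COUNT + 1
 * kcopyd_job structures: the master job followed by its sub-jobs
 * (see dm_kcopyd_init() and split_job()).
 */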
static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
        _job_cache = kmem_cache_create("kcopyd_job",
                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                                __alignof__(struct kcopyd_job), 0, NULL);
        if (!_job_cache)
                return -ENOMEM;

        zero_page_list.next = &zero_page_list;
        zero_page_list.page = ZERO_PAGE(0);

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
                                     struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job;

        /*
         * For I/O jobs, pop any read, any write without sequential write
         * constraint and sequential writes that are at the right position.
         */
        list_for_each_entry(job, jobs, list) {
                if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
                        list_del(&job->list);
                        return job;
                }

                if (job->write_offset == job->master_job->write_offset) {
                        job->master_job->write_offset += job->source.count;
                        list_del(&job->list);
                        return job;
                }
        }

        return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                if (jobs == &kc->io_jobs)
                        job = pop_io_job(jobs, kc);
                else {
                        job = list_entry(jobs->next, struct kcopyd_job, list);
                        list_del(&job->list);
                }
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages && job->pages != &zero_page_list)
                kcopyd_put_pages(kc, job->pages);
        /*
         * If this is the master job, the sub jobs have already
         * completed so we can free everything.
         */
        if (job->master_job == job) {
                mutex_destroy(&job->lock);
                mempool_free(job, &kc->job_pool);
        }
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        cond_resched();

        return 0;
}

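/*
 * Completion callback passed to dm_io().  A failed job is completed
 * immediately unless DM_KCOPYD_IGNORE_ERROR is set.  A finished read
 * is turned into a write of the same pages to the destinations; a
 * finished write sends the job to the complete list.
 */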
static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        io_job_finish(kc->throttle);

        if (error) {
                if (op_is_write(job->rw))
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (op_is_write(job->rw))
                push(&kc->complete_jobs, job);
        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

/*
 * Issue the io for a particular job, using the pages it holds.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_op = job->rw,
                .bi_op_flags = 0,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = 0,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        /*
         * If we need to write sequentially and some reads or writes failed,
         * no point in continuing.
         */
        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
            job->master_job->write_err)
                return -EIO;

        io_job_start(job->kc->throttle);

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

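/*
 * Allocate the pages for one (sub-)job.  Region counts are in 512-byte
 * sectors, hence the PAGE_SIZE >> 9 conversion below.
 */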
static int run_pages_job(struct kcopyd_job *job)
{
        int r;
        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (op_is_write(job->rw))
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);
        struct blk_plug plug;
        unsigned long flags;

        /*
         * The order that these are called is *very* important.
         * Complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        spin_lock_irqsave(&kc->job_lock, flags);
        list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);

        blk_start_plug(&plug);
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
        blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        if (unlikely(!job->source.count))
                push(&kc->callback_jobs, job);
        else if (job->pages == &zero_page_list)
                push(&kc->io_jobs, job);
        else
                push(&kc->pages_jobs, job);
        wake(kc);
}

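/*
 * Completion callback for one sub-job of a split copy.  Errors are
 * folded into the master job; then, if work remains, the sub-job takes
 * the next chunk of up to SUB_JOB_SIZE sectors and re-dispatches
 * itself.  When the last sub-job finishes, the master job's completion
 * is queued so the client callback runs from the kcopyd workqueue.
 */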
static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
        struct kcopyd_job *job = sub_job->master_job;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;

                *sub_job = *job;
                sub_job->write_offset = progress;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = sub_job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
        int i;

        atomic_inc(&master_job->kc->nr_jobs);

        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++) {
                master_job[i + 1].master_job = master_job;
                segment_complete(0, 0u, &master_job[i + 1]);
        }
}

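/*
 * Submit an asynchronous copy: read 'from' and write it to each of the
 * 'dests' regions (at most DM_KCOPYD_MAX_REGIONS).  A NULL 'from'
 * means "zero the destinations" (see dm_kcopyd_zero() below).  'fn' is
 * called with 'context' once the whole copy has completed.
 */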
void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                    unsigned int num_dests, struct dm_io_region *dests,
                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;
        int i;

        /*
         * Allocate an array of jobs consisting of one master job
         * followed by SPLIT_COUNT sub jobs.
         */
        job = mempool_alloc(&kc->job_pool, GFP_NOIO);
        mutex_init(&job->lock);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        /*
         * If one of the destinations is a host-managed zoned block device,
         * we need to write sequentially. If one of the destinations is a
         * host-aware device, leave it to the caller to choose what to do.
         */
        if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
                for (i = 0; i < job->num_dests; i++) {
                        if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
                                set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
                                break;
                        }
                }
        }

        /*
         * If we need to write sequentially, errors cannot be ignored.
         */
        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
                clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

        if (from) {
                job->source = *from;
                job->pages = NULL;
                job->rw = READ;
        } else {
                memset(&job->source, 0, sizeof job->source);
                job->source.count = job->dests[0].count;
                job->pages = &zero_page_list;

                /*
                 * Use WRITE ZEROES to optimize zeroing if all dests support it.
                 */
                job->rw = REQ_OP_WRITE_ZEROES;
                for (i = 0; i < job->num_dests; i++)
                        if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
                                job->rw = WRITE;
                                break;
                        }
        }

        job->fn = fn;
        job->context = context;
        job->master_job = job;
        job->write_offset = 0;

        if (job->source.count <= SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                job->progress = 0;
                split_job(job);
        }
}
EXPORT_SYMBOL(dm_kcopyd_copy);

void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
                    unsigned num_dests, struct dm_io_region *dests,
                    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
        dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

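/*
 * dm_kcopyd_prepare_callback() and dm_kcopyd_do_callback() let a caller
 * allocate a callback-only job up front, in a context where a GFP_NOIO
 * allocation is acceptable, and trigger it later, even from atomic
 * context (the job lists are protected by an IRQ-safe spinlock).  The
 * callback itself always runs from the kcopyd workqueue, like any other
 * completion.
 */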
void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
                                 dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        job = mempool_alloc(&kc->job_pool, GFP_NOIO);

        memset(job, 0, sizeof(struct kcopyd_job));
        job->kc = kc;
        job->fn = fn;
        job->context = context;
        job->master_job = job;

        atomic_inc(&kc->nr_jobs);

        return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
        struct kcopyd_job *job = j;
        struct dm_kcopyd_client *kc = job->kc;

        job->read_err = read_err;
        job->write_err = write_err;

        push(&kc->callback_jobs, job);
        wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
        int r;
        struct dm_kcopyd_client *kc;

        kc = kzalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->callback_jobs);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);
        kc->throttle = throttle;

        r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
        if (r)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
        if (!kc->kcopyd_wq) {
                r = -ENOMEM;
                goto bad_workqueue;
        }

        kc->pages = NULL;
        kc->nr_reserved_pages = kc->nr_free_pages = 0;
        r = client_reserve_pages(kc, RESERVE_PAGES);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create();
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        return kc;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_exit(&kc->job_pool);
bad_slab:
        kfree(kc);

        return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->callback_jobs));
        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_exit(&kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);