drivers/md/md-cluster.c
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10
11
12 #include <linux/module.h>
13 #include <linux/kthread.h>
14 #include <linux/dlm.h>
15 #include <linux/sched.h>
16 #include <linux/raid/md_p.h>
17 #include "md.h"
18 #include "bitmap.h"
19 #include "md-cluster.h"
20
21 #define LVB_SIZE        64
22 #define NEW_DEV_TIMEOUT 5000
23
24 struct dlm_lock_resource {
25         dlm_lockspace_t *ls;
26         struct dlm_lksb lksb;
27         char *name; /* lock name. */
28         uint32_t flags; /* flags to pass to dlm_lock() */
29         wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
30         bool sync_locking_done;
31         void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
32         struct mddev *mddev; /* pointing back to mddev. */
33         int mode;
34 };
35
36 struct suspend_info {
37         int slot;
38         sector_t lo;
39         sector_t hi;
40         struct list_head list;
41 };
42
43 struct resync_info {
44         __le64 lo;
45         __le64 hi;
46 };
47
48 /* md_cluster_info flags */
49 #define         MD_CLUSTER_WAITING_FOR_NEWDISK          1
50 #define         MD_CLUSTER_SUSPEND_READ_BALANCING       2
51 #define         MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
52
53 /* Lock the send communication channel. This is done through
54  * bit manipulation as opposed to a mutex in order to
55  * accommodate lock and hold. See next comment.
56  */
57 #define         MD_CLUSTER_SEND_LOCK                    4
58 /* Cluster operations (such as adding a disk) must lock the
59  * communication channel in order to perform extra operations
60  * (update metadata) while no other operation is allowed on the
61  * MD. The token needs to be locked and held until the operation
62  * completes with a md_update_sb(), which would eventually release
63  * the lock.
64  */
65 #define         MD_CLUSTER_SEND_LOCKED_ALREADY          5
66 /* We should receive messages only after the node has joined the cluster
67  * and set up all the related info such as bitmap and personality */
68 #define         MD_CLUSTER_ALREADY_IN_CLUSTER           6
69 #define         MD_CLUSTER_PENDING_RECV_EVENT           7
70 #define         MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD              8
71
72 struct md_cluster_info {
73         struct mddev *mddev; /* the md device which md_cluster_info belongs to */
74         /* dlm lock space and resources for clustered raid. */
75         dlm_lockspace_t *lockspace;
76         int slot_number;
77         struct completion completion;
78         struct mutex recv_mutex;
79         struct dlm_lock_resource *bitmap_lockres;
80         struct dlm_lock_resource **other_bitmap_lockres;
81         struct dlm_lock_resource *resync_lockres;
82         struct list_head suspend_list;
83         spinlock_t suspend_lock;
84         struct md_thread *recovery_thread;
85         unsigned long recovery_map;
86         /* communication lock resources */
87         struct dlm_lock_resource *ack_lockres;
88         struct dlm_lock_resource *message_lockres;
89         struct dlm_lock_resource *token_lockres;
90         struct dlm_lock_resource *no_new_dev_lockres;
91         struct md_thread *recv_thread;
92         struct completion newdisk_completion;
93         wait_queue_head_t wait;
94         unsigned long state;
95         /* record the region in RESYNCING message */
96         sector_t sync_low;
97         sector_t sync_hi;
98 };
99
100 enum msg_type {
101         METADATA_UPDATED = 0,
102         RESYNCING,
103         NEWDISK,
104         REMOVE,
105         RE_ADD,
106         BITMAP_NEEDS_SYNC,
107         CHANGE_CAPACITY,
108 };
109
110 struct cluster_msg {
111         __le32 type;
112         __le32 slot;
113         /* TODO: Unionize this for smaller footprint */
114         __le64 low;
115         __le64 high;
116         char uuid[16];
117         __le32 raid_slot;
118 };
119
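/*
 * Completion AST for the synchronous DLM requests issued below: mark the
 * request as finished and wake up the waiter sleeping in dlm_lock_sync().
 */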
120 static void sync_ast(void *arg)
121 {
122         struct dlm_lock_resource *res;
123
124         res = arg;
125         res->sync_locking_done = true;
126         wake_up(&res->sync_locking);
127 }
128
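/*
 * Issue a DLM request for @res in @mode and wait for the AST to complete.
 * Returns the dlm_lock() error if the request could not be queued, otherwise
 * the status from the lock status block (0 on success, with res->mode updated).
 */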
129 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
130 {
131         int ret = 0;
132
133         ret = dlm_lock(res->ls, mode, &res->lksb,
134                         res->flags, res->name, strlen(res->name),
135                         0, sync_ast, res, res->bast);
136         if (ret)
137                 return ret;
138         wait_event(res->sync_locking, res->sync_locking_done);
139         res->sync_locking_done = false;
140         if (res->lksb.sb_status == 0)
141                 res->mode = mode;
142         return res->lksb.sb_status;
143 }
144
145 static int dlm_unlock_sync(struct dlm_lock_resource *res)
146 {
147         return dlm_lock_sync(res, DLM_LOCK_NL);
148 }
149
150 /*
151  * A variation of dlm_lock_sync() in which the lock request
152  * can be interrupted
153  */
154 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
155                                        struct mddev *mddev)
156 {
157         int ret = 0;
158
159         ret = dlm_lock(res->ls, mode, &res->lksb,
160                         res->flags, res->name, strlen(res->name),
161                         0, sync_ast, res, res->bast);
162         if (ret)
163                 return ret;
164
165         wait_event(res->sync_locking, res->sync_locking_done
166                                       || kthread_should_stop()
167                                       || test_bit(MD_CLOSING, &mddev->flags));
168         if (!res->sync_locking_done) {
169                 /*
170                  * The convert queue contains the lock request when the request
171                  * is interrupted, and sync_ast could still run, so we need to
172                  * cancel the request and reset the completion.
173                  */
174                 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
175                         &res->lksb, res);
176                 res->sync_locking_done = false;
177                 if (unlikely(ret != 0))
178                         pr_info("failed to cancel previous lock request "
179                                  "%s return %d\n", res->name, ret);
180                 return -EPERM;
181         } else
182                 res->sync_locking_done = false;
183         if (res->lksb.sb_status == 0)
184                 res->mode = mode;
185         return res->lksb.sb_status;
186 }
187
188 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
189                 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
190 {
191         struct dlm_lock_resource *res = NULL;
192         int ret, namelen;
193         struct md_cluster_info *cinfo = mddev->cluster_info;
194
195         res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
196         if (!res)
197                 return NULL;
198         init_waitqueue_head(&res->sync_locking);
199         res->sync_locking_done = false;
200         res->ls = cinfo->lockspace;
201         res->mddev = mddev;
202         res->mode = DLM_LOCK_IV;
203         namelen = strlen(name);
204         res->name = kzalloc(namelen + 1, GFP_KERNEL);
205         if (!res->name) {
206                 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
207                 goto out_err;
208         }
209         strlcpy(res->name, name, namelen + 1);
210         if (with_lvb) {
211                 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
212                 if (!res->lksb.sb_lvbptr) {
213                         pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
214                         goto out_err;
215                 }
216                 res->flags = DLM_LKF_VALBLK;
217         }
218
219         if (bastfn)
220                 res->bast = bastfn;
221
222         res->flags |= DLM_LKF_EXPEDITE;
223
224         ret = dlm_lock_sync(res, DLM_LOCK_NL);
225         if (ret) {
226                 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
227                 goto out_err;
228         }
229         res->flags &= ~DLM_LKF_EXPEDITE;
230         res->flags |= DLM_LKF_CONVERT;
231
232         return res;
233 out_err:
234         kfree(res->lksb.sb_lvbptr);
235         kfree(res->name);
236         kfree(res);
237         return NULL;
238 }
239
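/*
 * Release a lock resource: force-unlock it (FORCEUNLOCK also works while the
 * lock sits on the waiting or convert queue), wait for the unlock to complete,
 * then free the name, the LVB and the resource itself.
 */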
240 static void lockres_free(struct dlm_lock_resource *res)
241 {
242         int ret = 0;
243
244         if (!res)
245                 return;
246
247         /*
248          * use the FORCEUNLOCK flag, so we can unlock even if the lock is on
249          * the waiting or convert queue
250          */
251         ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
252                 &res->lksb, res);
253         if (unlikely(ret != 0))
254                 pr_err("failed to unlock %s return %d\n", res->name, ret);
255         else
256                 wait_event(res->sync_locking, res->sync_locking_done);
257
258         kfree(res->name);
259         kfree(res->lksb.sb_lvbptr);
260         kfree(res);
261 }
262
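/* Store the resync range [lo, hi] into the lock resource's LVB (little endian). */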
263 static void add_resync_info(struct dlm_lock_resource *lockres,
264                             sector_t lo, sector_t hi)
265 {
266         struct resync_info *ri;
267
268         ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
269         ri->lo = cpu_to_le64(lo);
270         ri->hi = cpu_to_le64(hi);
271 }
272
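/*
 * Read the resync range published in another node's bitmap LVB. Returns a
 * freshly allocated suspend_info when a resync is in progress (hi > 0),
 * otherwise NULL.
 */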
273 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
274 {
275         struct resync_info ri;
276         struct suspend_info *s = NULL;
277         sector_t hi = 0;
278
279         dlm_lock_sync(lockres, DLM_LOCK_CR);
280         memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
281         hi = le64_to_cpu(ri.hi);
282         if (hi > 0) {
283                 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
284                 if (!s)
285                         goto out;
286                 s->hi = hi;
287                 s->lo = le64_to_cpu(ri.lo);
288         }
289         dlm_unlock_sync(lockres);
290 out:
291         return s;
292 }
293
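/*
 * Body of the "recover" thread: for every slot set in recovery_map, drop its
 * suspend_list entries, take PW on that slot's bitmap lock, merge the failed
 * node's bitmap into our own and kick off a resync if anything was dirty.
 */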
294 static void recover_bitmaps(struct md_thread *thread)
295 {
296         struct mddev *mddev = thread->mddev;
297         struct md_cluster_info *cinfo = mddev->cluster_info;
298         struct dlm_lock_resource *bm_lockres;
299         char str[64];
300         int slot, ret;
301         struct suspend_info *s, *tmp;
302         sector_t lo, hi;
303
304         while (cinfo->recovery_map) {
305                 slot = fls64((u64)cinfo->recovery_map) - 1;
306
307                 /* Clear suspend_area associated with the bitmap */
308                 spin_lock_irq(&cinfo->suspend_lock);
309                 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
310                         if (slot == s->slot) {
311                                 list_del(&s->list);
312                                 kfree(s);
313                         }
314                 spin_unlock_irq(&cinfo->suspend_lock);
315
316                 snprintf(str, 64, "bitmap%04d", slot);
317                 bm_lockres = lockres_init(mddev, str, NULL, 1);
318                 if (!bm_lockres) {
319                         pr_err("md-cluster: Cannot initialize bitmaps\n");
320                         goto clear_bit;
321                 }
322
323                 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
324                 if (ret) {
325                         pr_err("md-cluster: Could not DLM lock %s: %d\n",
326                                         str, ret);
327                         goto clear_bit;
328                 }
329                 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
330                 if (ret) {
331                         pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
332                         goto clear_bit;
333                 }
334                 if (hi > 0) {
335                         if (lo < mddev->recovery_cp)
336                                 mddev->recovery_cp = lo;
337                         /* wake up thread to continue resync in case resync
338                          * is not finished */
339                         if (mddev->recovery_cp != MaxSector) {
340                             set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
341                             md_wakeup_thread(mddev->thread);
342                         }
343                 }
344 clear_bit:
345                 lockres_free(bm_lockres);
346                 clear_bit(slot, &cinfo->recovery_map);
347         }
348 }
349
350 static void recover_prep(void *arg)
351 {
352         struct mddev *mddev = arg;
353         struct md_cluster_info *cinfo = mddev->cluster_info;
354         set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
355 }
356
357 static void __recover_slot(struct mddev *mddev, int slot)
358 {
359         struct md_cluster_info *cinfo = mddev->cluster_info;
360
361         set_bit(slot, &cinfo->recovery_map);
362         if (!cinfo->recovery_thread) {
363                 cinfo->recovery_thread = md_register_thread(recover_bitmaps,
364                                 mddev, "recover");
365                 if (!cinfo->recovery_thread) {
366                         pr_warn("md-cluster: Could not create recovery thread\n");
367                         return;
368                 }
369         }
370         md_wakeup_thread(cinfo->recovery_thread);
371 }
372
373 static void recover_slot(void *arg, struct dlm_slot *slot)
374 {
375         struct mddev *mddev = arg;
376         struct md_cluster_info *cinfo = mddev->cluster_info;
377
378         pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
379                         mddev->bitmap_info.cluster_name,
380                         slot->nodeid, slot->slot,
381                         cinfo->slot_number);
382         /* subtract one since dlm slot numbers start from one while the
383          * slot numbers of cluster-md begin with 0 */
384         __recover_slot(mddev, slot->slot - 1);
385 }
386
387 static void recover_done(void *arg, struct dlm_slot *slots,
388                 int num_slots, int our_slot,
389                 uint32_t generation)
390 {
391         struct mddev *mddev = arg;
392         struct md_cluster_info *cinfo = mddev->cluster_info;
393
394         cinfo->slot_number = our_slot;
395         /* the completion only needs to be completed when a node joins the
396          * cluster; it doesn't need to run during another node's failure */
397         if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
398                 complete(&cinfo->completion);
399                 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
400         }
401         clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
402 }
403
404 /* these ops are called when a node joins the cluster, and do lock
405  * recovery if a node failure occurs */
406 static const struct dlm_lockspace_ops md_ls_ops = {
407         .recover_prep = recover_prep,
408         .recover_slot = recover_slot,
409         .recover_done = recover_done,
410 };
411
412 /*
413  * The BAST function for the ack lock resource
414  * This function wakes up the receive thread in
415  * order to receive and process the message.
416  */
417 static void ack_bast(void *arg, int mode)
418 {
419         struct dlm_lock_resource *res = arg;
420         struct md_cluster_info *cinfo = res->mddev->cluster_info;
421
422         if (mode == DLM_LOCK_EX) {
423                 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
424                         md_wakeup_thread(cinfo->recv_thread);
425                 else
426                         set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
427         }
428 }
429
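/* Drop the suspend_list entry for @slot; the caller must hold suspend_lock. */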
430 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
431 {
432         struct suspend_info *s, *tmp;
433
434         list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
435                 if (slot == s->slot) {
436                         list_del(&s->list);
437                         kfree(s);
438                         break;
439                 }
440 }
441
442 static void remove_suspend_info(struct mddev *mddev, int slot)
443 {
444         struct md_cluster_info *cinfo = mddev->cluster_info;
445         spin_lock_irq(&cinfo->suspend_lock);
446         __remove_suspend_info(cinfo, slot);
447         spin_unlock_irq(&cinfo->suspend_lock);
448         mddev->pers->quiesce(mddev, 2);
449 }
450
451
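/*
 * Handle a RESYNCING message: hi == 0 means the sender finished its resync,
 * so drop its suspend area; otherwise record [lo, hi] as the region that is
 * currently being resynced by that slot.
 */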
452 static void process_suspend_info(struct mddev *mddev,
453                 int slot, sector_t lo, sector_t hi)
454 {
455         struct md_cluster_info *cinfo = mddev->cluster_info;
456         struct suspend_info *s;
457
458         if (!hi) {
459                 remove_suspend_info(mddev, slot);
460                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
461                 md_wakeup_thread(mddev->thread);
462                 return;
463         }
464
465         /*
466          * The bitmaps are not the same on different nodes.
467          * If RESYNCING is happening on one node, then
468          * the node which received the RESYNCING message
469          * will probably perform a resync of the region
470          * [lo, hi] again, so we can reduce resync time
471          * a lot if we ensure that the bitmaps among
472          * the different nodes match up well.
473          *
474          * sync_low/hi record the region which
475          * arrived in the previous RESYNCING message.
476          *
477          * Call bitmap_sync_with_cluster to clear
478          * NEEDED_MASK and set RESYNC_MASK since the
479          * resync thread is running on another node,
480          * so we don't need to do the resync again
481          * for the same section */
482         bitmap_sync_with_cluster(mddev, cinfo->sync_low,
483                                         cinfo->sync_hi,
484                                         lo, hi);
485         cinfo->sync_low = lo;
486         cinfo->sync_hi = hi;
487
488         s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
489         if (!s)
490                 return;
491         s->slot = slot;
492         s->lo = lo;
493         s->hi = hi;
494         mddev->pers->quiesce(mddev, 1);
495         mddev->pers->quiesce(mddev, 0);
496         spin_lock_irq(&cinfo->suspend_lock);
497         /* Remove existing entry (if exists) before adding */
498         __remove_suspend_info(cinfo, slot);
499         list_add(&s->list, &cinfo->suspend_list);
500         spin_unlock_irq(&cinfo->suspend_lock);
501         mddev->pers->quiesce(mddev, 2);
502 }
503
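/*
 * Handle a NEWDISK message: announce the device UUID and raid slot to
 * userspace via a uevent and wait (up to NEW_DEV_TIMEOUT) for the local
 * confirmation delivered through new_disk_ack().
 */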
504 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
505 {
506         char disk_uuid[64];
507         struct md_cluster_info *cinfo = mddev->cluster_info;
508         char event_name[] = "EVENT=ADD_DEVICE";
509         char raid_slot[16];
510         char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
511         int len;
512
513         len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
514         sprintf(disk_uuid + len, "%pU", cmsg->uuid);
515         snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
516         pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
517         init_completion(&cinfo->newdisk_completion);
518         set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
519         kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
520         wait_for_completion_timeout(&cinfo->newdisk_completion,
521                         NEW_DEV_TIMEOUT);
522         clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
523 }
524
525
526 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
527 {
528         int got_lock = 0;
529         struct md_cluster_info *cinfo = mddev->cluster_info;
530         mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
531
532         dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
533         wait_event(mddev->thread->wqueue,
534                    (got_lock = mddev_trylock(mddev)) ||
535                     test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
536         md_reload_sb(mddev, mddev->good_device_nr);
537         if (got_lock)
538                 mddev_unlock(mddev);
539 }
540
541 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
542 {
543         struct md_rdev *rdev;
544
545         rcu_read_lock();
546         rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
547         if (rdev) {
548                 set_bit(ClusterRemove, &rdev->flags);
549                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
550                 md_wakeup_thread(mddev->thread);
551         }
552         else
553                 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
554                         __func__, __LINE__, le32_to_cpu(msg->raid_slot));
555         rcu_read_unlock();
556 }
557
558 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
559 {
560         struct md_rdev *rdev;
561
562         rcu_read_lock();
563         rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
564         if (rdev && test_bit(Faulty, &rdev->flags))
565                 clear_bit(Faulty, &rdev->flags);
566         else
567                 pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
568                         __func__, __LINE__, le32_to_cpu(msg->raid_slot));
569         rcu_read_unlock();
570 }
571
572 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
573 {
574         int ret = 0;
575
576         if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
577                 "node %d received its own msg\n", le32_to_cpu(msg->slot)))
578                 return -1;
579         switch (le32_to_cpu(msg->type)) {
580         case METADATA_UPDATED:
581                 process_metadata_update(mddev, msg);
582                 break;
583         case CHANGE_CAPACITY:
584                 set_capacity(mddev->gendisk, mddev->array_sectors);
585                 revalidate_disk(mddev->gendisk);
586                 break;
587         case RESYNCING:
588                 process_suspend_info(mddev, le32_to_cpu(msg->slot),
589                                      le64_to_cpu(msg->low),
590                                      le64_to_cpu(msg->high));
591                 break;
592         case NEWDISK:
593                 process_add_new_disk(mddev, msg);
594                 break;
595         case REMOVE:
596                 process_remove_disk(mddev, msg);
597                 break;
598         case RE_ADD:
599                 process_readd_disk(mddev, msg);
600                 break;
601         case BITMAP_NEEDS_SYNC:
602                 __recover_slot(mddev, le32_to_cpu(msg->slot));
603                 break;
604         default:
605                 ret = -1;
606                 pr_warn("%s:%d Received unknown message from %d\n",
607                         __func__, __LINE__, le32_to_cpu(msg->slot));
608         }
609         return ret;
610 }
611
612 /*
613  * thread for receiving message
614  */
615 static void recv_daemon(struct md_thread *thread)
616 {
617         struct md_cluster_info *cinfo = thread->mddev->cluster_info;
618         struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
619         struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
620         struct cluster_msg msg;
621         int ret;
622
623         mutex_lock(&cinfo->recv_mutex);
624         /*get CR on Message*/
625         if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
626                 pr_err("md-cluster: failed to get CR on MESSAGE\n");
627                 mutex_unlock(&cinfo->recv_mutex);
628                 return;
629         }
630
631         /* read lvb and wake up thread to process this message_lockres */
632         memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
633         ret = process_recvd_msg(thread->mddev, &msg);
634         if (ret)
635                 goto out;
636
637         /*release CR on ack_lockres*/
638         ret = dlm_unlock_sync(ack_lockres);
639         if (unlikely(ret != 0))
640                 pr_info("unlock ack failed return %d\n", ret);
641         /*up-convert to PR on message_lockres*/
642         ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
643         if (unlikely(ret != 0))
644                 pr_info("lock PR on msg failed return %d\n", ret);
645         /*get CR on ack_lockres again*/
646         ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
647         if (unlikely(ret != 0))
648                 pr_info("lock CR on ack failed return %d\n", ret);
649 out:
650         /*release CR on message_lockres*/
651         ret = dlm_unlock_sync(message_lockres);
652         if (unlikely(ret != 0))
653                 pr_info("unlock msg failed return %d\n", ret);
654         mutex_unlock(&cinfo->recv_mutex);
655 }
656
657 /* lock_token()
658  * Takes the lock on the TOKEN lock resource so no other
659  * node can communicate while the operation is underway.
660  */
661 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked)
662 {
663         int error, set_bit = 0;
664         struct mddev *mddev = cinfo->mddev;
665
666         /*
667          * If the resync thread runs after the raid1d thread, then process_metadata_update
668          * could not continue if raid1d holds reconfig_mutex (and raid1d is blocked
669          * since another node already got EX on Token and is waiting for EX on Ack),
670          * so let resync wake up the thread in case the flag is set.
671          */
672         if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
673                                       &cinfo->state)) {
674                 error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
675                                               &cinfo->state);
676                 WARN_ON_ONCE(error);
677                 md_wakeup_thread(mddev->thread);
678                 set_bit = 1;
679         }
680         error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
681         if (set_bit)
682                 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
683
684         if (error)
685                 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
686                                 __func__, __LINE__, error);
687
688         /* Lock the receive sequence */
689         mutex_lock(&cinfo->recv_mutex);
690         return error;
691 }
692
693 /* lock_comm()
694  * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
695  */
696 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
697 {
698         wait_event(cinfo->wait,
699                    !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
700
701         return lock_token(cinfo, mddev_locked);
702 }
703
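/*
 * Counterpart of lock_comm()/lock_token(): drop the receive mutex, release
 * the token lock and reopen the send channel.
 */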
704 static void unlock_comm(struct md_cluster_info *cinfo)
705 {
706         WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
707         mutex_unlock(&cinfo->recv_mutex);
708         dlm_unlock_sync(cinfo->token_lockres);
709         clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
710         wake_up(&cinfo->wait);
711 }
712
713 /* __sendmsg()
714  * This function performs the actual sending of the message. This function is
715  * usually called after performing the encompassing operation.
716  * The function:
717  * 1. Grabs the message lockresource in EX mode
718  * 2. Copies the message to the message LVB
719  * 3. Downconverts message lockresource to CW
720  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
721  *    and the other nodes read the message. The thread will wait here until all other
722  *    nodes have released ack lock resource.
723  * 5. Downconvert ack lockresource to CR
724  */
725 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
726 {
727         int error;
728         int slot = cinfo->slot_number - 1;
729
730         cmsg->slot = cpu_to_le32(slot);
731         /*get EX on Message*/
732         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
733         if (error) {
734                 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
735                 goto failed_message;
736         }
737
738         memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
739                         sizeof(struct cluster_msg));
740         /*down-convert EX to CW on Message*/
741         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
742         if (error) {
743                 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
744                                 error);
745                 goto failed_ack;
746         }
747
748         /*up-convert CR to EX on Ack*/
749         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
750         if (error) {
751                 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
752                                 error);
753                 goto failed_ack;
754         }
755
756         /*down-convert EX to CR on Ack*/
757         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
758         if (error) {
759                 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
760                                 error);
761                 goto failed_ack;
762         }
763
764 failed_ack:
765         error = dlm_unlock_sync(cinfo->message_lockres);
766         if (unlikely(error != 0)) {
767                 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
768                         error);
769                 /* in case the message can't be released due to some reason */
770                 goto failed_ack;
771         }
772 failed_message:
773         return error;
774 }
775
776 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
777                    bool mddev_locked)
778 {
779         int ret;
780
781         lock_comm(cinfo, mddev_locked);
782         ret = __sendmsg(cinfo, cmsg);
783         unlock_comm(cinfo);
784         return ret;
785 }
786
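/*
 * Called while joining: inspect every other slot's bitmap lock. A busy lock
 * (-EAGAIN) means that node is resyncing, so remember its range in the
 * suspend list; an idle bitmap with dirty bits lowers our recovery_cp.
 */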
787 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
788 {
789         struct md_cluster_info *cinfo = mddev->cluster_info;
790         int i, ret = 0;
791         struct dlm_lock_resource *bm_lockres;
792         struct suspend_info *s;
793         char str[64];
794         sector_t lo, hi;
795
796
797         for (i = 0; i < total_slots; i++) {
798                 memset(str, '\0', 64);
799                 snprintf(str, 64, "bitmap%04d", i);
800                 bm_lockres = lockres_init(mddev, str, NULL, 1);
801                 if (!bm_lockres)
802                         return -ENOMEM;
803                 if (i == (cinfo->slot_number - 1)) {
804                         lockres_free(bm_lockres);
805                         continue;
806                 }
807
808                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
809                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
810                 if (ret == -EAGAIN) {
811                         s = read_resync_info(mddev, bm_lockres);
812                         if (s) {
813                                 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
814                                                 __func__, __LINE__,
815                                                 (unsigned long long) s->lo,
816                                                 (unsigned long long) s->hi, i);
817                                 spin_lock_irq(&cinfo->suspend_lock);
818                                 s->slot = i;
819                                 list_add(&s->list, &cinfo->suspend_list);
820                                 spin_unlock_irq(&cinfo->suspend_lock);
821                         }
822                         ret = 0;
823                         lockres_free(bm_lockres);
824                         continue;
825                 }
826                 if (ret) {
827                         lockres_free(bm_lockres);
828                         goto out;
829                 }
830
831                 /* Read the disk bitmap sb and check if it needs recovery */
832                 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
833                 if (ret) {
834                         pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
835                         lockres_free(bm_lockres);
836                         continue;
837                 }
838                 if ((hi > 0) && (lo < mddev->recovery_cp)) {
839                         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
840                         mddev->recovery_cp = lo;
841                         md_check_recovery(mddev);
842                 }
843
844                 lockres_free(bm_lockres);
845         }
846 out:
847         return ret;
848 }
849
850 static int join(struct mddev *mddev, int nodes)
851 {
852         struct md_cluster_info *cinfo;
853         int ret, ops_rv;
854         char str[64];
855
856         cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
857         if (!cinfo)
858                 return -ENOMEM;
859
860         INIT_LIST_HEAD(&cinfo->suspend_list);
861         spin_lock_init(&cinfo->suspend_lock);
862         init_completion(&cinfo->completion);
863         set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
864         init_waitqueue_head(&cinfo->wait);
865         mutex_init(&cinfo->recv_mutex);
866
867         mddev->cluster_info = cinfo;
868         cinfo->mddev = mddev;
869
870         memset(str, 0, 64);
871         sprintf(str, "%pU", mddev->uuid);
872         ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
873                                 DLM_LSFL_FS, LVB_SIZE,
874                                 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
875         if (ret)
876                 goto err;
877         wait_for_completion(&cinfo->completion);
878         if (nodes < cinfo->slot_number) {
879                 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
880                         cinfo->slot_number, nodes);
881                 ret = -ERANGE;
882                 goto err;
883         }
884         /* Initiate the communication resources */
885         ret = -ENOMEM;
886         cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
887         if (!cinfo->recv_thread) {
888                 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
889                 goto err;
890         }
891         cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
892         if (!cinfo->message_lockres)
893                 goto err;
894         cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
895         if (!cinfo->token_lockres)
896                 goto err;
897         cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
898         if (!cinfo->no_new_dev_lockres)
899                 goto err;
900
901         ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
902         if (ret) {
903                 ret = -EAGAIN;
904                 pr_err("md-cluster: can't join cluster to avoid lock issue\n");
905                 goto err;
906         }
907         cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
908         if (!cinfo->ack_lockres) {
909                 ret = -ENOMEM;
910                 goto err;
911         }
912         /* get sync CR lock on ACK. */
913         if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
914                 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
915                                 ret);
916         dlm_unlock_sync(cinfo->token_lockres);
917         /* get sync CR lock on no-new-dev. */
918         if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
919                 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
920
921
922         pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
923         snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
924         cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
925         if (!cinfo->bitmap_lockres) {
926                 ret = -ENOMEM;
927                 goto err;
928         }
929         if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
930                 pr_err("Failed to get bitmap lock\n");
931                 ret = -EINVAL;
932                 goto err;
933         }
934
935         cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
936         if (!cinfo->resync_lockres) {
937                 ret = -ENOMEM;
938                 goto err;
939         }
940
941         return 0;
942 err:
943         set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
944         md_unregister_thread(&cinfo->recovery_thread);
945         md_unregister_thread(&cinfo->recv_thread);
946         lockres_free(cinfo->message_lockres);
947         lockres_free(cinfo->token_lockres);
948         lockres_free(cinfo->ack_lockres);
949         lockres_free(cinfo->no_new_dev_lockres);
950         lockres_free(cinfo->resync_lockres);
951         lockres_free(cinfo->bitmap_lockres);
952         if (cinfo->lockspace)
953                 dlm_release_lockspace(cinfo->lockspace, 2);
954         mddev->cluster_info = NULL;
955         kfree(cinfo);
956         return ret;
957 }
958
959 static void load_bitmaps(struct mddev *mddev, int total_slots)
960 {
961         struct md_cluster_info *cinfo = mddev->cluster_info;
962
963         /* load all the nodes' bitmap info for resync */
964         if (gather_all_resync_info(mddev, total_slots))
965                 pr_err("md-cluster: failed to gather all resync info\n");
966         set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
967         /* wake up the recv thread in case something needs to be handled */
968         if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
969                 md_wakeup_thread(cinfo->recv_thread);
970 }
971
972 static void resync_bitmap(struct mddev *mddev)
973 {
974         struct md_cluster_info *cinfo = mddev->cluster_info;
975         struct cluster_msg cmsg = {0};
976         int err;
977
978         cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
979         err = sendmsg(cinfo, &cmsg, 1);
980         if (err)
981                 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
982                         __func__, __LINE__, err);
983 }
984
985 static void unlock_all_bitmaps(struct mddev *mddev);
986 static int leave(struct mddev *mddev)
987 {
988         struct md_cluster_info *cinfo = mddev->cluster_info;
989
990         if (!cinfo)
991                 return 0;
992
993         /* A BITMAP_NEEDS_SYNC message should be sent when a node
994          * is leaving the cluster with a dirty bitmap; also, we
995          * can only deliver it while the dlm connection is available */
996         if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
997                 resync_bitmap(mddev);
998
999         set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1000         md_unregister_thread(&cinfo->recovery_thread);
1001         md_unregister_thread(&cinfo->recv_thread);
1002         lockres_free(cinfo->message_lockres);
1003         lockres_free(cinfo->token_lockres);
1004         lockres_free(cinfo->ack_lockres);
1005         lockres_free(cinfo->no_new_dev_lockres);
1006         lockres_free(cinfo->resync_lockres);
1007         lockres_free(cinfo->bitmap_lockres);
1008         unlock_all_bitmaps(mddev);
1009         dlm_release_lockspace(cinfo->lockspace, 2);
1010         kfree(cinfo);
1011         return 0;
1012 }
1013
1014 /* slot_number(): Returns the MD slot number to use
1015  * DLM starts the slot numbers from 1, whereas cluster-md
1016  * wants the number to start from zero, so we subtract one
1017  */
1018 static int slot_number(struct mddev *mddev)
1019 {
1020         struct md_cluster_info *cinfo = mddev->cluster_info;
1021
1022         return cinfo->slot_number - 1;
1023 }
1024
1025 /*
1026  * Check if the communication is already locked, else lock the communication
1027  * channel.
1028  * If it is already locked, token is in EX mode, and hence lock_token()
1029  * should not be called.
1030  */
1031 static int metadata_update_start(struct mddev *mddev)
1032 {
1033         struct md_cluster_info *cinfo = mddev->cluster_info;
1034         int ret;
1035
1036         /*
1037          * metadata_update_start is always called with the protection of
1038          * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
1039          */
1040         ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1041                                     &cinfo->state);
1042         WARN_ON_ONCE(ret);
1043         md_wakeup_thread(mddev->thread);
1044
1045         wait_event(cinfo->wait,
1046                    !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1047                    test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1048
1049         /* If token is already locked, return 0 */
1050         if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1051                 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1052                 return 0;
1053         }
1054
1055         ret = lock_token(cinfo, 1);
1056         clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1057         return ret;
1058 }
1059
1060 static int metadata_update_finish(struct mddev *mddev)
1061 {
1062         struct md_cluster_info *cinfo = mddev->cluster_info;
1063         struct cluster_msg cmsg;
1064         struct md_rdev *rdev;
1065         int ret = 0;
1066         int raid_slot = -1;
1067
1068         memset(&cmsg, 0, sizeof(cmsg));
1069         cmsg.type = cpu_to_le32(METADATA_UPDATED);
1070         /* Pick up a good active device number to send.
1071          */
1072         rdev_for_each(rdev, mddev)
1073                 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1074                         raid_slot = rdev->desc_nr;
1075                         break;
1076                 }
1077         if (raid_slot >= 0) {
1078                 cmsg.raid_slot = cpu_to_le32(raid_slot);
1079                 ret = __sendmsg(cinfo, &cmsg);
1080         } else
1081                 pr_warn("md-cluster: No good device id found to send\n");
1082         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1083         unlock_comm(cinfo);
1084         return ret;
1085 }
1086
1087 static void metadata_update_cancel(struct mddev *mddev)
1088 {
1089         struct md_cluster_info *cinfo = mddev->cluster_info;
1090         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1091         unlock_comm(cinfo);
1092 }
1093
1094 /*
1095  * return 0 if all the bitmaps have the same sync_size
1096  */
1097 int cluster_check_sync_size(struct mddev *mddev)
1098 {
1099         int i, rv;
1100         bitmap_super_t *sb;
1101         unsigned long my_sync_size, sync_size = 0;
1102         int node_num = mddev->bitmap_info.nodes;
1103         int current_slot = md_cluster_ops->slot_number(mddev);
1104         struct bitmap *bitmap = mddev->bitmap;
1105         char str[64];
1106         struct dlm_lock_resource *bm_lockres;
1107
1108         sb = kmap_atomic(bitmap->storage.sb_page);
1109         my_sync_size = sb->sync_size;
1110         kunmap_atomic(sb);
1111
1112         for (i = 0; i < node_num; i++) {
1113                 if (i == current_slot)
1114                         continue;
1115
1116                 bitmap = get_bitmap_from_slot(mddev, i);
1117                 if (IS_ERR(bitmap)) {
1118                         pr_err("can't get bitmap from slot %d\n", i);
1119                         return -1;
1120                 }
1121
1122                 /*
1123                  * If we can hold the bitmap lock of one node then
1124                  * the slot is not occupied, update the sb.
1125                  */
1126                 snprintf(str, 64, "bitmap%04d", i);
1127                 bm_lockres = lockres_init(mddev, str, NULL, 1);
1128                 if (!bm_lockres) {
1129                         pr_err("md-cluster: Cannot initialize %s\n", str);
1130                         bitmap_free(bitmap);
1131                         return -1;
1132                 }
1133                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
1134                 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1135                 if (!rv)
1136                         bitmap_update_sb(bitmap);
1137                 lockres_free(bm_lockres);
1138
1139                 sb = kmap_atomic(bitmap->storage.sb_page);
1140                 if (sync_size == 0)
1141                         sync_size = sb->sync_size;
1142                 else if (sync_size != sb->sync_size) {
1143                         kunmap_atomic(sb);
1144                         bitmap_free(bitmap);
1145                         return -1;
1146                 }
1147                 kunmap_atomic(sb);
1148                 bitmap_free(bitmap);
1149         }
1150
1151         return (my_sync_size == sync_size) ? 0 : -1;
1152 }
1153
1154 /*
1155  * Updating the size for a cluster raid is a little more complex; we perform
1156  * it in these steps:
1157  * 1. hold the token lock and update the superblock on the initiator node.
1158  * 2. send a METADATA_UPDATED msg to the other nodes.
1159  * 3. The initiator node continues to check each bitmap's sync_size; if all
1160  *    bitmaps have the same value of sync_size, then we can set the capacity
1161  *    and let the other nodes perform it. If one node can't update sync_size
1162  *    accordingly, we need to revert to the previous value.
1163  */
1164 static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
1165 {
1166         struct md_cluster_info *cinfo = mddev->cluster_info;
1167         struct cluster_msg cmsg;
1168         struct md_rdev *rdev;
1169         int ret = 0;
1170         int raid_slot = -1;
1171
1172         md_update_sb(mddev, 1);
1173         lock_comm(cinfo, 1);
1174
1175         memset(&cmsg, 0, sizeof(cmsg));
1176         cmsg.type = cpu_to_le32(METADATA_UPDATED);
1177         rdev_for_each(rdev, mddev)
1178                 if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
1179                         raid_slot = rdev->desc_nr;
1180                         break;
1181                 }
1182         if (raid_slot >= 0) {
1183                 cmsg.raid_slot = cpu_to_le32(raid_slot);
1184                 /*
1185                  * We can only change the capacity after all the nodes can do it,
1186                  * so we need to wait until the other nodes have received the msg
1187                  * and handled the change
1188                  */
1189                 ret = __sendmsg(cinfo, &cmsg);
1190                 if (ret) {
1191                         pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1192                                __func__, __LINE__);
1193                         unlock_comm(cinfo);
1194                         return;
1195                 }
1196         } else {
1197                 pr_err("md-cluster: No good device id found to send\n");
1198                 unlock_comm(cinfo);
1199                 return;
1200         }
1201
1202         /*
1203          * check the sync_size from other node's bitmap, if sync_size
1204          * have already updated in other nodes as expected, send an
1205          * empty metadata msg to permit the change of capacity
1206          */
1207         if (cluster_check_sync_size(mddev) == 0) {
1208                 memset(&cmsg, 0, sizeof(cmsg));
1209                 cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
1210                 ret = __sendmsg(cinfo, &cmsg);
1211                 if (ret)
1212                         pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
1213                                __func__, __LINE__);
1214                 set_capacity(mddev->gendisk, mddev->array_sectors);
1215                 revalidate_disk(mddev->gendisk);
1216         } else {
1217                 /* revert to previous sectors */
1218                 ret = mddev->pers->resize(mddev, old_dev_sectors);
1219                 if (!ret)
1220                         revalidate_disk(mddev->gendisk);
1221                 ret = __sendmsg(cinfo, &cmsg);
1222                 if (ret)
1223                         pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1224                                __func__, __LINE__);
1225         }
1226         unlock_comm(cinfo);
1227 }
1228
1229 static int resync_start(struct mddev *mddev)
1230 {
1231         struct md_cluster_info *cinfo = mddev->cluster_info;
1232         return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1233 }
1234
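/*
 * Publish the local resync window: write [lo, hi] into our bitmap lock's LVB
 * and broadcast a RESYNCING message so the other nodes can suspend I/O to
 * that region.
 */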
1235 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1236 {
1237         struct md_cluster_info *cinfo = mddev->cluster_info;
1238         struct resync_info ri;
1239         struct cluster_msg cmsg = {0};
1240
1241         /* do not send zero again, if we have sent before */
1242         if (hi == 0) {
1243                 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1244                 if (le64_to_cpu(ri.hi) == 0)
1245                         return 0;
1246         }
1247
1248         add_resync_info(cinfo->bitmap_lockres, lo, hi);
1249         /* Re-acquire the lock to refresh LVB */
1250         dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1251         cmsg.type = cpu_to_le32(RESYNCING);
1252         cmsg.low = cpu_to_le64(lo);
1253         cmsg.high = cpu_to_le64(hi);
1254
1255         /*
1256          * mddev_lock is held if resync_info_update is called from
1257          * resync_finish (md_reap_sync_thread -> resync_finish)
1258          */
1259         if (lo == 0 && hi == 0)
1260                 return sendmsg(cinfo, &cmsg, 1);
1261         else
1262                 return sendmsg(cinfo, &cmsg, 0);
1263 }
1264
1265 static int resync_finish(struct mddev *mddev)
1266 {
1267         struct md_cluster_info *cinfo = mddev->cluster_info;
1268         dlm_unlock_sync(cinfo->resync_lockres);
1269         return resync_info_update(mddev, 0, 0);
1270 }
1271
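/*
 * Return 1 if [lo, hi] overlaps a region some other node is resyncing, or,
 * for reads, while read balancing is suspended during slot recovery.
 */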
1272 static int area_resyncing(struct mddev *mddev, int direction,
1273                 sector_t lo, sector_t hi)
1274 {
1275         struct md_cluster_info *cinfo = mddev->cluster_info;
1276         int ret = 0;
1277         struct suspend_info *s;
1278
1279         if ((direction == READ) &&
1280                 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1281                 return 1;
1282
1283         spin_lock_irq(&cinfo->suspend_lock);
1284         if (list_empty(&cinfo->suspend_list))
1285                 goto out;
1286         list_for_each_entry(s, &cinfo->suspend_list, list)
1287                 if (hi > s->lo && lo < s->hi) {
1288                         ret = 1;
1289                         break;
1290                 }
1291 out:
1292         spin_unlock_irq(&cinfo->suspend_lock);
1293         return ret;
1294 }
1295
1296 /* add_new_disk() - initiates a disk add
1297  * However, if this fails before writing md_update_sb(),
1298  * add_new_disk_cancel() must be called to release token lock
1299  */
1300 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1301 {
1302         struct md_cluster_info *cinfo = mddev->cluster_info;
1303         struct cluster_msg cmsg;
1304         int ret = 0;
1305         struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1306         char *uuid = sb->device_uuid;
1307
1308         memset(&cmsg, 0, sizeof(cmsg));
1309         cmsg.type = cpu_to_le32(NEWDISK);
1310         memcpy(cmsg.uuid, uuid, 16);
1311         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1312         lock_comm(cinfo, 1);
1313         ret = __sendmsg(cinfo, &cmsg);
1314         if (ret) {
1315                 unlock_comm(cinfo);
1316                 return ret;
1317         }
1318         cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1319         ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1320         cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1321         /* Some node does not "see" the device */
1322         if (ret == -EAGAIN)
1323                 ret = -ENOENT;
1324         if (ret)
1325                 unlock_comm(cinfo);
1326         else {
1327                 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1328                 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1329                  * will run soon after add_new_disk, the below path will be
1330                  * invoked:
1331                  *   md_wakeup_thread(mddev->thread)
1332                  *      -> conf->thread (raid1d)
1333                  *      -> md_check_recovery -> md_update_sb
1334                  *      -> metadata_update_start/finish
1335                  * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1336                  *
1337                  * For other failure cases, metadata_update_cancel and
1338                  * add_new_disk_cancel also clear below bit as well.
1339                  * */
1340                 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1341                 wake_up(&cinfo->wait);
1342         }
1343         return ret;
1344 }
1345
1346 static void add_new_disk_cancel(struct mddev *mddev)
1347 {
1348         struct md_cluster_info *cinfo = mddev->cluster_info;
1349         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1350         unlock_comm(cinfo);
1351 }
1352
1353 static int new_disk_ack(struct mddev *mddev, bool ack)
1354 {
1355         struct md_cluster_info *cinfo = mddev->cluster_info;
1356
1357         if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1358                 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1359                 return -EINVAL;
1360         }
1361
1362         if (ack)
1363                 dlm_unlock_sync(cinfo->no_new_dev_lockres);
1364         complete(&cinfo->newdisk_completion);
1365         return 0;
1366 }
1367
1368 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1369 {
1370         struct cluster_msg cmsg = {0};
1371         struct md_cluster_info *cinfo = mddev->cluster_info;
1372         cmsg.type = cpu_to_le32(REMOVE);
1373         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1374         return sendmsg(cinfo, &cmsg, 1);
1375 }
1376
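/*
 * Try to take PW on every other node's bitmap lock without queueing.
 * Returns 1 if all of them were acquired, -1 if any is held elsewhere,
 * 0 or -ENOMEM on allocation failure.
 */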
1377 static int lock_all_bitmaps(struct mddev *mddev)
1378 {
1379         int slot, my_slot, ret, held = 1, i = 0;
1380         char str[64];
1381         struct md_cluster_info *cinfo = mddev->cluster_info;
1382
1383         cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
1384                                              sizeof(struct dlm_lock_resource *),
1385                                              GFP_KERNEL);
1386         if (!cinfo->other_bitmap_lockres) {
1387                 pr_err("md: can't alloc mem for other bitmap locks\n");
1388                 return 0;
1389         }
1390
1391         my_slot = slot_number(mddev);
1392         for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1393                 if (slot == my_slot)
1394                         continue;
1395
1396                 memset(str, '\0', 64);
1397                 snprintf(str, 64, "bitmap%04d", slot);
1398                 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1399                 if (!cinfo->other_bitmap_lockres[i])
1400                         return -ENOMEM;
1401
1402                 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1403                 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1404                 if (ret)
1405                         held = -1;
1406                 i++;
1407         }
1408
1409         return held;
1410 }
1411
1412 static void unlock_all_bitmaps(struct mddev *mddev)
1413 {
1414         struct md_cluster_info *cinfo = mddev->cluster_info;
1415         int i;
1416
1417         /* release the other nodes' bitmap locks if they exist */
1418         if (cinfo->other_bitmap_lockres) {
1419                 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1420                         if (cinfo->other_bitmap_lockres[i]) {
1421                                 lockres_free(cinfo->other_bitmap_lockres[i]);
1422                         }
1423                 }
1424                 kfree(cinfo->other_bitmap_lockres);
1425         }
1426 }
1427
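/*
 * Used when re-adding a device: tell the other nodes to RE_ADD it, then fold
 * their bitmaps into the local recovery checkpoint.
 */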
1428 static int gather_bitmaps(struct md_rdev *rdev)
1429 {
1430         int sn, err;
1431         sector_t lo, hi;
1432         struct cluster_msg cmsg = {0};
1433         struct mddev *mddev = rdev->mddev;
1434         struct md_cluster_info *cinfo = mddev->cluster_info;
1435
1436         cmsg.type = cpu_to_le32(RE_ADD);
1437         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1438         err = sendmsg(cinfo, &cmsg, 1);
1439         if (err)
1440                 goto out;
1441
1442         for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1443                 if (sn == (cinfo->slot_number - 1))
1444                         continue;
1445                 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1446                 if (err) {
1447                         pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
1448                         goto out;
1449                 }
1450                 if ((hi > 0) && (lo < mddev->recovery_cp))
1451                         mddev->recovery_cp = lo;
1452         }
1453 out:
1454         return err;
1455 }
1456
1457 static struct md_cluster_operations cluster_ops = {
1458         .join   = join,
1459         .leave  = leave,
1460         .slot_number = slot_number,
1461         .resync_start = resync_start,
1462         .resync_finish = resync_finish,
1463         .resync_info_update = resync_info_update,
1464         .metadata_update_start = metadata_update_start,
1465         .metadata_update_finish = metadata_update_finish,
1466         .metadata_update_cancel = metadata_update_cancel,
1467         .area_resyncing = area_resyncing,
1468         .add_new_disk = add_new_disk,
1469         .add_new_disk_cancel = add_new_disk_cancel,
1470         .new_disk_ack = new_disk_ack,
1471         .remove_disk = remove_disk,
1472         .load_bitmaps = load_bitmaps,
1473         .gather_bitmaps = gather_bitmaps,
1474         .lock_all_bitmaps = lock_all_bitmaps,
1475         .unlock_all_bitmaps = unlock_all_bitmaps,
1476         .update_size = update_size,
1477 };
1478
1479 static int __init cluster_init(void)
1480 {
1481         pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1482         pr_info("Registering Cluster MD functions\n");
1483         register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1484         return 0;
1485 }
1486
1487 static void cluster_exit(void)
1488 {
1489         unregister_md_cluster_operations();
1490 }
1491
1492 module_init(cluster_init);
1493 module_exit(cluster_exit);
1494 MODULE_AUTHOR("SUSE");
1495 MODULE_LICENSE("GPL");
1496 MODULE_DESCRIPTION("Clustering support for MD");