Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw
[sfrench/cifs-2.6.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26
/*
 * Debug-file hooks.  With CONFIG_DLM_DEBUG the real implementations live
 * in another translation unit; otherwise they collapse to no-op stubs so
 * callers in this file need no conditional compilation.
 */
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls)
{
        return 0;
}

static inline void dlm_delete_debug_file(struct dlm_ls *ls)
{
}
#endif
34
35 static int                      ls_count;
36 static struct mutex             ls_lock;
37 static struct list_head         lslist;
38 static spinlock_t               lslist_lock;
39 static struct task_struct *     scand_task;
40
41
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44         ssize_t ret = len;
45         int n = simple_strtol(buf, NULL, 0);
46
47         ls = dlm_find_lockspace_local(ls->ls_local_handle);
48         if (!ls)
49                 return -EINVAL;
50
51         switch (n) {
52         case 0:
53                 dlm_ls_stop(ls);
54                 break;
55         case 1:
56                 dlm_ls_start(ls);
57                 break;
58         default:
59                 ret = -EINVAL;
60         }
61         dlm_put_lockspace(ls);
62         return ret;
63 }
64
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69         wake_up(&ls->ls_uevent_wait);
70         return len;
71 }
72
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81         return len;
82 }
83
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86         uint32_t status = dlm_recover_status(ls);
87         return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94
95 struct dlm_attr {
96         struct attribute attr;
97         ssize_t (*show)(struct dlm_ls *, char *);
98         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
99 };
100
101 static struct dlm_attr dlm_attr_control = {
102         .attr  = {.name = "control", .mode = S_IWUSR},
103         .store = dlm_control_store
104 };
105
106 static struct dlm_attr dlm_attr_event = {
107         .attr  = {.name = "event_done", .mode = S_IWUSR},
108         .store = dlm_event_store
109 };
110
111 static struct dlm_attr dlm_attr_id = {
112         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
113         .show  = dlm_id_show,
114         .store = dlm_id_store
115 };
116
117 static struct dlm_attr dlm_attr_recover_status = {
118         .attr  = {.name = "recover_status", .mode = S_IRUGO},
119         .show  = dlm_recover_status_show
120 };
121
122 static struct dlm_attr dlm_attr_recover_nodeid = {
123         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
124         .show  = dlm_recover_nodeid_show
125 };
126
127 static struct attribute *dlm_attrs[] = {
128         &dlm_attr_control.attr,
129         &dlm_attr_event.attr,
130         &dlm_attr_id.attr,
131         &dlm_attr_recover_status.attr,
132         &dlm_attr_recover_nodeid.attr,
133         NULL,
134 };
135
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137                              char *buf)
138 {
139         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141         return a->show ? a->show(ls, buf) : 0;
142 }
143
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145                               const char *buf, size_t len)
146 {
147         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149         return a->store ? a->store(ls, buf, len) : len;
150 }
151
152 static void lockspace_kobj_release(struct kobject *k)
153 {
154         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
155         kfree(ls);
156 }
157
158 static struct sysfs_ops dlm_attr_ops = {
159         .show  = dlm_attr_show,
160         .store = dlm_attr_store,
161 };
162
163 static struct kobj_type dlm_ktype = {
164         .default_attrs = dlm_attrs,
165         .sysfs_ops     = &dlm_attr_ops,
166         .release       = lockspace_kobj_release,
167 };
168
169 static struct kset dlm_kset = {
170         .kobj   = {.name = "dlm",},
171         .ktype  = &dlm_ktype,
172 };
173
174 static int kobject_setup(struct dlm_ls *ls)
175 {
176         char lsname[DLM_LOCKSPACE_LEN];
177         int error;
178
179         memset(lsname, 0, DLM_LOCKSPACE_LEN);
180         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
181
182         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
183         if (error)
184                 return error;
185
186         ls->ls_kobj.kset = &dlm_kset;
187         ls->ls_kobj.ktype = &dlm_ktype;
188         return 0;
189 }
190
191 static int do_uevent(struct dlm_ls *ls, int in)
192 {
193         int error;
194
195         if (in)
196                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
197         else
198                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
199
200         error = wait_event_interruptible(ls->ls_uevent_wait,
201                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
202         if (error)
203                 goto out;
204
205         error = ls->ls_uevent_result;
206  out:
207         return error;
208 }
209
210
211 int dlm_lockspace_init(void)
212 {
213         int error;
214
215         ls_count = 0;
216         mutex_init(&ls_lock);
217         INIT_LIST_HEAD(&lslist);
218         spin_lock_init(&lslist_lock);
219
220         kobj_set_kset_s(&dlm_kset, kernel_subsys);
221         error = kset_register(&dlm_kset);
222         if (error)
223                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
224         return error;
225 }
226
227 void dlm_lockspace_exit(void)
228 {
229         kset_unregister(&dlm_kset);
230 }
231
232 static int dlm_scand(void *data)
233 {
234         struct dlm_ls *ls;
235
236         while (!kthread_should_stop()) {
237                 list_for_each_entry(ls, &lslist, ls_list)
238                         dlm_scan_rsbs(ls);
239                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
240         }
241         return 0;
242 }
243
244 static int dlm_scand_start(void)
245 {
246         struct task_struct *p;
247         int error = 0;
248
249         p = kthread_run(dlm_scand, NULL, "dlm_scand");
250         if (IS_ERR(p))
251                 error = PTR_ERR(p);
252         else
253                 scand_task = p;
254         return error;
255 }
256
257 static void dlm_scand_stop(void)
258 {
259         kthread_stop(scand_task);
260 }
261
262 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
263 {
264         struct dlm_ls *ls;
265
266         spin_lock(&lslist_lock);
267
268         list_for_each_entry(ls, &lslist, ls_list) {
269                 if (ls->ls_namelen == namelen &&
270                     memcmp(ls->ls_name, name, namelen) == 0)
271                         goto out;
272         }
273         ls = NULL;
274  out:
275         spin_unlock(&lslist_lock);
276         return ls;
277 }
278
279 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
280 {
281         struct dlm_ls *ls;
282
283         spin_lock(&lslist_lock);
284
285         list_for_each_entry(ls, &lslist, ls_list) {
286                 if (ls->ls_global_id == id) {
287                         ls->ls_count++;
288                         goto out;
289                 }
290         }
291         ls = NULL;
292  out:
293         spin_unlock(&lslist_lock);
294         return ls;
295 }
296
297 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
298 {
299         struct dlm_ls *ls;
300
301         spin_lock(&lslist_lock);
302         list_for_each_entry(ls, &lslist, ls_list) {
303                 if (ls->ls_local_handle == lockspace) {
304                         ls->ls_count++;
305                         goto out;
306                 }
307         }
308         ls = NULL;
309  out:
310         spin_unlock(&lslist_lock);
311         return ls;
312 }
313
314 struct dlm_ls *dlm_find_lockspace_device(int minor)
315 {
316         struct dlm_ls *ls;
317
318         spin_lock(&lslist_lock);
319         list_for_each_entry(ls, &lslist, ls_list) {
320                 if (ls->ls_device.minor == minor) {
321                         ls->ls_count++;
322                         goto out;
323                 }
324         }
325         ls = NULL;
326  out:
327         spin_unlock(&lslist_lock);
328         return ls;
329 }
330
331 void dlm_put_lockspace(struct dlm_ls *ls)
332 {
333         spin_lock(&lslist_lock);
334         ls->ls_count--;
335         spin_unlock(&lslist_lock);
336 }
337
338 static void remove_lockspace(struct dlm_ls *ls)
339 {
340         for (;;) {
341                 spin_lock(&lslist_lock);
342                 if (ls->ls_count == 0) {
343                         list_del(&ls->ls_list);
344                         spin_unlock(&lslist_lock);
345                         return;
346                 }
347                 spin_unlock(&lslist_lock);
348                 ssleep(1);
349         }
350 }
351
/*
 * Start the threads shared by all lockspaces, in order: dlm_astd,
 * dlm_scand, then lowcomms.  If a later step fails, the steps that
 * already succeeded are stopped again in reverse order.
 *
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
        int error;

        /* Thread which process lock requests for all lockspace's */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                return error;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                dlm_astd_stop();
                return error;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                dlm_scand_stop();
                dlm_astd_stop();
                return error;
        }

        return 0;
}
385
/*
 * Stop everything started by threads_start(): scand first, then
 * lowcomms, then astd.
 */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
392
393 static int new_lockspace(char *name, int namelen, void **lockspace,
394                          uint32_t flags, int lvblen)
395 {
396         struct dlm_ls *ls;
397         int i, size, error = -ENOMEM;
398
399         if (namelen > DLM_LOCKSPACE_LEN)
400                 return -EINVAL;
401
402         if (!lvblen || (lvblen % 8))
403                 return -EINVAL;
404
405         if (!try_module_get(THIS_MODULE))
406                 return -EINVAL;
407
408         ls = dlm_find_lockspace_name(name, namelen);
409         if (ls) {
410                 *lockspace = ls;
411                 module_put(THIS_MODULE);
412                 return -EEXIST;
413         }
414
415         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
416         if (!ls)
417                 goto out;
418         memcpy(ls->ls_name, name, namelen);
419         ls->ls_namelen = namelen;
420         ls->ls_exflags = flags;
421         ls->ls_lvblen = lvblen;
422         ls->ls_count = 0;
423         ls->ls_flags = 0;
424
425         size = dlm_config.ci_rsbtbl_size;
426         ls->ls_rsbtbl_size = size;
427
428         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
429         if (!ls->ls_rsbtbl)
430                 goto out_lsfree;
431         for (i = 0; i < size; i++) {
432                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
433                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
434                 rwlock_init(&ls->ls_rsbtbl[i].lock);
435         }
436
437         size = dlm_config.ci_lkbtbl_size;
438         ls->ls_lkbtbl_size = size;
439
440         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
441         if (!ls->ls_lkbtbl)
442                 goto out_rsbfree;
443         for (i = 0; i < size; i++) {
444                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
445                 rwlock_init(&ls->ls_lkbtbl[i].lock);
446                 ls->ls_lkbtbl[i].counter = 1;
447         }
448
449         size = dlm_config.ci_dirtbl_size;
450         ls->ls_dirtbl_size = size;
451
452         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
453         if (!ls->ls_dirtbl)
454                 goto out_lkbfree;
455         for (i = 0; i < size; i++) {
456                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
457                 rwlock_init(&ls->ls_dirtbl[i].lock);
458         }
459
460         INIT_LIST_HEAD(&ls->ls_waiters);
461         mutex_init(&ls->ls_waiters_mutex);
462         INIT_LIST_HEAD(&ls->ls_orphans);
463         mutex_init(&ls->ls_orphans_mutex);
464
465         INIT_LIST_HEAD(&ls->ls_nodes);
466         INIT_LIST_HEAD(&ls->ls_nodes_gone);
467         ls->ls_num_nodes = 0;
468         ls->ls_low_nodeid = 0;
469         ls->ls_total_weight = 0;
470         ls->ls_node_array = NULL;
471
472         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
473         ls->ls_stub_rsb.res_ls = ls;
474
475         ls->ls_debug_rsb_dentry = NULL;
476         ls->ls_debug_waiters_dentry = NULL;
477
478         init_waitqueue_head(&ls->ls_uevent_wait);
479         ls->ls_uevent_result = 0;
480
481         ls->ls_recoverd_task = NULL;
482         mutex_init(&ls->ls_recoverd_active);
483         spin_lock_init(&ls->ls_recover_lock);
484         spin_lock_init(&ls->ls_rcom_spin);
485         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
486         ls->ls_recover_status = 0;
487         ls->ls_recover_seq = 0;
488         ls->ls_recover_args = NULL;
489         init_rwsem(&ls->ls_in_recovery);
490         INIT_LIST_HEAD(&ls->ls_requestqueue);
491         mutex_init(&ls->ls_requestqueue_mutex);
492         mutex_init(&ls->ls_clear_proc_locks);
493
494         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
495         if (!ls->ls_recover_buf)
496                 goto out_dirfree;
497
498         INIT_LIST_HEAD(&ls->ls_recover_list);
499         spin_lock_init(&ls->ls_recover_list_lock);
500         ls->ls_recover_list_count = 0;
501         ls->ls_local_handle = ls;
502         init_waitqueue_head(&ls->ls_wait_general);
503         INIT_LIST_HEAD(&ls->ls_root_list);
504         init_rwsem(&ls->ls_root_sem);
505
506         down_write(&ls->ls_in_recovery);
507
508         spin_lock(&lslist_lock);
509         list_add(&ls->ls_list, &lslist);
510         spin_unlock(&lslist_lock);
511
512         /* needs to find ls in lslist */
513         error = dlm_recoverd_start(ls);
514         if (error) {
515                 log_error(ls, "can't start dlm_recoverd %d", error);
516                 goto out_rcomfree;
517         }
518
519         dlm_create_debug_file(ls);
520
521         error = kobject_setup(ls);
522         if (error)
523                 goto out_del;
524
525         error = kobject_register(&ls->ls_kobj);
526         if (error)
527                 goto out_del;
528
529         error = do_uevent(ls, 1);
530         if (error)
531                 goto out_unreg;
532
533         *lockspace = ls;
534         return 0;
535
536  out_unreg:
537         kobject_unregister(&ls->ls_kobj);
538  out_del:
539         dlm_delete_debug_file(ls);
540         dlm_recoverd_stop(ls);
541  out_rcomfree:
542         spin_lock(&lslist_lock);
543         list_del(&ls->ls_list);
544         spin_unlock(&lslist_lock);
545         kfree(ls->ls_recover_buf);
546  out_dirfree:
547         kfree(ls->ls_dirtbl);
548  out_lkbfree:
549         kfree(ls->ls_lkbtbl);
550  out_rsbfree:
551         kfree(ls->ls_rsbtbl);
552  out_lsfree:
553         kfree(ls);
554  out:
555         module_put(THIS_MODULE);
556         return error;
557 }
558
559 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
560                       uint32_t flags, int lvblen)
561 {
562         int error = 0;
563
564         mutex_lock(&ls_lock);
565         if (!ls_count)
566                 error = threads_start();
567         if (error)
568                 goto out;
569
570         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
571         if (!error)
572                 ls_count++;
573  out:
574         mutex_unlock(&ls_lock);
575         return error;
576 }
577
578 /* Return 1 if the lockspace still has active remote locks,
579  *        2 if the lockspace still has active local locks.
580  */
581 static int lockspace_busy(struct dlm_ls *ls)
582 {
583         int i, lkb_found = 0;
584         struct dlm_lkb *lkb;
585
586         /* NOTE: We check the lockidtbl here rather than the resource table.
587            This is because there may be LKBs queued as ASTs that have been
588            unlinked from their RSBs and are pending deletion once the AST has
589            been delivered */
590
591         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
592                 read_lock(&ls->ls_lkbtbl[i].lock);
593                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
594                         lkb_found = 1;
595                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
596                                             lkb_idtbl_list) {
597                                 if (!lkb->lkb_nodeid) {
598                                         read_unlock(&ls->ls_lkbtbl[i].lock);
599                                         return 2;
600                                 }
601                         }
602                 }
603                 read_unlock(&ls->ls_lkbtbl[i].lock);
604         }
605         return lkb_found;
606 }
607
608 static int release_lockspace(struct dlm_ls *ls, int force)
609 {
610         struct dlm_lkb *lkb;
611         struct dlm_rsb *rsb;
612         struct list_head *head;
613         int i;
614         int busy = lockspace_busy(ls);
615
616         if (busy > force)
617                 return -EBUSY;
618
619         if (force < 3)
620                 do_uevent(ls, 0);
621
622         dlm_recoverd_stop(ls);
623
624         remove_lockspace(ls);
625
626         dlm_delete_debug_file(ls);
627
628         dlm_astd_suspend();
629
630         kfree(ls->ls_recover_buf);
631
632         /*
633          * Free direntry structs.
634          */
635
636         dlm_dir_clear(ls);
637         kfree(ls->ls_dirtbl);
638
639         /*
640          * Free all lkb's on lkbtbl[] lists.
641          */
642
643         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
644                 head = &ls->ls_lkbtbl[i].list;
645                 while (!list_empty(head)) {
646                         lkb = list_entry(head->next, struct dlm_lkb,
647                                          lkb_idtbl_list);
648
649                         list_del(&lkb->lkb_idtbl_list);
650
651                         dlm_del_ast(lkb);
652
653                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
654                                 free_lvb(lkb->lkb_lvbptr);
655
656                         free_lkb(lkb);
657                 }
658         }
659         dlm_astd_resume();
660
661         kfree(ls->ls_lkbtbl);
662
663         /*
664          * Free all rsb's on rsbtbl[] lists
665          */
666
667         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
668                 head = &ls->ls_rsbtbl[i].list;
669                 while (!list_empty(head)) {
670                         rsb = list_entry(head->next, struct dlm_rsb,
671                                          res_hashchain);
672
673                         list_del(&rsb->res_hashchain);
674                         free_rsb(rsb);
675                 }
676
677                 head = &ls->ls_rsbtbl[i].toss;
678                 while (!list_empty(head)) {
679                         rsb = list_entry(head->next, struct dlm_rsb,
680                                          res_hashchain);
681                         list_del(&rsb->res_hashchain);
682                         free_rsb(rsb);
683                 }
684         }
685
686         kfree(ls->ls_rsbtbl);
687
688         /*
689          * Free structures on any other lists
690          */
691
692         dlm_purge_requestqueue(ls);
693         kfree(ls->ls_recover_args);
694         dlm_clear_free_entries(ls);
695         dlm_clear_members(ls);
696         dlm_clear_members_gone(ls);
697         kfree(ls->ls_node_array);
698         kobject_unregister(&ls->ls_kobj);
699         /* The ls structure will be freed when the kobject is done with */
700
701         mutex_lock(&ls_lock);
702         ls_count--;
703         if (!ls_count)
704                 threads_stop();
705         mutex_unlock(&ls_lock);
706
707         module_put(THIS_MODULE);
708         return 0;
709 }
710
711 /*
712  * Called when a system has released all its locks and is not going to use the
713  * lockspace any longer.  We free everything we're managing for this lockspace.
714  * Remaining nodes will go through the recovery process as if we'd died.  The
715  * lockspace must continue to function as usual, participating in recoveries,
716  * until this returns.
717  *
718  * Force has 4 possible values:
719  * 0 - don't destroy locksapce if it has any LKBs
720  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
721  * 2 - destroy lockspace regardless of LKBs
722  * 3 - destroy lockspace as part of a forced shutdown
723  */
724
725 int dlm_release_lockspace(void *lockspace, int force)
726 {
727         struct dlm_ls *ls;
728
729         ls = dlm_find_lockspace_local(lockspace);
730         if (!ls)
731                 return -EINVAL;
732         dlm_put_lockspace(ls);
733         return release_lockspace(ls, force);
734 }
735