Merge branch 'upstream' of git://ftp.linux-mips.org/pub/scm/upstream-linus
[sfrench/cifs-2.6.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26
27 #ifdef CONFIG_DLM_DEBUG
28 int dlm_create_debug_file(struct dlm_ls *ls);
29 void dlm_delete_debug_file(struct dlm_ls *ls);
30 #else
31 static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
32 static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
33 #endif
34
35 static int                      ls_count;
36 static struct mutex             ls_lock;
37 static struct list_head         lslist;
38 static spinlock_t               lslist_lock;
39 static struct task_struct *     scand_task;
40
41
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44         ssize_t ret = len;
45         int n = simple_strtol(buf, NULL, 0);
46
47         ls = dlm_find_lockspace_local(ls->ls_local_handle);
48         if (!ls)
49                 return -EINVAL;
50
51         switch (n) {
52         case 0:
53                 dlm_ls_stop(ls);
54                 break;
55         case 1:
56                 dlm_ls_start(ls);
57                 break;
58         default:
59                 ret = -EINVAL;
60         }
61         dlm_put_lockspace(ls);
62         return ret;
63 }
64
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69         wake_up(&ls->ls_uevent_wait);
70         return len;
71 }
72
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81         return len;
82 }
83
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86         uint32_t status = dlm_recover_status(ls);
87         return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94
95 struct dlm_attr {
96         struct attribute attr;
97         ssize_t (*show)(struct dlm_ls *, char *);
98         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
99 };
100
101 static struct dlm_attr dlm_attr_control = {
102         .attr  = {.name = "control", .mode = S_IWUSR},
103         .store = dlm_control_store
104 };
105
106 static struct dlm_attr dlm_attr_event = {
107         .attr  = {.name = "event_done", .mode = S_IWUSR},
108         .store = dlm_event_store
109 };
110
111 static struct dlm_attr dlm_attr_id = {
112         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
113         .show  = dlm_id_show,
114         .store = dlm_id_store
115 };
116
117 static struct dlm_attr dlm_attr_recover_status = {
118         .attr  = {.name = "recover_status", .mode = S_IRUGO},
119         .show  = dlm_recover_status_show
120 };
121
122 static struct dlm_attr dlm_attr_recover_nodeid = {
123         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
124         .show  = dlm_recover_nodeid_show
125 };
126
127 static struct attribute *dlm_attrs[] = {
128         &dlm_attr_control.attr,
129         &dlm_attr_event.attr,
130         &dlm_attr_id.attr,
131         &dlm_attr_recover_status.attr,
132         &dlm_attr_recover_nodeid.attr,
133         NULL,
134 };
135
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137                              char *buf)
138 {
139         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141         return a->show ? a->show(ls, buf) : 0;
142 }
143
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145                               const char *buf, size_t len)
146 {
147         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149         return a->store ? a->store(ls, buf, len) : len;
150 }
151
152 static void lockspace_kobj_release(struct kobject *k)
153 {
154         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
155         kfree(ls);
156 }
157
158 static struct sysfs_ops dlm_attr_ops = {
159         .show  = dlm_attr_show,
160         .store = dlm_attr_store,
161 };
162
163 static struct kobj_type dlm_ktype = {
164         .default_attrs = dlm_attrs,
165         .sysfs_ops     = &dlm_attr_ops,
166         .release       = lockspace_kobj_release,
167 };
168
169 static struct kset dlm_kset = {
170         .kobj   = {.name = "dlm",},
171         .ktype  = &dlm_ktype,
172 };
173
174 static int kobject_setup(struct dlm_ls *ls)
175 {
176         char lsname[DLM_LOCKSPACE_LEN];
177         int error;
178
179         memset(lsname, 0, DLM_LOCKSPACE_LEN);
180         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
181
182         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
183         if (error)
184                 return error;
185
186         ls->ls_kobj.kset = &dlm_kset;
187         ls->ls_kobj.ktype = &dlm_ktype;
188         return 0;
189 }
190
191 static int do_uevent(struct dlm_ls *ls, int in)
192 {
193         int error;
194
195         if (in)
196                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
197         else
198                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
199
200         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
201
202         /* dlm_controld will see the uevent, do the necessary group management
203            and then write to sysfs to wake us */
204
205         error = wait_event_interruptible(ls->ls_uevent_wait,
206                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
207
208         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
209
210         if (error)
211                 goto out;
212
213         error = ls->ls_uevent_result;
214  out:
215         if (error)
216                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
217                           error, ls->ls_uevent_result);
218         return error;
219 }
220
221
222 int dlm_lockspace_init(void)
223 {
224         int error;
225
226         ls_count = 0;
227         mutex_init(&ls_lock);
228         INIT_LIST_HEAD(&lslist);
229         spin_lock_init(&lslist_lock);
230
231         kobj_set_kset_s(&dlm_kset, kernel_subsys);
232         error = kset_register(&dlm_kset);
233         if (error)
234                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
235         return error;
236 }
237
238 void dlm_lockspace_exit(void)
239 {
240         kset_unregister(&dlm_kset);
241 }
242
243 static int dlm_scand(void *data)
244 {
245         struct dlm_ls *ls;
246
247         while (!kthread_should_stop()) {
248                 list_for_each_entry(ls, &lslist, ls_list) {
249                         if (dlm_lock_recovery_try(ls)) {
250                                 dlm_scan_rsbs(ls);
251                                 dlm_scan_timeout(ls);
252                                 dlm_unlock_recovery(ls);
253                         }
254                 }
255                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
256         }
257         return 0;
258 }
259
260 static int dlm_scand_start(void)
261 {
262         struct task_struct *p;
263         int error = 0;
264
265         p = kthread_run(dlm_scand, NULL, "dlm_scand");
266         if (IS_ERR(p))
267                 error = PTR_ERR(p);
268         else
269                 scand_task = p;
270         return error;
271 }
272
273 static void dlm_scand_stop(void)
274 {
275         kthread_stop(scand_task);
276 }
277
278 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
279 {
280         struct dlm_ls *ls;
281
282         spin_lock(&lslist_lock);
283
284         list_for_each_entry(ls, &lslist, ls_list) {
285                 if (ls->ls_namelen == namelen &&
286                     memcmp(ls->ls_name, name, namelen) == 0)
287                         goto out;
288         }
289         ls = NULL;
290  out:
291         spin_unlock(&lslist_lock);
292         return ls;
293 }
294
295 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
296 {
297         struct dlm_ls *ls;
298
299         spin_lock(&lslist_lock);
300
301         list_for_each_entry(ls, &lslist, ls_list) {
302                 if (ls->ls_global_id == id) {
303                         ls->ls_count++;
304                         goto out;
305                 }
306         }
307         ls = NULL;
308  out:
309         spin_unlock(&lslist_lock);
310         return ls;
311 }
312
313 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
314 {
315         struct dlm_ls *ls;
316
317         spin_lock(&lslist_lock);
318         list_for_each_entry(ls, &lslist, ls_list) {
319                 if (ls->ls_local_handle == lockspace) {
320                         ls->ls_count++;
321                         goto out;
322                 }
323         }
324         ls = NULL;
325  out:
326         spin_unlock(&lslist_lock);
327         return ls;
328 }
329
330 struct dlm_ls *dlm_find_lockspace_device(int minor)
331 {
332         struct dlm_ls *ls;
333
334         spin_lock(&lslist_lock);
335         list_for_each_entry(ls, &lslist, ls_list) {
336                 if (ls->ls_device.minor == minor) {
337                         ls->ls_count++;
338                         goto out;
339                 }
340         }
341         ls = NULL;
342  out:
343         spin_unlock(&lslist_lock);
344         return ls;
345 }
346
347 void dlm_put_lockspace(struct dlm_ls *ls)
348 {
349         spin_lock(&lslist_lock);
350         ls->ls_count--;
351         spin_unlock(&lslist_lock);
352 }
353
354 static void remove_lockspace(struct dlm_ls *ls)
355 {
356         for (;;) {
357                 spin_lock(&lslist_lock);
358                 if (ls->ls_count == 0) {
359                         list_del(&ls->ls_list);
360                         spin_unlock(&lslist_lock);
361                         return;
362                 }
363                 spin_unlock(&lslist_lock);
364                 ssleep(1);
365         }
366 }
367
368 static int threads_start(void)
369 {
370         int error;
371
372         /* Thread which process lock requests for all lockspace's */
373         error = dlm_astd_start();
374         if (error) {
375                 log_print("cannot start dlm_astd thread %d", error);
376                 goto fail;
377         }
378
379         error = dlm_scand_start();
380         if (error) {
381                 log_print("cannot start dlm_scand thread %d", error);
382                 goto astd_fail;
383         }
384
385         /* Thread for sending/receiving messages for all lockspace's */
386         error = dlm_lowcomms_start();
387         if (error) {
388                 log_print("cannot start dlm lowcomms %d", error);
389                 goto scand_fail;
390         }
391
392         return 0;
393
394  scand_fail:
395         dlm_scand_stop();
396  astd_fail:
397         dlm_astd_stop();
398  fail:
399         return error;
400 }
401
402 static void threads_stop(void)
403 {
404         dlm_scand_stop();
405         dlm_lowcomms_stop();
406         dlm_astd_stop();
407 }
408
409 static int new_lockspace(char *name, int namelen, void **lockspace,
410                          uint32_t flags, int lvblen)
411 {
412         struct dlm_ls *ls;
413         int i, size, error = -ENOMEM;
414         int do_unreg = 0;
415
416         if (namelen > DLM_LOCKSPACE_LEN)
417                 return -EINVAL;
418
419         if (!lvblen || (lvblen % 8))
420                 return -EINVAL;
421
422         if (!try_module_get(THIS_MODULE))
423                 return -EINVAL;
424
425         ls = dlm_find_lockspace_name(name, namelen);
426         if (ls) {
427                 *lockspace = ls;
428                 module_put(THIS_MODULE);
429                 return -EEXIST;
430         }
431
432         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
433         if (!ls)
434                 goto out;
435         memcpy(ls->ls_name, name, namelen);
436         ls->ls_namelen = namelen;
437         ls->ls_lvblen = lvblen;
438         ls->ls_count = 0;
439         ls->ls_flags = 0;
440
441         if (flags & DLM_LSFL_TIMEWARN)
442                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
443
444         if (flags & DLM_LSFL_FS)
445                 ls->ls_allocation = GFP_NOFS;
446         else
447                 ls->ls_allocation = GFP_KERNEL;
448
449         /* ls_exflags are forced to match among nodes, and we don't
450            need to require all nodes to have TIMEWARN or FS set */
451         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
452
453         size = dlm_config.ci_rsbtbl_size;
454         ls->ls_rsbtbl_size = size;
455
456         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
457         if (!ls->ls_rsbtbl)
458                 goto out_lsfree;
459         for (i = 0; i < size; i++) {
460                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
461                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
462                 rwlock_init(&ls->ls_rsbtbl[i].lock);
463         }
464
465         size = dlm_config.ci_lkbtbl_size;
466         ls->ls_lkbtbl_size = size;
467
468         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
469         if (!ls->ls_lkbtbl)
470                 goto out_rsbfree;
471         for (i = 0; i < size; i++) {
472                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
473                 rwlock_init(&ls->ls_lkbtbl[i].lock);
474                 ls->ls_lkbtbl[i].counter = 1;
475         }
476
477         size = dlm_config.ci_dirtbl_size;
478         ls->ls_dirtbl_size = size;
479
480         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
481         if (!ls->ls_dirtbl)
482                 goto out_lkbfree;
483         for (i = 0; i < size; i++) {
484                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
485                 rwlock_init(&ls->ls_dirtbl[i].lock);
486         }
487
488         INIT_LIST_HEAD(&ls->ls_waiters);
489         mutex_init(&ls->ls_waiters_mutex);
490         INIT_LIST_HEAD(&ls->ls_orphans);
491         mutex_init(&ls->ls_orphans_mutex);
492         INIT_LIST_HEAD(&ls->ls_timeout);
493         mutex_init(&ls->ls_timeout_mutex);
494
495         INIT_LIST_HEAD(&ls->ls_nodes);
496         INIT_LIST_HEAD(&ls->ls_nodes_gone);
497         ls->ls_num_nodes = 0;
498         ls->ls_low_nodeid = 0;
499         ls->ls_total_weight = 0;
500         ls->ls_node_array = NULL;
501
502         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
503         ls->ls_stub_rsb.res_ls = ls;
504
505         ls->ls_debug_rsb_dentry = NULL;
506         ls->ls_debug_waiters_dentry = NULL;
507
508         init_waitqueue_head(&ls->ls_uevent_wait);
509         ls->ls_uevent_result = 0;
510         init_completion(&ls->ls_members_done);
511         ls->ls_members_result = -1;
512
513         ls->ls_recoverd_task = NULL;
514         mutex_init(&ls->ls_recoverd_active);
515         spin_lock_init(&ls->ls_recover_lock);
516         spin_lock_init(&ls->ls_rcom_spin);
517         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
518         ls->ls_recover_status = 0;
519         ls->ls_recover_seq = 0;
520         ls->ls_recover_args = NULL;
521         init_rwsem(&ls->ls_in_recovery);
522         INIT_LIST_HEAD(&ls->ls_requestqueue);
523         mutex_init(&ls->ls_requestqueue_mutex);
524         mutex_init(&ls->ls_clear_proc_locks);
525
526         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
527         if (!ls->ls_recover_buf)
528                 goto out_dirfree;
529
530         INIT_LIST_HEAD(&ls->ls_recover_list);
531         spin_lock_init(&ls->ls_recover_list_lock);
532         ls->ls_recover_list_count = 0;
533         ls->ls_local_handle = ls;
534         init_waitqueue_head(&ls->ls_wait_general);
535         INIT_LIST_HEAD(&ls->ls_root_list);
536         init_rwsem(&ls->ls_root_sem);
537
538         down_write(&ls->ls_in_recovery);
539
540         spin_lock(&lslist_lock);
541         list_add(&ls->ls_list, &lslist);
542         spin_unlock(&lslist_lock);
543
544         /* needs to find ls in lslist */
545         error = dlm_recoverd_start(ls);
546         if (error) {
547                 log_error(ls, "can't start dlm_recoverd %d", error);
548                 goto out_delist;
549         }
550
551         error = kobject_setup(ls);
552         if (error)
553                 goto out_stop;
554
555         error = kobject_register(&ls->ls_kobj);
556         if (error)
557                 goto out_stop;
558
559         /* let kobject handle freeing of ls if there's an error */
560         do_unreg = 1;
561
562         /* This uevent triggers dlm_controld in userspace to add us to the
563            group of nodes that are members of this lockspace (managed by the
564            cluster infrastructure.)  Once it's done that, it tells us who the
565            current lockspace members are (via configfs) and then tells the
566            lockspace to start running (via sysfs) in dlm_ls_start(). */
567
568         error = do_uevent(ls, 1);
569         if (error)
570                 goto out_stop;
571
572         wait_for_completion(&ls->ls_members_done);
573         error = ls->ls_members_result;
574         if (error)
575                 goto out_members;
576
577         dlm_create_debug_file(ls);
578
579         log_debug(ls, "join complete");
580
581         *lockspace = ls;
582         return 0;
583
584  out_members:
585         do_uevent(ls, 0);
586         dlm_clear_members(ls);
587         kfree(ls->ls_node_array);
588  out_stop:
589         dlm_recoverd_stop(ls);
590  out_delist:
591         spin_lock(&lslist_lock);
592         list_del(&ls->ls_list);
593         spin_unlock(&lslist_lock);
594         kfree(ls->ls_recover_buf);
595  out_dirfree:
596         kfree(ls->ls_dirtbl);
597  out_lkbfree:
598         kfree(ls->ls_lkbtbl);
599  out_rsbfree:
600         kfree(ls->ls_rsbtbl);
601  out_lsfree:
602         if (do_unreg)
603                 kobject_unregister(&ls->ls_kobj);
604         else
605                 kfree(ls);
606  out:
607         module_put(THIS_MODULE);
608         return error;
609 }
610
611 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
612                       uint32_t flags, int lvblen)
613 {
614         int error = 0;
615
616         mutex_lock(&ls_lock);
617         if (!ls_count)
618                 error = threads_start();
619         if (error)
620                 goto out;
621
622         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
623         if (!error)
624                 ls_count++;
625         else if (!ls_count)
626                 threads_stop();
627  out:
628         mutex_unlock(&ls_lock);
629         return error;
630 }
631
632 /* Return 1 if the lockspace still has active remote locks,
633  *        2 if the lockspace still has active local locks.
634  */
635 static int lockspace_busy(struct dlm_ls *ls)
636 {
637         int i, lkb_found = 0;
638         struct dlm_lkb *lkb;
639
640         /* NOTE: We check the lockidtbl here rather than the resource table.
641            This is because there may be LKBs queued as ASTs that have been
642            unlinked from their RSBs and are pending deletion once the AST has
643            been delivered */
644
645         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
646                 read_lock(&ls->ls_lkbtbl[i].lock);
647                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
648                         lkb_found = 1;
649                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
650                                             lkb_idtbl_list) {
651                                 if (!lkb->lkb_nodeid) {
652                                         read_unlock(&ls->ls_lkbtbl[i].lock);
653                                         return 2;
654                                 }
655                         }
656                 }
657                 read_unlock(&ls->ls_lkbtbl[i].lock);
658         }
659         return lkb_found;
660 }
661
662 static int release_lockspace(struct dlm_ls *ls, int force)
663 {
664         struct dlm_lkb *lkb;
665         struct dlm_rsb *rsb;
666         struct list_head *head;
667         int i;
668         int busy = lockspace_busy(ls);
669
670         if (busy > force)
671                 return -EBUSY;
672
673         if (force < 3)
674                 do_uevent(ls, 0);
675
676         dlm_recoverd_stop(ls);
677
678         remove_lockspace(ls);
679
680         dlm_delete_debug_file(ls);
681
682         dlm_astd_suspend();
683
684         kfree(ls->ls_recover_buf);
685
686         /*
687          * Free direntry structs.
688          */
689
690         dlm_dir_clear(ls);
691         kfree(ls->ls_dirtbl);
692
693         /*
694          * Free all lkb's on lkbtbl[] lists.
695          */
696
697         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
698                 head = &ls->ls_lkbtbl[i].list;
699                 while (!list_empty(head)) {
700                         lkb = list_entry(head->next, struct dlm_lkb,
701                                          lkb_idtbl_list);
702
703                         list_del(&lkb->lkb_idtbl_list);
704
705                         dlm_del_ast(lkb);
706
707                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
708                                 free_lvb(lkb->lkb_lvbptr);
709
710                         free_lkb(lkb);
711                 }
712         }
713         dlm_astd_resume();
714
715         kfree(ls->ls_lkbtbl);
716
717         /*
718          * Free all rsb's on rsbtbl[] lists
719          */
720
721         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
722                 head = &ls->ls_rsbtbl[i].list;
723                 while (!list_empty(head)) {
724                         rsb = list_entry(head->next, struct dlm_rsb,
725                                          res_hashchain);
726
727                         list_del(&rsb->res_hashchain);
728                         free_rsb(rsb);
729                 }
730
731                 head = &ls->ls_rsbtbl[i].toss;
732                 while (!list_empty(head)) {
733                         rsb = list_entry(head->next, struct dlm_rsb,
734                                          res_hashchain);
735                         list_del(&rsb->res_hashchain);
736                         free_rsb(rsb);
737                 }
738         }
739
740         kfree(ls->ls_rsbtbl);
741
742         /*
743          * Free structures on any other lists
744          */
745
746         dlm_purge_requestqueue(ls);
747         kfree(ls->ls_recover_args);
748         dlm_clear_free_entries(ls);
749         dlm_clear_members(ls);
750         dlm_clear_members_gone(ls);
751         kfree(ls->ls_node_array);
752         kobject_unregister(&ls->ls_kobj);
753         /* The ls structure will be freed when the kobject is done with */
754
755         mutex_lock(&ls_lock);
756         ls_count--;
757         if (!ls_count)
758                 threads_stop();
759         mutex_unlock(&ls_lock);
760
761         module_put(THIS_MODULE);
762         return 0;
763 }
764
765 /*
766  * Called when a system has released all its locks and is not going to use the
767  * lockspace any longer.  We free everything we're managing for this lockspace.
768  * Remaining nodes will go through the recovery process as if we'd died.  The
769  * lockspace must continue to function as usual, participating in recoveries,
770  * until this returns.
771  *
772  * Force has 4 possible values:
773  * 0 - don't destroy locksapce if it has any LKBs
774  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
775  * 2 - destroy lockspace regardless of LKBs
776  * 3 - destroy lockspace as part of a forced shutdown
777  */
778
779 int dlm_release_lockspace(void *lockspace, int force)
780 {
781         struct dlm_ls *ls;
782
783         ls = dlm_find_lockspace_local(lockspace);
784         if (!ls)
785                 return -EINVAL;
786         dlm_put_lockspace(ls);
787         return release_lockspace(ls, force);
788 }
789