/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;
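
/*
 * Per-lockspace sysfs files, created under /sys/kernel/dlm/<name>/ and
 * written by dlm_controld in userspace: "control" stops or starts the
 * lockspace, "event_done" completes the join/leave handshake in
 * do_uevent(), "id" sets the global lockspace id, and the recover_*
 * files expose recovery state.
 */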
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n;
        int rc = kstrtoint(buf, 0, &n);

        if (rc)
                return rc;
        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

        if (rc)
                return rc;
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtouint(buf, 0, &ls->ls_global_id);

        if (rc)
                return rc;
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val;
        int rc = kstrtoint(buf, 0, &val);

        if (rc)
                return rc;
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;
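
/*
 * Tell userspace (dlm_controld) that this node is joining (in=1,
 * KOBJ_ONLINE) or leaving (in=0, KOBJ_OFFLINE) the lockspace group,
 * then sleep until dlm_event_store() reports the result via sysfs.
 */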
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
                      struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}
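
/*
 * dlm_scand periodically visits any lockspace not scanned within
 * ci_scan_secs and, when it can take the recovery lock without
 * blocking, runs the periodic scans (dlm_scan_rsbs, dlm_scan_timeout,
 * dlm_scan_waiters); otherwise the lockspace is retried about a
 * second later.
 */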
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
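
/*
 * Look up a lockspace by global id, local handle, or misc device
 * minor.  A successful lookup takes a reference (ls_count, protected
 * by lslist_lock) that the caller must drop with dlm_put_lockspace().
 */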
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
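
/*
 * Wait for the lookup references taken by dlm_find_lockspace_*() to
 * drain, then unlink the lockspace from lslist.  The create count must
 * already be zero (see release_lockspace()).
 */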
static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
}
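
/*
 * Create and register a new lockspace: allocate and initialize the
 * dlm_ls with its rsb and lkb tables, add it to lslist, start the
 * callback thread (for DLM_LSFL_FS) and dlm_recoverd, register the
 * kobject, and complete the uevent handshake with dlm_controld before
 * waiting for the first membership event.  Returns 0 on success, 1 if
 * an existing lockspace was reused, or a negative errno.
 */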
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}
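
/*
 * Exported wrapper around new_lockspace().  The first lockspace also
 * starts the shared dlm_scand and lowcomms threads, and a positive
 * "already exists" result from new_lockspace() is folded into 0.
 *
 * Illustrative caller sketch (the names and values are hypothetical,
 * not taken from this file):
 *
 *        dlm_lockspace_t *ls;
 *        int error;
 *
 *        error = dlm_new_lockspace("example", "mycluster", DLM_LSFL_FS,
 *                                  32, NULL, NULL, NULL, &ls);
 *        if (!error)
 *                dlm_release_lockspace(ls, 2);
 */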
int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}

static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is released */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */
int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        return error;
}
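
/*
 * Called when the userland control daemon has gone away: stop each
 * running lockspace.  The list walk restarts from the top after every
 * stop because lslist_lock is dropped around dlm_ls_stop().
 */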
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}