treewide: Use array_size() in vmalloc()
[sfrench/cifs-2.6.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include <linux/module.h>
15
16 #include "dlm_internal.h"
17 #include "lockspace.h"
18 #include "member.h"
19 #include "recoverd.h"
20 #include "dir.h"
21 #include "lowcomms.h"
22 #include "config.h"
23 #include "memory.h"
24 #include "lock.h"
25 #include "recover.h"
26 #include "requestqueue.h"
27 #include "user.h"
28 #include "ast.h"
29
30 static int                      ls_count;
31 static struct mutex             ls_lock;
32 static struct list_head         lslist;
33 static spinlock_t               lslist_lock;
34 static struct task_struct *     scand_task;
35
36
37 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 {
39         ssize_t ret = len;
40         int n;
41         int rc = kstrtoint(buf, 0, &n);
42
43         if (rc)
44                 return rc;
45         ls = dlm_find_lockspace_local(ls->ls_local_handle);
46         if (!ls)
47                 return -EINVAL;
48
49         switch (n) {
50         case 0:
51                 dlm_ls_stop(ls);
52                 break;
53         case 1:
54                 dlm_ls_start(ls);
55                 break;
56         default:
57                 ret = -EINVAL;
58         }
59         dlm_put_lockspace(ls);
60         return ret;
61 }
62
63 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
64 {
65         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
66
67         if (rc)
68                 return rc;
69         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
70         wake_up(&ls->ls_uevent_wait);
71         return len;
72 }
73
74 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
75 {
76         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
77 }
78
79 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
80 {
81         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
82
83         if (rc)
84                 return rc;
85         return len;
86 }
87
88 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
89 {
90         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
91 }
92
93 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
94 {
95         int val;
96         int rc = kstrtoint(buf, 0, &val);
97
98         if (rc)
99                 return rc;
100         if (val == 1)
101                 set_bit(LSFL_NODIR, &ls->ls_flags);
102         return len;
103 }
104
105 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106 {
107         uint32_t status = dlm_recover_status(ls);
108         return snprintf(buf, PAGE_SIZE, "%x\n", status);
109 }
110
111 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112 {
113         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114 }
115
116 struct dlm_attr {
117         struct attribute attr;
118         ssize_t (*show)(struct dlm_ls *, char *);
119         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120 };
121
122 static struct dlm_attr dlm_attr_control = {
123         .attr  = {.name = "control", .mode = S_IWUSR},
124         .store = dlm_control_store
125 };
126
127 static struct dlm_attr dlm_attr_event = {
128         .attr  = {.name = "event_done", .mode = S_IWUSR},
129         .store = dlm_event_store
130 };
131
132 static struct dlm_attr dlm_attr_id = {
133         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134         .show  = dlm_id_show,
135         .store = dlm_id_store
136 };
137
138 static struct dlm_attr dlm_attr_nodir = {
139         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140         .show  = dlm_nodir_show,
141         .store = dlm_nodir_store
142 };
143
144 static struct dlm_attr dlm_attr_recover_status = {
145         .attr  = {.name = "recover_status", .mode = S_IRUGO},
146         .show  = dlm_recover_status_show
147 };
148
149 static struct dlm_attr dlm_attr_recover_nodeid = {
150         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151         .show  = dlm_recover_nodeid_show
152 };
153
154 static struct attribute *dlm_attrs[] = {
155         &dlm_attr_control.attr,
156         &dlm_attr_event.attr,
157         &dlm_attr_id.attr,
158         &dlm_attr_nodir.attr,
159         &dlm_attr_recover_status.attr,
160         &dlm_attr_recover_nodeid.attr,
161         NULL,
162 };
163
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165                              char *buf)
166 {
167         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169         return a->show ? a->show(ls, buf) : 0;
170 }
171
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173                               const char *buf, size_t len)
174 {
175         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177         return a->store ? a->store(ls, buf, len) : len;
178 }
179
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183         kfree(ls);
184 }
185
186 static const struct sysfs_ops dlm_attr_ops = {
187         .show  = dlm_attr_show,
188         .store = dlm_attr_store,
189 };
190
191 static struct kobj_type dlm_ktype = {
192         .default_attrs = dlm_attrs,
193         .sysfs_ops     = &dlm_attr_ops,
194         .release       = lockspace_kobj_release,
195 };
196
197 static struct kset *dlm_kset;
198
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201         int error;
202
203         if (in)
204                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205         else
206                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207
208         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209
210         /* dlm_controld will see the uevent, do the necessary group management
211            and then write to sysfs to wake us */
212
213         error = wait_event_interruptible(ls->ls_uevent_wait,
214                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215
216         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217
218         if (error)
219                 goto out;
220
221         error = ls->ls_uevent_result;
222  out:
223         if (error)
224                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225                           error, ls->ls_uevent_result);
226         return error;
227 }
228
229 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230                       struct kobj_uevent_env *env)
231 {
232         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233
234         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235         return 0;
236 }
237
238 static const struct kset_uevent_ops dlm_uevent_ops = {
239         .uevent = dlm_uevent,
240 };
241
242 int __init dlm_lockspace_init(void)
243 {
244         ls_count = 0;
245         mutex_init(&ls_lock);
246         INIT_LIST_HEAD(&lslist);
247         spin_lock_init(&lslist_lock);
248
249         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250         if (!dlm_kset) {
251                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
252                 return -ENOMEM;
253         }
254         return 0;
255 }
256
257 void dlm_lockspace_exit(void)
258 {
259         kset_unregister(dlm_kset);
260 }
261
262 static struct dlm_ls *find_ls_to_scan(void)
263 {
264         struct dlm_ls *ls;
265
266         spin_lock(&lslist_lock);
267         list_for_each_entry(ls, &lslist, ls_list) {
268                 if (time_after_eq(jiffies, ls->ls_scan_time +
269                                             dlm_config.ci_scan_secs * HZ)) {
270                         spin_unlock(&lslist_lock);
271                         return ls;
272                 }
273         }
274         spin_unlock(&lslist_lock);
275         return NULL;
276 }
277
278 static int dlm_scand(void *data)
279 {
280         struct dlm_ls *ls;
281
282         while (!kthread_should_stop()) {
283                 ls = find_ls_to_scan();
284                 if (ls) {
285                         if (dlm_lock_recovery_try(ls)) {
286                                 ls->ls_scan_time = jiffies;
287                                 dlm_scan_rsbs(ls);
288                                 dlm_scan_timeout(ls);
289                                 dlm_scan_waiters(ls);
290                                 dlm_unlock_recovery(ls);
291                         } else {
292                                 ls->ls_scan_time += HZ;
293                         }
294                         continue;
295                 }
296                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297         }
298         return 0;
299 }
300
301 static int dlm_scand_start(void)
302 {
303         struct task_struct *p;
304         int error = 0;
305
306         p = kthread_run(dlm_scand, NULL, "dlm_scand");
307         if (IS_ERR(p))
308                 error = PTR_ERR(p);
309         else
310                 scand_task = p;
311         return error;
312 }
313
314 static void dlm_scand_stop(void)
315 {
316         kthread_stop(scand_task);
317 }
318
319 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320 {
321         struct dlm_ls *ls;
322
323         spin_lock(&lslist_lock);
324
325         list_for_each_entry(ls, &lslist, ls_list) {
326                 if (ls->ls_global_id == id) {
327                         ls->ls_count++;
328                         goto out;
329                 }
330         }
331         ls = NULL;
332  out:
333         spin_unlock(&lslist_lock);
334         return ls;
335 }
336
337 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338 {
339         struct dlm_ls *ls;
340
341         spin_lock(&lslist_lock);
342         list_for_each_entry(ls, &lslist, ls_list) {
343                 if (ls->ls_local_handle == lockspace) {
344                         ls->ls_count++;
345                         goto out;
346                 }
347         }
348         ls = NULL;
349  out:
350         spin_unlock(&lslist_lock);
351         return ls;
352 }
353
354 struct dlm_ls *dlm_find_lockspace_device(int minor)
355 {
356         struct dlm_ls *ls;
357
358         spin_lock(&lslist_lock);
359         list_for_each_entry(ls, &lslist, ls_list) {
360                 if (ls->ls_device.minor == minor) {
361                         ls->ls_count++;
362                         goto out;
363                 }
364         }
365         ls = NULL;
366  out:
367         spin_unlock(&lslist_lock);
368         return ls;
369 }
370
371 void dlm_put_lockspace(struct dlm_ls *ls)
372 {
373         spin_lock(&lslist_lock);
374         ls->ls_count--;
375         spin_unlock(&lslist_lock);
376 }
377
378 static void remove_lockspace(struct dlm_ls *ls)
379 {
380         for (;;) {
381                 spin_lock(&lslist_lock);
382                 if (ls->ls_count == 0) {
383                         WARN_ON(ls->ls_create_count != 0);
384                         list_del(&ls->ls_list);
385                         spin_unlock(&lslist_lock);
386                         return;
387                 }
388                 spin_unlock(&lslist_lock);
389                 ssleep(1);
390         }
391 }
392
/*
 * Start the services shared by all lockspaces: the dlm_scand thread, then
 * lowcomms.  If lowcomms fails to start, dlm_scand is stopped again.
 *
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
417
/* Undo threads_start(): stop dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
423
424 static int new_lockspace(const char *name, const char *cluster,
425                          uint32_t flags, int lvblen,
426                          const struct dlm_lockspace_ops *ops, void *ops_arg,
427                          int *ops_result, dlm_lockspace_t **lockspace)
428 {
429         struct dlm_ls *ls;
430         int i, size, error;
431         int do_unreg = 0;
432         int namelen = strlen(name);
433
434         if (namelen > DLM_LOCKSPACE_LEN)
435                 return -EINVAL;
436
437         if (!lvblen || (lvblen % 8))
438                 return -EINVAL;
439
440         if (!try_module_get(THIS_MODULE))
441                 return -EINVAL;
442
443         if (!dlm_user_daemon_available()) {
444                 log_print("dlm user daemon not available");
445                 error = -EUNATCH;
446                 goto out;
447         }
448
449         if (ops && ops_result) {
450                 if (!dlm_config.ci_recover_callbacks)
451                         *ops_result = -EOPNOTSUPP;
452                 else
453                         *ops_result = 0;
454         }
455
456         if (!cluster)
457                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
458                           dlm_config.ci_cluster_name);
459
460         if (dlm_config.ci_recover_callbacks && cluster &&
461             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
462                 log_print("dlm cluster name '%s' does not match "
463                           "the application cluster name '%s'",
464                           dlm_config.ci_cluster_name, cluster);
465                 error = -EBADR;
466                 goto out;
467         }
468
469         error = 0;
470
471         spin_lock(&lslist_lock);
472         list_for_each_entry(ls, &lslist, ls_list) {
473                 WARN_ON(ls->ls_create_count <= 0);
474                 if (ls->ls_namelen != namelen)
475                         continue;
476                 if (memcmp(ls->ls_name, name, namelen))
477                         continue;
478                 if (flags & DLM_LSFL_NEWEXCL) {
479                         error = -EEXIST;
480                         break;
481                 }
482                 ls->ls_create_count++;
483                 *lockspace = ls;
484                 error = 1;
485                 break;
486         }
487         spin_unlock(&lslist_lock);
488
489         if (error)
490                 goto out;
491
492         error = -ENOMEM;
493
494         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
495         if (!ls)
496                 goto out;
497         memcpy(ls->ls_name, name, namelen);
498         ls->ls_namelen = namelen;
499         ls->ls_lvblen = lvblen;
500         ls->ls_count = 0;
501         ls->ls_flags = 0;
502         ls->ls_scan_time = jiffies;
503
504         if (ops && dlm_config.ci_recover_callbacks) {
505                 ls->ls_ops = ops;
506                 ls->ls_ops_arg = ops_arg;
507         }
508
509         if (flags & DLM_LSFL_TIMEWARN)
510                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
511
512         /* ls_exflags are forced to match among nodes, and we don't
513            need to require all nodes to have some flags set */
514         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
515                                     DLM_LSFL_NEWEXCL));
516
517         size = dlm_config.ci_rsbtbl_size;
518         ls->ls_rsbtbl_size = size;
519
520         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
521         if (!ls->ls_rsbtbl)
522                 goto out_lsfree;
523         for (i = 0; i < size; i++) {
524                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
525                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
526                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
527         }
528
529         spin_lock_init(&ls->ls_remove_spin);
530
531         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
532                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
533                                                  GFP_KERNEL);
534                 if (!ls->ls_remove_names[i])
535                         goto out_rsbtbl;
536         }
537
538         idr_init(&ls->ls_lkbidr);
539         spin_lock_init(&ls->ls_lkbidr_spin);
540
541         INIT_LIST_HEAD(&ls->ls_waiters);
542         mutex_init(&ls->ls_waiters_mutex);
543         INIT_LIST_HEAD(&ls->ls_orphans);
544         mutex_init(&ls->ls_orphans_mutex);
545         INIT_LIST_HEAD(&ls->ls_timeout);
546         mutex_init(&ls->ls_timeout_mutex);
547
548         INIT_LIST_HEAD(&ls->ls_new_rsb);
549         spin_lock_init(&ls->ls_new_rsb_spin);
550
551         INIT_LIST_HEAD(&ls->ls_nodes);
552         INIT_LIST_HEAD(&ls->ls_nodes_gone);
553         ls->ls_num_nodes = 0;
554         ls->ls_low_nodeid = 0;
555         ls->ls_total_weight = 0;
556         ls->ls_node_array = NULL;
557
558         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
559         ls->ls_stub_rsb.res_ls = ls;
560
561         ls->ls_debug_rsb_dentry = NULL;
562         ls->ls_debug_waiters_dentry = NULL;
563
564         init_waitqueue_head(&ls->ls_uevent_wait);
565         ls->ls_uevent_result = 0;
566         init_completion(&ls->ls_members_done);
567         ls->ls_members_result = -1;
568
569         mutex_init(&ls->ls_cb_mutex);
570         INIT_LIST_HEAD(&ls->ls_cb_delay);
571
572         ls->ls_recoverd_task = NULL;
573         mutex_init(&ls->ls_recoverd_active);
574         spin_lock_init(&ls->ls_recover_lock);
575         spin_lock_init(&ls->ls_rcom_spin);
576         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
577         ls->ls_recover_status = 0;
578         ls->ls_recover_seq = 0;
579         ls->ls_recover_args = NULL;
580         init_rwsem(&ls->ls_in_recovery);
581         init_rwsem(&ls->ls_recv_active);
582         INIT_LIST_HEAD(&ls->ls_requestqueue);
583         mutex_init(&ls->ls_requestqueue_mutex);
584         mutex_init(&ls->ls_clear_proc_locks);
585
586         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
587         if (!ls->ls_recover_buf)
588                 goto out_lkbidr;
589
590         ls->ls_slot = 0;
591         ls->ls_num_slots = 0;
592         ls->ls_slots_size = 0;
593         ls->ls_slots = NULL;
594
595         INIT_LIST_HEAD(&ls->ls_recover_list);
596         spin_lock_init(&ls->ls_recover_list_lock);
597         idr_init(&ls->ls_recover_idr);
598         spin_lock_init(&ls->ls_recover_idr_lock);
599         ls->ls_recover_list_count = 0;
600         ls->ls_local_handle = ls;
601         init_waitqueue_head(&ls->ls_wait_general);
602         INIT_LIST_HEAD(&ls->ls_root_list);
603         init_rwsem(&ls->ls_root_sem);
604
605         spin_lock(&lslist_lock);
606         ls->ls_create_count = 1;
607         list_add(&ls->ls_list, &lslist);
608         spin_unlock(&lslist_lock);
609
610         if (flags & DLM_LSFL_FS) {
611                 error = dlm_callback_start(ls);
612                 if (error) {
613                         log_error(ls, "can't start dlm_callback %d", error);
614                         goto out_delist;
615                 }
616         }
617
618         init_waitqueue_head(&ls->ls_recover_lock_wait);
619
620         /*
621          * Once started, dlm_recoverd first looks for ls in lslist, then
622          * initializes ls_in_recovery as locked in "down" mode.  We need
623          * to wait for the wakeup from dlm_recoverd because in_recovery
624          * has to start out in down mode.
625          */
626
627         error = dlm_recoverd_start(ls);
628         if (error) {
629                 log_error(ls, "can't start dlm_recoverd %d", error);
630                 goto out_callback;
631         }
632
633         wait_event(ls->ls_recover_lock_wait,
634                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
635
636         ls->ls_kobj.kset = dlm_kset;
637         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
638                                      "%s", ls->ls_name);
639         if (error)
640                 goto out_recoverd;
641         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
642
643         /* let kobject handle freeing of ls if there's an error */
644         do_unreg = 1;
645
646         /* This uevent triggers dlm_controld in userspace to add us to the
647            group of nodes that are members of this lockspace (managed by the
648            cluster infrastructure.)  Once it's done that, it tells us who the
649            current lockspace members are (via configfs) and then tells the
650            lockspace to start running (via sysfs) in dlm_ls_start(). */
651
652         error = do_uevent(ls, 1);
653         if (error)
654                 goto out_recoverd;
655
656         wait_for_completion(&ls->ls_members_done);
657         error = ls->ls_members_result;
658         if (error)
659                 goto out_members;
660
661         dlm_create_debug_file(ls);
662
663         log_rinfo(ls, "join complete");
664         *lockspace = ls;
665         return 0;
666
667  out_members:
668         do_uevent(ls, 0);
669         dlm_clear_members(ls);
670         kfree(ls->ls_node_array);
671  out_recoverd:
672         dlm_recoverd_stop(ls);
673  out_callback:
674         dlm_callback_stop(ls);
675  out_delist:
676         spin_lock(&lslist_lock);
677         list_del(&ls->ls_list);
678         spin_unlock(&lslist_lock);
679         idr_destroy(&ls->ls_recover_idr);
680         kfree(ls->ls_recover_buf);
681  out_lkbidr:
682         idr_destroy(&ls->ls_lkbidr);
683         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
684                 if (ls->ls_remove_names[i])
685                         kfree(ls->ls_remove_names[i]);
686         }
687  out_rsbtbl:
688         vfree(ls->ls_rsbtbl);
689  out_lsfree:
690         if (do_unreg)
691                 kobject_put(&ls->ls_kobj);
692         else
693                 kfree(ls);
694  out:
695         module_put(THIS_MODULE);
696         return error;
697 }
698
699 int dlm_new_lockspace(const char *name, const char *cluster,
700                       uint32_t flags, int lvblen,
701                       const struct dlm_lockspace_ops *ops, void *ops_arg,
702                       int *ops_result, dlm_lockspace_t **lockspace)
703 {
704         int error = 0;
705
706         mutex_lock(&ls_lock);
707         if (!ls_count)
708                 error = threads_start();
709         if (error)
710                 goto out;
711
712         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
713                               ops_result, lockspace);
714         if (!error)
715                 ls_count++;
716         if (error > 0)
717                 error = 0;
718         if (!ls_count)
719                 threads_stop();
720  out:
721         mutex_unlock(&ls_lock);
722         return error;
723 }
724
725 static int lkb_idr_is_local(int id, void *p, void *data)
726 {
727         struct dlm_lkb *lkb = p;
728
729         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
730 }
731
/*
 * idr_for_each() callback that matches every entry: always returns 1, so
 * the iteration stops at the first lkb it visits.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	const int matched = 1;

	return matched;
}
736
737 static int lkb_idr_free(int id, void *p, void *data)
738 {
739         struct dlm_lkb *lkb = p;
740
741         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
742                 dlm_free_lvb(lkb->lkb_lvbptr);
743
744         dlm_free_lkb(lkb);
745         return 0;
746 }
747
748 /* NOTE: We check the lkbidr here rather than the resource table.
749    This is because there may be LKBs queued as ASTs that have been unlinked
750    from their RSBs and are pending deletion once the AST has been delivered */
751
752 static int lockspace_busy(struct dlm_ls *ls, int force)
753 {
754         int rv;
755
756         spin_lock(&ls->ls_lkbidr_spin);
757         if (force == 0) {
758                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
759         } else if (force == 1) {
760                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
761         } else {
762                 rv = 0;
763         }
764         spin_unlock(&ls->ls_lkbidr_spin);
765         return rv;
766 }
767
768 static int release_lockspace(struct dlm_ls *ls, int force)
769 {
770         struct dlm_rsb *rsb;
771         struct rb_node *n;
772         int i, busy, rv;
773
774         busy = lockspace_busy(ls, force);
775
776         spin_lock(&lslist_lock);
777         if (ls->ls_create_count == 1) {
778                 if (busy) {
779                         rv = -EBUSY;
780                 } else {
781                         /* remove_lockspace takes ls off lslist */
782                         ls->ls_create_count = 0;
783                         rv = 0;
784                 }
785         } else if (ls->ls_create_count > 1) {
786                 rv = --ls->ls_create_count;
787         } else {
788                 rv = -EINVAL;
789         }
790         spin_unlock(&lslist_lock);
791
792         if (rv) {
793                 log_debug(ls, "release_lockspace no remove %d", rv);
794                 return rv;
795         }
796
797         dlm_device_deregister(ls);
798
799         if (force < 3 && dlm_user_daemon_available())
800                 do_uevent(ls, 0);
801
802         dlm_recoverd_stop(ls);
803
804         dlm_callback_stop(ls);
805
806         remove_lockspace(ls);
807
808         dlm_delete_debug_file(ls);
809
810         kfree(ls->ls_recover_buf);
811
812         /*
813          * Free all lkb's in idr
814          */
815
816         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
817         idr_destroy(&ls->ls_lkbidr);
818
819         /*
820          * Free all rsb's on rsbtbl[] lists
821          */
822
823         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
824                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
825                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
826                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
827                         dlm_free_rsb(rsb);
828                 }
829
830                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
831                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
832                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
833                         dlm_free_rsb(rsb);
834                 }
835         }
836
837         vfree(ls->ls_rsbtbl);
838
839         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
840                 kfree(ls->ls_remove_names[i]);
841
842         while (!list_empty(&ls->ls_new_rsb)) {
843                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
844                                        res_hashchain);
845                 list_del(&rsb->res_hashchain);
846                 dlm_free_rsb(rsb);
847         }
848
849         /*
850          * Free structures on any other lists
851          */
852
853         dlm_purge_requestqueue(ls);
854         kfree(ls->ls_recover_args);
855         dlm_clear_members(ls);
856         dlm_clear_members_gone(ls);
857         kfree(ls->ls_node_array);
858         log_rinfo(ls, "release_lockspace final free");
859         kobject_put(&ls->ls_kobj);
860         /* The ls structure will be freed when the kobject is done with */
861
862         module_put(THIS_MODULE);
863         return 0;
864 }
865
866 /*
867  * Called when a system has released all its locks and is not going to use the
868  * lockspace any longer.  We free everything we're managing for this lockspace.
869  * Remaining nodes will go through the recovery process as if we'd died.  The
870  * lockspace must continue to function as usual, participating in recoveries,
871  * until this returns.
872  *
873  * Force has 4 possible values:
874  * 0 - don't destroy lockspace if it has any LKBs
875  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
876  * 2 - destroy lockspace regardless of LKBs
877  * 3 - destroy lockspace as part of a forced shutdown
878  */
879
880 int dlm_release_lockspace(void *lockspace, int force)
881 {
882         struct dlm_ls *ls;
883         int error;
884
885         ls = dlm_find_lockspace_local(lockspace);
886         if (!ls)
887                 return -EINVAL;
888         dlm_put_lockspace(ls);
889
890         mutex_lock(&ls_lock);
891         error = release_lockspace(ls, force);
892         if (!error)
893                 ls_count--;
894         if (!ls_count)
895                 threads_stop();
896         mutex_unlock(&ls_lock);
897
898         return error;
899 }
900
901 void dlm_stop_lockspaces(void)
902 {
903         struct dlm_ls *ls;
904         int count;
905
906  restart:
907         count = 0;
908         spin_lock(&lslist_lock);
909         list_for_each_entry(ls, &lslist, ls_list) {
910                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
911                         count++;
912                         continue;
913                 }
914                 spin_unlock(&lslist_lock);
915                 log_error(ls, "no userland control daemon, stopping lockspace");
916                 dlm_ls_stop(ls);
917                 goto restart;
918         }
919         spin_unlock(&lslist_lock);
920
921         if (count)
922                 log_print("dlm user daemon left %d lockspaces", count);
923 }
924