Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/ieee1394...
[sfrench/cifs-2.6.git] / fs / dlm / member.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2009 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 #include "dlm_internal.h"
14 #include "lockspace.h"
15 #include "member.h"
16 #include "recoverd.h"
17 #include "recover.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "lowcomms.h"
21
22 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
23 {
24         struct dlm_member *memb = NULL;
25         struct list_head *tmp;
26         struct list_head *newlist = &new->list;
27         struct list_head *head = &ls->ls_nodes;
28
29         list_for_each(tmp, head) {
30                 memb = list_entry(tmp, struct dlm_member, list);
31                 if (new->nodeid < memb->nodeid)
32                         break;
33         }
34
35         if (!memb)
36                 list_add_tail(newlist, head);
37         else {
38                 /* FIXME: can use list macro here */
39                 newlist->prev = tmp->prev;
40                 newlist->next = tmp;
41                 tmp->prev->next = newlist;
42                 tmp->prev = newlist;
43         }
44 }
45
46 static int dlm_add_member(struct dlm_ls *ls, int nodeid)
47 {
48         struct dlm_member *memb;
49         int w, error;
50
51         memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
52         if (!memb)
53                 return -ENOMEM;
54
55         w = dlm_node_weight(ls->ls_name, nodeid);
56         if (w < 0) {
57                 kfree(memb);
58                 return w;
59         }
60
61         error = dlm_lowcomms_connect_node(nodeid);
62         if (error < 0) {
63                 kfree(memb);
64                 return error;
65         }
66
67         memb->nodeid = nodeid;
68         memb->weight = w;
69         add_ordered_member(ls, memb);
70         ls->ls_num_nodes++;
71         return 0;
72 }
73
74 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
75 {
76         list_move(&memb->list, &ls->ls_nodes_gone);
77         ls->ls_num_nodes--;
78 }
79
80 int dlm_is_member(struct dlm_ls *ls, int nodeid)
81 {
82         struct dlm_member *memb;
83
84         list_for_each_entry(memb, &ls->ls_nodes, list) {
85                 if (memb->nodeid == nodeid)
86                         return 1;
87         }
88         return 0;
89 }
90
91 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
92 {
93         struct dlm_member *memb;
94
95         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
96                 if (memb->nodeid == nodeid)
97                         return 1;
98         }
99         return 0;
100 }
101
102 static void clear_memb_list(struct list_head *head)
103 {
104         struct dlm_member *memb;
105
106         while (!list_empty(head)) {
107                 memb = list_entry(head->next, struct dlm_member, list);
108                 list_del(&memb->list);
109                 kfree(memb);
110         }
111 }
112
113 void dlm_clear_members(struct dlm_ls *ls)
114 {
115         clear_memb_list(&ls->ls_nodes);
116         ls->ls_num_nodes = 0;
117 }
118
119 void dlm_clear_members_gone(struct dlm_ls *ls)
120 {
121         clear_memb_list(&ls->ls_nodes_gone);
122 }
123
124 static void make_member_array(struct dlm_ls *ls)
125 {
126         struct dlm_member *memb;
127         int i, w, x = 0, total = 0, all_zero = 0, *array;
128
129         kfree(ls->ls_node_array);
130         ls->ls_node_array = NULL;
131
132         list_for_each_entry(memb, &ls->ls_nodes, list) {
133                 if (memb->weight)
134                         total += memb->weight;
135         }
136
137         /* all nodes revert to weight of 1 if all have weight 0 */
138
139         if (!total) {
140                 total = ls->ls_num_nodes;
141                 all_zero = 1;
142         }
143
144         ls->ls_total_weight = total;
145
146         array = kmalloc(sizeof(int) * total, GFP_NOFS);
147         if (!array)
148                 return;
149
150         list_for_each_entry(memb, &ls->ls_nodes, list) {
151                 if (!all_zero && !memb->weight)
152                         continue;
153
154                 if (all_zero)
155                         w = 1;
156                 else
157                         w = memb->weight;
158
159                 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
160
161                 for (i = 0; i < w; i++)
162                         array[x++] = memb->nodeid;
163         }
164
165         ls->ls_node_array = array;
166 }
167
168 /* send a status request to all members just to establish comms connections */
169
170 static int ping_members(struct dlm_ls *ls)
171 {
172         struct dlm_member *memb;
173         int error = 0;
174
175         list_for_each_entry(memb, &ls->ls_nodes, list) {
176                 error = dlm_recovery_stopped(ls);
177                 if (error)
178                         break;
179                 error = dlm_rcom_status(ls, memb->nodeid);
180                 if (error)
181                         break;
182         }
183         if (error)
184                 log_debug(ls, "ping_members aborted %d last nodeid %d",
185                           error, ls->ls_recover_nodeid);
186         return error;
187 }
188
189 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
190 {
191         struct dlm_member *memb, *safe;
192         int i, error, found, pos = 0, neg = 0, low = -1;
193
194         /* previously removed members that we've not finished removing need to
195            count as a negative change so the "neg" recovery steps will happen */
196
197         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
198                 log_debug(ls, "prev removed member %d", memb->nodeid);
199                 neg++;
200         }
201
202         /* move departed members from ls_nodes to ls_nodes_gone */
203
204         list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
205                 found = 0;
206                 for (i = 0; i < rv->node_count; i++) {
207                         if (memb->nodeid == rv->nodeids[i]) {
208                                 found = 1;
209                                 break;
210                         }
211                 }
212
213                 if (!found) {
214                         neg++;
215                         dlm_remove_member(ls, memb);
216                         log_debug(ls, "remove member %d", memb->nodeid);
217                 }
218         }
219
220         /* Add an entry to ls_nodes_gone for members that were removed and
221            then added again, so that previous state for these nodes will be
222            cleared during recovery. */
223
224         for (i = 0; i < rv->new_count; i++) {
225                 if (!dlm_is_member(ls, rv->new[i]))
226                         continue;
227                 log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
228
229                 memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
230                 if (!memb)
231                         return -ENOMEM;
232                 memb->nodeid = rv->new[i];
233                 list_add_tail(&memb->list, &ls->ls_nodes_gone);
234                 neg++;
235         }
236
237         /* add new members to ls_nodes */
238
239         for (i = 0; i < rv->node_count; i++) {
240                 if (dlm_is_member(ls, rv->nodeids[i]))
241                         continue;
242                 dlm_add_member(ls, rv->nodeids[i]);
243                 pos++;
244                 log_debug(ls, "add member %d", rv->nodeids[i]);
245         }
246
247         list_for_each_entry(memb, &ls->ls_nodes, list) {
248                 if (low == -1 || memb->nodeid < low)
249                         low = memb->nodeid;
250         }
251         ls->ls_low_nodeid = low;
252
253         make_member_array(ls);
254         dlm_set_recover_status(ls, DLM_RS_NODES);
255         *neg_out = neg;
256
257         error = ping_members(ls);
258         if (!error || error == -EPROTO) {
259                 /* new_lockspace() may be waiting to know if the config
260                    is good or bad */
261                 ls->ls_members_result = error;
262                 complete(&ls->ls_members_done);
263         }
264         if (error)
265                 goto out;
266
267         error = dlm_recover_members_wait(ls);
268  out:
269         log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
270         return error;
271 }
272
273 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
274    dlm_ls_start() is called on any of them to start the new recovery. */
275
276 int dlm_ls_stop(struct dlm_ls *ls)
277 {
278         int new;
279
280         /*
281          * Prevent dlm_recv from being in the middle of something when we do
282          * the stop.  This includes ensuring dlm_recv isn't processing a
283          * recovery message (rcom), while dlm_recoverd is aborting and
284          * resetting things from an in-progress recovery.  i.e. we want
285          * dlm_recoverd to abort its recovery without worrying about dlm_recv
286          * processing an rcom at the same time.  Stopping dlm_recv also makes
287          * it easy for dlm_receive_message() to check locking stopped and add a
288          * message to the requestqueue without races.
289          */
290
291         down_write(&ls->ls_recv_active);
292
293         /*
294          * Abort any recovery that's in progress (see RECOVERY_STOP,
295          * dlm_recovery_stopped()) and tell any other threads running in the
296          * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
297          */
298
299         spin_lock(&ls->ls_recover_lock);
300         set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
301         new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
302         ls->ls_recover_seq++;
303         spin_unlock(&ls->ls_recover_lock);
304
305         /*
306          * Let dlm_recv run again, now any normal messages will be saved on the
307          * requestqueue for later.
308          */
309
310         up_write(&ls->ls_recv_active);
311
312         /*
313          * This in_recovery lock does two things:
314          * 1) Keeps this function from returning until all threads are out
315          *    of locking routines and locking is truly stopped.
316          * 2) Keeps any new requests from being processed until it's unlocked
317          *    when recovery is complete.
318          */
319
320         if (new)
321                 down_write(&ls->ls_in_recovery);
322
323         /*
324          * The recoverd suspend/resume makes sure that dlm_recoverd (if
325          * running) has noticed RECOVERY_STOP above and quit processing the
326          * previous recovery.
327          */
328
329         dlm_recoverd_suspend(ls);
330         ls->ls_recover_status = 0;
331         dlm_recoverd_resume(ls);
332
333         if (!ls->ls_recover_begin)
334                 ls->ls_recover_begin = jiffies;
335         return 0;
336 }
337
338 int dlm_ls_start(struct dlm_ls *ls)
339 {
340         struct dlm_recover *rv = NULL, *rv_old;
341         int *ids = NULL, *new = NULL;
342         int error, ids_count = 0, new_count = 0;
343
344         rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
345         if (!rv)
346                 return -ENOMEM;
347
348         error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
349                                 &new, &new_count);
350         if (error < 0)
351                 goto fail;
352
353         spin_lock(&ls->ls_recover_lock);
354
355         /* the lockspace needs to be stopped before it can be started */
356
357         if (!dlm_locking_stopped(ls)) {
358                 spin_unlock(&ls->ls_recover_lock);
359                 log_error(ls, "start ignored: lockspace running");
360                 error = -EINVAL;
361                 goto fail;
362         }
363
364         rv->nodeids = ids;
365         rv->node_count = ids_count;
366         rv->new = new;
367         rv->new_count = new_count;
368         rv->seq = ++ls->ls_recover_seq;
369         rv_old = ls->ls_recover_args;
370         ls->ls_recover_args = rv;
371         spin_unlock(&ls->ls_recover_lock);
372
373         if (rv_old) {
374                 log_error(ls, "unused recovery %llx %d",
375                           (unsigned long long)rv_old->seq, rv_old->node_count);
376                 kfree(rv_old->nodeids);
377                 kfree(rv_old->new);
378                 kfree(rv_old);
379         }
380
381         dlm_recoverd_kick(ls);
382         return 0;
383
384  fail:
385         kfree(rv);
386         kfree(ids);
387         kfree(new);
388         return error;
389 }
390