]> bbs.cooldavid.org Git - net-next-2.6.git/blob - fs/dlm/member.c
[DLM] The core of the DLM for GFS2/CLVM
[net-next-2.6.git] / fs / dlm / member.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 #include "dlm_internal.h"
14 #include "lockspace.h"
15 #include "member.h"
16 #include "recoverd.h"
17 #include "recover.h"
18 #include "lowcomms.h"
19 #include "rcom.h"
20 #include "config.h"
21
22 /*
23  * Following called by dlm_recoverd thread
24  */
25
26 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
27 {
28         struct dlm_member *memb = NULL;
29         struct list_head *tmp;
30         struct list_head *newlist = &new->list;
31         struct list_head *head = &ls->ls_nodes;
32
33         list_for_each(tmp, head) {
34                 memb = list_entry(tmp, struct dlm_member, list);
35                 if (new->nodeid < memb->nodeid)
36                         break;
37         }
38
39         if (!memb)
40                 list_add_tail(newlist, head);
41         else {
42                 /* FIXME: can use list macro here */
43                 newlist->prev = tmp->prev;
44                 newlist->next = tmp;
45                 tmp->prev->next = newlist;
46                 tmp->prev = newlist;
47         }
48 }
49
50 static int dlm_add_member(struct dlm_ls *ls, int nodeid)
51 {
52         struct dlm_member *memb;
53         int w;
54
55         memb = kmalloc(sizeof(struct dlm_member), GFP_KERNEL);
56         if (!memb)
57                 return -ENOMEM;
58
59         w = dlm_node_weight(ls->ls_name, nodeid);
60         if (w < 0)
61                 return w;
62
63         memb->nodeid = nodeid;
64         memb->weight = w;
65         add_ordered_member(ls, memb);
66         ls->ls_num_nodes++;
67         return 0;
68 }
69
70 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
71 {
72         list_move(&memb->list, &ls->ls_nodes_gone);
73         ls->ls_num_nodes--;
74 }
75
76 static int dlm_is_member(struct dlm_ls *ls, int nodeid)
77 {
78         struct dlm_member *memb;
79
80         list_for_each_entry(memb, &ls->ls_nodes, list) {
81                 if (memb->nodeid == nodeid)
82                         return TRUE;
83         }
84         return FALSE;
85 }
86
87 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
88 {
89         struct dlm_member *memb;
90
91         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
92                 if (memb->nodeid == nodeid)
93                         return TRUE;
94         }
95         return FALSE;
96 }
97
98 static void clear_memb_list(struct list_head *head)
99 {
100         struct dlm_member *memb;
101
102         while (!list_empty(head)) {
103                 memb = list_entry(head->next, struct dlm_member, list);
104                 list_del(&memb->list);
105                 kfree(memb);
106         }
107 }
108
109 void dlm_clear_members(struct dlm_ls *ls)
110 {
111         clear_memb_list(&ls->ls_nodes);
112         ls->ls_num_nodes = 0;
113 }
114
115 void dlm_clear_members_gone(struct dlm_ls *ls)
116 {
117         clear_memb_list(&ls->ls_nodes_gone);
118 }
119
120 static void make_member_array(struct dlm_ls *ls)
121 {
122         struct dlm_member *memb;
123         int i, w, x = 0, total = 0, all_zero = 0, *array;
124
125         kfree(ls->ls_node_array);
126         ls->ls_node_array = NULL;
127
128         list_for_each_entry(memb, &ls->ls_nodes, list) {
129                 if (memb->weight)
130                         total += memb->weight;
131         }
132
133         /* all nodes revert to weight of 1 if all have weight 0 */
134
135         if (!total) {
136                 total = ls->ls_num_nodes;
137                 all_zero = 1;
138         }
139
140         ls->ls_total_weight = total;
141
142         array = kmalloc(sizeof(int) * total, GFP_KERNEL);
143         if (!array)
144                 return;
145
146         list_for_each_entry(memb, &ls->ls_nodes, list) {
147                 if (!all_zero && !memb->weight)
148                         continue;
149
150                 if (all_zero)
151                         w = 1;
152                 else
153                         w = memb->weight;
154
155                 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
156
157                 for (i = 0; i < w; i++)
158                         array[x++] = memb->nodeid;
159         }
160
161         ls->ls_node_array = array;
162 }
163
164 /* send a status request to all members just to establish comms connections */
165
166 static void ping_members(struct dlm_ls *ls)
167 {
168         struct dlm_member *memb;
169         list_for_each_entry(memb, &ls->ls_nodes, list)
170                 dlm_rcom_status(ls, memb->nodeid);
171 }
172
173 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
174 {
175         struct dlm_member *memb, *safe;
176         int i, error, found, pos = 0, neg = 0, low = -1;
177
178         /* move departed members from ls_nodes to ls_nodes_gone */
179
180         list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
181                 found = FALSE;
182                 for (i = 0; i < rv->node_count; i++) {
183                         if (memb->nodeid == rv->nodeids[i]) {
184                                 found = TRUE;
185                                 break;
186                         }
187                 }
188
189                 if (!found) {
190                         neg++;
191                         dlm_remove_member(ls, memb);
192                         log_debug(ls, "remove member %d", memb->nodeid);
193                 }
194         }
195
196         /* add new members to ls_nodes */
197
198         for (i = 0; i < rv->node_count; i++) {
199                 if (dlm_is_member(ls, rv->nodeids[i]))
200                         continue;
201                 dlm_add_member(ls, rv->nodeids[i]);
202                 pos++;
203                 log_debug(ls, "add member %d", rv->nodeids[i]);
204         }
205
206         list_for_each_entry(memb, &ls->ls_nodes, list) {
207                 if (low == -1 || memb->nodeid < low)
208                         low = memb->nodeid;
209         }
210         ls->ls_low_nodeid = low;
211
212         make_member_array(ls);
213         dlm_set_recover_status(ls, DLM_RS_NODES);
214         *neg_out = neg;
215
216         ping_members(ls);
217
218         error = dlm_recover_members_wait(ls);
219         log_debug(ls, "total members %d", ls->ls_num_nodes);
220         return error;
221 }
222
223 /*
224  * Following called from lockspace.c
225  */
226
227 int dlm_ls_stop(struct dlm_ls *ls)
228 {
229         int new;
230
231         /*
232          * A stop cancels any recovery that's in progress (see RECOVERY_STOP,
233          * dlm_recovery_stopped()) and prevents any new locks from being
234          * processed (see RUNNING, dlm_locking_stopped()).
235          */
236
237         spin_lock(&ls->ls_recover_lock);
238         set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
239         new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
240         ls->ls_recover_seq++;
241         spin_unlock(&ls->ls_recover_lock);
242
243         /*
244          * This in_recovery lock does two things:
245          *
246          * 1) Keeps this function from returning until all threads are out
247          *    of locking routines and locking is truely stopped.
248          * 2) Keeps any new requests from being processed until it's unlocked
249          *    when recovery is complete.
250          */
251
252         if (new)
253                 down_write(&ls->ls_in_recovery);
254
255         /*
256          * The recoverd suspend/resume makes sure that dlm_recoverd (if
257          * running) has noticed the clearing of RUNNING above and quit
258          * processing the previous recovery.  This will be true for all nodes
259          * before any nodes start the new recovery.
260          */
261
262         dlm_recoverd_suspend(ls);
263         ls->ls_recover_status = 0;
264         dlm_recoverd_resume(ls);
265         return 0;
266 }
267
268 int dlm_ls_start(struct dlm_ls *ls)
269 {
270         struct dlm_recover *rv = NULL, *rv_old;
271         int *ids = NULL;
272         int error, count;
273
274         rv = kmalloc(sizeof(struct dlm_recover), GFP_KERNEL);
275         if (!rv)
276                 return -ENOMEM;
277         memset(rv, 0, sizeof(struct dlm_recover));
278
279         error = count = dlm_nodeid_list(ls->ls_name, &ids);
280         if (error <= 0)
281                 goto fail;
282
283         spin_lock(&ls->ls_recover_lock);
284
285         /* the lockspace needs to be stopped before it can be started */
286
287         if (!dlm_locking_stopped(ls)) {
288                 spin_unlock(&ls->ls_recover_lock);
289                 log_error(ls, "start ignored: lockspace running");
290                 error = -EINVAL;
291                 goto fail;
292         }
293
294         rv->nodeids = ids;
295         rv->node_count = count;
296         rv->seq = ++ls->ls_recover_seq;
297         rv_old = ls->ls_recover_args;
298         ls->ls_recover_args = rv;
299         spin_unlock(&ls->ls_recover_lock);
300
301         if (rv_old) {
302                 kfree(rv_old->nodeids);
303                 kfree(rv_old);
304         }
305
306         dlm_recoverd_kick(ls);
307         return 0;
308
309  fail:
310         kfree(rv);
311         kfree(ids);
312         return error;
313 }
314