]> bbs.cooldavid.org Git - net-next-2.6.git/blob - fs/dlm/lock.c
xps: Transmit Packet Steering
[net-next-2.6.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88                                     struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
167 }
168
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         lkb->lkb_lksb->sb_status = rv;
309         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
310
311         dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         lkb->lkb_time_bast = ktime_get();
323
324         if (is_master_copy(lkb)) {
325                 lkb->lkb_bastmode = rqmode; /* printed by debugfs */
326                 send_bast(r, lkb, rqmode);
327         } else {
328                 dlm_add_ast(lkb, AST_BAST, rqmode);
329         }
330 }
331
332 /*
333  * Basic operations on rsb's and lkb's
334  */
335
336 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
337 {
338         struct dlm_rsb *r;
339
340         r = dlm_allocate_rsb(ls, len);
341         if (!r)
342                 return NULL;
343
344         r->res_ls = ls;
345         r->res_length = len;
346         memcpy(r->res_name, name, len);
347         mutex_init(&r->res_mutex);
348
349         INIT_LIST_HEAD(&r->res_lookup);
350         INIT_LIST_HEAD(&r->res_grantqueue);
351         INIT_LIST_HEAD(&r->res_convertqueue);
352         INIT_LIST_HEAD(&r->res_waitqueue);
353         INIT_LIST_HEAD(&r->res_root_list);
354         INIT_LIST_HEAD(&r->res_recover_list);
355
356         return r;
357 }
358
359 static int search_rsb_list(struct list_head *head, char *name, int len,
360                            unsigned int flags, struct dlm_rsb **r_ret)
361 {
362         struct dlm_rsb *r;
363         int error = 0;
364
365         list_for_each_entry(r, head, res_hashchain) {
366                 if (len == r->res_length && !memcmp(name, r->res_name, len))
367                         goto found;
368         }
369         *r_ret = NULL;
370         return -EBADR;
371
372  found:
373         if (r->res_nodeid && (flags & R_MASTER))
374                 error = -ENOTBLK;
375         *r_ret = r;
376         return error;
377 }
378
379 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
380                        unsigned int flags, struct dlm_rsb **r_ret)
381 {
382         struct dlm_rsb *r;
383         int error;
384
385         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
386         if (!error) {
387                 kref_get(&r->res_ref);
388                 goto out;
389         }
390         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
391         if (error)
392                 goto out;
393
394         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
395
396         if (dlm_no_directory(ls))
397                 goto out;
398
399         if (r->res_nodeid == -1) {
400                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else if (r->res_nodeid > 0) {
403                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
404                 r->res_first_lkid = 0;
405         } else {
406                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
407                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
408         }
409  out:
410         *r_ret = r;
411         return error;
412 }
413
414 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
415                       unsigned int flags, struct dlm_rsb **r_ret)
416 {
417         int error;
418         spin_lock(&ls->ls_rsbtbl[b].lock);
419         error = _search_rsb(ls, name, len, b, flags, r_ret);
420         spin_unlock(&ls->ls_rsbtbl[b].lock);
421         return error;
422 }
423
424 /*
425  * Find rsb in rsbtbl and potentially create/add one
426  *
427  * Delaying the release of rsb's has a similar benefit to applications keeping
428  * NL locks on an rsb, but without the guarantee that the cached master value
429  * will still be valid when the rsb is reused.  Apps aren't always smart enough
430  * to keep NL locks on an rsb that they may lock again shortly; this can lead
431  * to excessive master lookups and removals if we don't delay the release.
432  *
433  * Searching for an rsb means looking through both the normal list and toss
434  * list.  When found on the toss list the rsb is moved to the normal list with
435  * ref count of 1; when found on normal list the ref count is incremented.
436  */
437
438 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
439                     unsigned int flags, struct dlm_rsb **r_ret)
440 {
441         struct dlm_rsb *r = NULL, *tmp;
442         uint32_t hash, bucket;
443         int error = -EINVAL;
444
445         if (namelen > DLM_RESNAME_MAXLEN)
446                 goto out;
447
448         if (dlm_no_directory(ls))
449                 flags |= R_CREATE;
450
451         error = 0;
452         hash = jhash(name, namelen, 0);
453         bucket = hash & (ls->ls_rsbtbl_size - 1);
454
455         error = search_rsb(ls, name, namelen, bucket, flags, &r);
456         if (!error)
457                 goto out;
458
459         if (error == -EBADR && !(flags & R_CREATE))
460                 goto out;
461
462         /* the rsb was found but wasn't a master copy */
463         if (error == -ENOTBLK)
464                 goto out;
465
466         error = -ENOMEM;
467         r = create_rsb(ls, name, namelen);
468         if (!r)
469                 goto out;
470
471         r->res_hash = hash;
472         r->res_bucket = bucket;
473         r->res_nodeid = -1;
474         kref_init(&r->res_ref);
475
476         /* With no directory, the master can be set immediately */
477         if (dlm_no_directory(ls)) {
478                 int nodeid = dlm_dir_nodeid(r);
479                 if (nodeid == dlm_our_nodeid())
480                         nodeid = 0;
481                 r->res_nodeid = nodeid;
482         }
483
484         spin_lock(&ls->ls_rsbtbl[bucket].lock);
485         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
486         if (!error) {
487                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
488                 dlm_free_rsb(r);
489                 r = tmp;
490                 goto out;
491         }
492         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
493         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
494         error = 0;
495  out:
496         *r_ret = r;
497         return error;
498 }
499
500 /* This is only called to add a reference when the code already holds
501    a valid reference to the rsb, so there's no need for locking. */
502
503 static inline void hold_rsb(struct dlm_rsb *r)
504 {
505         kref_get(&r->res_ref);
506 }
507
508 void dlm_hold_rsb(struct dlm_rsb *r)
509 {
510         hold_rsb(r);
511 }
512
513 static void toss_rsb(struct kref *kref)
514 {
515         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
516         struct dlm_ls *ls = r->res_ls;
517
518         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
519         kref_init(&r->res_ref);
520         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
521         r->res_toss_time = jiffies;
522         if (r->res_lvbptr) {
523                 dlm_free_lvb(r->res_lvbptr);
524                 r->res_lvbptr = NULL;
525         }
526 }
527
528 /* When all references to the rsb are gone it's transfered to
529    the tossed list for later disposal. */
530
531 static void put_rsb(struct dlm_rsb *r)
532 {
533         struct dlm_ls *ls = r->res_ls;
534         uint32_t bucket = r->res_bucket;
535
536         spin_lock(&ls->ls_rsbtbl[bucket].lock);
537         kref_put(&r->res_ref, toss_rsb);
538         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
539 }
540
541 void dlm_put_rsb(struct dlm_rsb *r)
542 {
543         put_rsb(r);
544 }
545
546 /* See comment for unhold_lkb */
547
548 static void unhold_rsb(struct dlm_rsb *r)
549 {
550         int rv;
551         rv = kref_put(&r->res_ref, toss_rsb);
552         DLM_ASSERT(!rv, dlm_dump_rsb(r););
553 }
554
555 static void kill_rsb(struct kref *kref)
556 {
557         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
558
559         /* All work is done after the return from kref_put() so we
560            can release the write_lock before the remove and free. */
561
562         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
565         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
566         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
567         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
568 }
569
570 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
571    The rsb must exist as long as any lkb's for it do. */
572
573 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
574 {
575         hold_rsb(r);
576         lkb->lkb_resource = r;
577 }
578
579 static void detach_lkb(struct dlm_lkb *lkb)
580 {
581         if (lkb->lkb_resource) {
582                 put_rsb(lkb->lkb_resource);
583                 lkb->lkb_resource = NULL;
584         }
585 }
586
587 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
588 {
589         struct dlm_lkb *lkb, *tmp;
590         uint32_t lkid = 0;
591         uint16_t bucket;
592
593         lkb = dlm_allocate_lkb(ls);
594         if (!lkb)
595                 return -ENOMEM;
596
597         lkb->lkb_nodeid = -1;
598         lkb->lkb_grmode = DLM_LOCK_IV;
599         kref_init(&lkb->lkb_ref);
600         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
601         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
602         INIT_LIST_HEAD(&lkb->lkb_time_list);
603
604         get_random_bytes(&bucket, sizeof(bucket));
605         bucket &= (ls->ls_lkbtbl_size - 1);
606
607         write_lock(&ls->ls_lkbtbl[bucket].lock);
608
609         /* counter can roll over so we must verify lkid is not in use */
610
611         while (lkid == 0) {
612                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
613
614                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
615                                     lkb_idtbl_list) {
616                         if (tmp->lkb_id != lkid)
617                                 continue;
618                         lkid = 0;
619                         break;
620                 }
621         }
622
623         lkb->lkb_id = lkid;
624         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
625         write_unlock(&ls->ls_lkbtbl[bucket].lock);
626
627         *lkb_ret = lkb;
628         return 0;
629 }
630
631 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
632 {
633         struct dlm_lkb *lkb;
634         uint16_t bucket = (lkid >> 16);
635
636         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
637                 if (lkb->lkb_id == lkid)
638                         return lkb;
639         }
640         return NULL;
641 }
642
643 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
644 {
645         struct dlm_lkb *lkb;
646         uint16_t bucket = (lkid >> 16);
647
648         if (bucket >= ls->ls_lkbtbl_size)
649                 return -EBADSLT;
650
651         read_lock(&ls->ls_lkbtbl[bucket].lock);
652         lkb = __find_lkb(ls, lkid);
653         if (lkb)
654                 kref_get(&lkb->lkb_ref);
655         read_unlock(&ls->ls_lkbtbl[bucket].lock);
656
657         *lkb_ret = lkb;
658         return lkb ? 0 : -ENOENT;
659 }
660
661 static void kill_lkb(struct kref *kref)
662 {
663         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
664
665         /* All work is done after the return from kref_put() so we
666            can release the write_lock before the detach_lkb */
667
668         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
669 }
670
671 /* __put_lkb() is used when an lkb may not have an rsb attached to
672    it so we need to provide the lockspace explicitly */
673
674 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
675 {
676         uint16_t bucket = (lkb->lkb_id >> 16);
677
678         write_lock(&ls->ls_lkbtbl[bucket].lock);
679         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
680                 list_del(&lkb->lkb_idtbl_list);
681                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
682
683                 detach_lkb(lkb);
684
685                 /* for local/process lkbs, lvbptr points to caller's lksb */
686                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
687                         dlm_free_lvb(lkb->lkb_lvbptr);
688                 dlm_free_lkb(lkb);
689                 return 1;
690         } else {
691                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
692                 return 0;
693         }
694 }
695
696 int dlm_put_lkb(struct dlm_lkb *lkb)
697 {
698         struct dlm_ls *ls;
699
700         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
701         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
702
703         ls = lkb->lkb_resource->res_ls;
704         return __put_lkb(ls, lkb);
705 }
706
707 /* This is only called to add a reference when the code already holds
708    a valid reference to the lkb, so there's no need for locking. */
709
710 static inline void hold_lkb(struct dlm_lkb *lkb)
711 {
712         kref_get(&lkb->lkb_ref);
713 }
714
715 /* This is called when we need to remove a reference and are certain
716    it's not the last ref.  e.g. del_lkb is always called between a
717    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
718    put_lkb would work fine, but would involve unnecessary locking */
719
720 static inline void unhold_lkb(struct dlm_lkb *lkb)
721 {
722         int rv;
723         rv = kref_put(&lkb->lkb_ref, kill_lkb);
724         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
725 }
726
727 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
728                             int mode)
729 {
730         struct dlm_lkb *lkb = NULL;
731
732         list_for_each_entry(lkb, head, lkb_statequeue)
733                 if (lkb->lkb_rqmode < mode)
734                         break;
735
736         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_timestamp = ktime_get();
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
867                                 struct dlm_message *ms)
868 {
869         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
870         int overlap_done = 0;
871
872         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
873                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878
879         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
880                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
881                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
882                 overlap_done = 1;
883                 goto out_del;
884         }
885
886         /* Cancel state was preemptively cleared by a successful convert,
887            see next comment, nothing to do. */
888
889         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
890             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
891                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
892                           lkb->lkb_id, lkb->lkb_wait_type);
893                 return -1;
894         }
895
896         /* Remove for the convert reply, and premptively remove for the
897            cancel reply.  A convert has been granted while there's still
898            an outstanding cancel on it (the cancel is moot and the result
899            in the cancel reply should be 0).  We preempt the cancel reply
900            because the app gets the convert result and then can follow up
901            with another op, like convert.  This subsequent op would see the
902            lingering state of the cancel and fail with -EBUSY. */
903
904         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
905             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
906             is_overlap_cancel(lkb) && ms && !ms->m_result) {
907                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
908                           lkb->lkb_id);
909                 lkb->lkb_wait_type = 0;
910                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
911                 lkb->lkb_wait_count--;
912                 goto out_del;
913         }
914
915         /* N.B. type of reply may not always correspond to type of original
916            msg due to lookup->request optimization, verify others? */
917
918         if (lkb->lkb_wait_type) {
919                 lkb->lkb_wait_type = 0;
920                 goto out_del;
921         }
922
923         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
924                   lkb->lkb_id, mstype, lkb->lkb_flags);
925         return -1;
926
927  out_del:
928         /* the force-unlock/cancel has completed and we haven't recvd a reply
929            to the op that was in progress prior to the unlock/cancel; we
930            give up on any reply to the earlier op.  FIXME: not sure when/how
931            this would happen */
932
933         if (overlap_done && lkb->lkb_wait_type) {
934                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
935                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
936                 lkb->lkb_wait_count--;
937                 lkb->lkb_wait_type = 0;
938         }
939
940         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
941
942         lkb->lkb_flags &= ~DLM_IFL_RESEND;
943         lkb->lkb_wait_count--;
944         if (!lkb->lkb_wait_count)
945                 list_del_init(&lkb->lkb_wait_reply);
946         unhold_lkb(lkb);
947         return 0;
948 }
949
950 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
951 {
952         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
953         int error;
954
955         mutex_lock(&ls->ls_waiters_mutex);
956         error = _remove_from_waiters(lkb, mstype, NULL);
957         mutex_unlock(&ls->ls_waiters_mutex);
958         return error;
959 }
960
961 /* Handles situations where we might be processing a "fake" or "stub" reply in
962    which we can't try to take waiters_mutex again. */
963
964 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
965 {
966         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967         int error;
968
969         if (ms != &ls->ls_stub_ms)
970                 mutex_lock(&ls->ls_waiters_mutex);
971         error = _remove_from_waiters(lkb, ms->m_type, ms);
972         if (ms != &ls->ls_stub_ms)
973                 mutex_unlock(&ls->ls_waiters_mutex);
974         return error;
975 }
976
977 static void dir_remove(struct dlm_rsb *r)
978 {
979         int to_nodeid;
980
981         if (dlm_no_directory(r->res_ls))
982                 return;
983
984         to_nodeid = dlm_dir_nodeid(r);
985         if (to_nodeid != dlm_our_nodeid())
986                 send_remove(r);
987         else
988                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
989                                      r->res_name, r->res_length);
990 }
991
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994
995 static int shrink_bucket(struct dlm_ls *ls, int b)
996 {
997         struct dlm_rsb *r;
998         int count = 0, found;
999
1000         for (;;) {
1001                 found = 0;
1002                 spin_lock(&ls->ls_rsbtbl[b].lock);
1003                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                             res_hashchain) {
1005                         if (!time_after_eq(jiffies, r->res_toss_time +
1006                                            dlm_config.ci_toss_secs * HZ))
1007                                 continue;
1008                         found = 1;
1009                         break;
1010                 }
1011
1012                 if (!found) {
1013                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                         break;
1015                 }
1016
1017                 if (kref_put(&r->res_ref, kill_rsb)) {
1018                         list_del(&r->res_hashchain);
1019                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                         if (is_master(r))
1022                                 dir_remove(r);
1023                         dlm_free_rsb(r);
1024                         count++;
1025                 } else {
1026                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                         log_error(ls, "tossed rsb in use %s", r->res_name);
1028                 }
1029         }
1030
1031         return count;
1032 }
1033
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036         int i;
1037
1038         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                 shrink_bucket(ls, i);
1040                 if (dlm_locking_stopped(ls))
1041                         break;
1042                 cond_resched();
1043         }
1044 }
1045
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050         if (is_master_copy(lkb))
1051                 return;
1052
1053         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                 goto add_it;
1057         }
1058         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                 goto add_it;
1060         return;
1061
1062  add_it:
1063         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064         mutex_lock(&ls->ls_timeout_mutex);
1065         hold_lkb(lkb);
1066         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067         mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069
1070 static void del_timeout(struct dlm_lkb *lkb)
1071 {
1072         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074         mutex_lock(&ls->ls_timeout_mutex);
1075         if (!list_empty(&lkb->lkb_time_list)) {
1076                 list_del_init(&lkb->lkb_time_list);
1077                 unhold_lkb(lkb);
1078         }
1079         mutex_unlock(&ls->ls_timeout_mutex);
1080 }
1081
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087
1088 void dlm_scan_timeout(struct dlm_ls *ls)
1089 {
1090         struct dlm_rsb *r;
1091         struct dlm_lkb *lkb;
1092         int do_cancel, do_warn;
1093         s64 wait_us;
1094
1095         for (;;) {
1096                 if (dlm_locking_stopped(ls))
1097                         break;
1098
1099                 do_cancel = 0;
1100                 do_warn = 0;
1101                 mutex_lock(&ls->ls_timeout_mutex);
1102                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                         lkb->lkb_timestamp));
1106
1107                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                 do_cancel = 1;
1110
1111                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                 do_warn = 1;
1114
1115                         if (!do_cancel && !do_warn)
1116                                 continue;
1117                         hold_lkb(lkb);
1118                         break;
1119                 }
1120                 mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                 if (!do_cancel && !do_warn)
1123                         break;
1124
1125                 r = lkb->lkb_resource;
1126                 hold_rsb(r);
1127                 lock_rsb(r);
1128
1129                 if (do_warn) {
1130                         /* clear flag so we only warn once */
1131                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                 del_timeout(lkb);
1134                         dlm_timeout_warn(lkb);
1135                 }
1136
1137                 if (do_cancel) {
1138                         log_debug(ls, "timeout cancel %x node %d %s",
1139                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                         del_timeout(lkb);
1143                         _cancel_lock(r, lkb);
1144                 }
1145
1146                 unlock_rsb(r);
1147                 unhold_rsb(r);
1148                 dlm_put_lkb(lkb);
1149         }
1150 }
1151
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157         struct dlm_lkb *lkb;
1158         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160         ls->ls_recover_begin = 0;
1161         mutex_lock(&ls->ls_timeout_mutex);
1162         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164         mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166
1167 /* lkb is master or local copy */
1168
1169 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171         int b, len = r->res_ls->ls_lvblen;
1172
1173         /* b=1 lvb returned to caller
1174            b=0 lvb written to rsb or invalidated
1175            b=-1 do nothing */
1176
1177         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179         if (b == 1) {
1180                 if (!lkb->lkb_lvbptr)
1181                         return;
1182
1183                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                         return;
1185
1186                 if (!r->res_lvbptr)
1187                         return;
1188
1189                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                 lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192         } else if (b == 0) {
1193                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                         rsb_set_flag(r, RSB_VALNOTVALID);
1195                         return;
1196                 }
1197
1198                 if (!lkb->lkb_lvbptr)
1199                         return;
1200
1201                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                         return;
1203
1204                 if (!r->res_lvbptr)
1205                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                 if (!r->res_lvbptr)
1208                         return;
1209
1210                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                 r->res_lvbseq++;
1212                 lkb->lkb_lvbseq = r->res_lvbseq;
1213                 rsb_clear_flag(r, RSB_VALNOTVALID);
1214         }
1215
1216         if (rsb_flag(r, RSB_VALNOTVALID))
1217                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218 }
1219
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222         if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                 return;
1224
1225         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                 rsb_set_flag(r, RSB_VALNOTVALID);
1227                 return;
1228         }
1229
1230         if (!lkb->lkb_lvbptr)
1231                 return;
1232
1233         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                 return;
1235
1236         if (!r->res_lvbptr)
1237                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239         if (!r->res_lvbptr)
1240                 return;
1241
1242         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243         r->res_lvbseq++;
1244         rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246
1247 /* lkb is process copy (pc) */
1248
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                             struct dlm_message *ms)
1251 {
1252         int b;
1253
1254         if (!lkb->lkb_lvbptr)
1255                 return;
1256
1257         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                 return;
1259
1260         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261         if (b == 1) {
1262                 int len = receive_extralen(ms);
1263                 if (len > DLM_RESNAME_MAXLEN)
1264                         len = DLM_RESNAME_MAXLEN;
1265                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                 lkb->lkb_lvbseq = ms->m_lvbseq;
1267         }
1268 }
1269
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279
1280 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281 {
1282         del_lkb(r, lkb);
1283         lkb->lkb_grmode = DLM_LOCK_IV;
1284         /* this unhold undoes the original ref from create_lkb()
1285            so this leads to the lkb being freed */
1286         unhold_lkb(lkb);
1287 }
1288
1289 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1290 {
1291         set_lvb_unlock(r, lkb);
1292         _remove_lock(r, lkb);
1293 }
1294
1295 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1296 {
1297         _remove_lock(r, lkb);
1298 }
1299
1300 /* returns: 0 did nothing
1301             1 moved lock to granted
1302            -1 removed lock */
1303
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         int rv = 0;
1307
1308         lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310         switch (lkb->lkb_status) {
1311         case DLM_LKSTS_GRANTED:
1312                 break;
1313         case DLM_LKSTS_CONVERT:
1314                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                 rv = 1;
1316                 break;
1317         case DLM_LKSTS_WAITING:
1318                 del_lkb(r, lkb);
1319                 lkb->lkb_grmode = DLM_LOCK_IV;
1320                 /* this unhold undoes the original ref from create_lkb()
1321                    so this leads to the lkb being freed */
1322                 unhold_lkb(lkb);
1323                 rv = -1;
1324                 break;
1325         default:
1326                 log_print("invalid status for revert %d", lkb->lkb_status);
1327         }
1328         return rv;
1329 }
1330
1331 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1332 {
1333         return revert_lock(r, lkb);
1334 }
1335
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                 lkb->lkb_grmode = lkb->lkb_rqmode;
1340                 if (lkb->lkb_status)
1341                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                 else
1343                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344         }
1345
1346         lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348
1349 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         set_lvb_lock(r, lkb);
1352         _grant_lock(r, lkb);
1353         lkb->lkb_highbast = 0;
1354 }
1355
1356 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1357                           struct dlm_message *ms)
1358 {
1359         set_lvb_lock_pc(r, lkb, ms);
1360         _grant_lock(r, lkb);
1361 }
1362
1363 /* called by grant_pending_locks() which means an async grant message must
1364    be sent to the requesting node in addition to granting the lock if the
1365    lkb belongs to a remote node. */
1366
1367 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1368 {
1369         grant_lock(r, lkb);
1370         if (is_master_copy(lkb))
1371                 send_grant(r, lkb);
1372         else
1373                 queue_cast(r, lkb, 0);
1374 }
1375
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                 log_print("munge_demoted %x invalid reply type %d",
1388                           lkb->lkb_id, ms->m_type);
1389                 return;
1390         }
1391
1392         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                 return;
1396         }
1397
1398         lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404             ms->m_type != DLM_MSG_GRANT) {
1405                 log_print("munge_altmode %x invalid reply type %d",
1406                           lkb->lkb_id, ms->m_type);
1407                 return;
1408         }
1409
1410         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                 lkb->lkb_rqmode = DLM_LOCK_PR;
1412         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                 lkb->lkb_rqmode = DLM_LOCK_CW;
1414         else {
1415                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                 dlm_print_lkb(lkb);
1417         }
1418 }
1419
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                            lkb_statequeue);
1424         if (lkb->lkb_id == first->lkb_id)
1425                 return 1;
1426
1427         return 0;
1428 }
1429
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434         struct dlm_lkb *this;
1435
1436         list_for_each_entry(this, head, lkb_statequeue) {
1437                 if (this == lkb)
1438                         continue;
1439                 if (!modes_compat(this, lkb))
1440                         return 1;
1441         }
1442         return 0;
1443 }
1444
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485
1486 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487 {
1488         struct dlm_lkb *lkb1;
1489         int lkb_is_ahead = 0;
1490
1491         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                 if (lkb1 == lkb2) {
1493                         lkb_is_ahead = 1;
1494                         continue;
1495                 }
1496
1497                 if (!lkb_is_ahead) {
1498                         if (!modes_compat(lkb2, lkb1))
1499                                 return 1;
1500                 } else {
1501                         if (!modes_compat(lkb2, lkb1) &&
1502                             !modes_compat(lkb1, lkb2))
1503                                 return 1;
1504                 }
1505         }
1506         return 0;
1507 }
1508
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521
1522 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523 {
1524         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526         /*
1527          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528          * a new request for a NL mode lock being blocked.
1529          *
1530          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531          * request, then it would be granted.  In essence, the use of this flag
1532          * tells the Lock Manager to expedite theis request by not considering
1533          * what may be in the CONVERTING or WAITING queues...  As of this
1534          * writing, the EXPEDITE flag can be used only with new requests for NL
1535          * mode locks.  This flag is not valid for conversion requests.
1536          *
1537          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538          * conversion or used with a non-NL requested mode.  We also know an
1539          * EXPEDITE request is always granted immediately, so now must always
1540          * be 1.  The full condition to grant an expedite request: (now &&
1541          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542          * therefore be shortened to just checking the flag.
1543          */
1544
1545         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                 return 1;
1547
1548         /*
1549          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550          * added to the remaining conditions.
1551          */
1552
1553         if (queue_conflict(&r->res_grantqueue, lkb))
1554                 goto out;
1555
1556         /*
1557          * 6-3: By default, a conversion request is immediately granted if the
1558          * requested mode is compatible with the modes of all other granted
1559          * locks
1560          */
1561
1562         if (queue_conflict(&r->res_convertqueue, lkb))
1563                 goto out;
1564
1565         /*
1566          * 6-5: But the default algorithm for deciding whether to grant or
1567          * queue conversion requests does not by itself guarantee that such
1568          * requests are serviced on a "first come first serve" basis.  This, in
1569          * turn, can lead to a phenomenon known as "indefinate postponement".
1570          *
1571          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572          * the system service employed to request a lock conversion.  This flag
1573          * forces certain conversion requests to be queued, even if they are
1574          * compatible with the granted modes of other locks on the same
1575          * resource.  Thus, the use of this flag results in conversion requests
1576          * being ordered on a "first come first servce" basis.
1577          *
1578          * DCT: This condition is all about new conversions being able to occur
1579          * "in place" while the lock remains on the granted queue (assuming
1580          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581          * doesn't _have_ to go onto the convert queue where it's processed in
1582          * order.  The "now" variable is necessary to distinguish converts
1583          * being received and processed for the first time now, because once a
1584          * convert is moved to the conversion queue the condition below applies
1585          * requiring fifo granting.
1586          */
1587
1588         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                 return 1;
1590
1591         /*
1592          * The NOORDER flag is set to avoid the standard vms rules on grant
1593          * order.
1594          */
1595
1596         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                 return 1;
1598
1599         /*
1600          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601          * granted until all other conversion requests ahead of it are granted
1602          * and/or canceled.
1603          */
1604
1605         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                 return 1;
1607
1608         /*
1609          * 6-4: By default, a new request is immediately granted only if all
1610          * three of the following conditions are satisfied when the request is
1611          * issued:
1612          * - The queue of ungranted conversion requests for the resource is
1613          *   empty.
1614          * - The queue of ungranted new requests for the resource is empty.
1615          * - The mode of the new request is compatible with the most
1616          *   restrictive mode of all granted locks on the resource.
1617          */
1618
1619         if (now && !conv && list_empty(&r->res_convertqueue) &&
1620             list_empty(&r->res_waitqueue))
1621                 return 1;
1622
1623         /*
1624          * 6-4: Once a lock request is in the queue of ungranted new requests,
1625          * it cannot be granted until the queue of ungranted conversion
1626          * requests is empty, all ungranted new requests ahead of it are
1627          * granted and/or canceled, and it is compatible with the granted mode
1628          * of the most restrictive lock granted on the resource.
1629          */
1630
1631         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632             first_in_list(lkb, &r->res_waitqueue))
1633                 return 1;
1634  out:
1635         return 0;
1636 }
1637
1638 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                           int *err)
1640 {
1641         int rv;
1642         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645         if (err)
1646                 *err = 0;
1647
1648         rv = _can_be_granted(r, lkb, now);
1649         if (rv)
1650                 goto out;
1651
1652         /*
1653          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655          * cancels one of the locks.
1656          */
1657
1658         if (is_convert && can_be_queued(lkb) &&
1659             conversion_deadlock_detect(r, lkb)) {
1660                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                         lkb->lkb_grmode = DLM_LOCK_NL;
1662                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                         if (err)
1665                                 *err = -EDEADLK;
1666                         else {
1667                                 log_print("can_be_granted deadlock %x now %d",
1668                                           lkb->lkb_id, now);
1669                                 dlm_dump_rsb(r);
1670                         }
1671                 }
1672                 goto out;
1673         }
1674
1675         /*
1676          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677          * to grant a request in a mode other than the normal rqmode.  It's a
1678          * simple way to provide a big optimization to applications that can
1679          * use them.
1680          */
1681
1682         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                 alt = DLM_LOCK_PR;
1684         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                 alt = DLM_LOCK_CW;
1686
1687         if (alt) {
1688                 lkb->lkb_rqmode = alt;
1689                 rv = _can_be_granted(r, lkb, now);
1690                 if (rv)
1691                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                 else
1693                         lkb->lkb_rqmode = rqmode;
1694         }
1695  out:
1696         return rv;
1697 }
1698
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710 {
1711         struct dlm_lkb *lkb, *s;
1712         int hi, demoted, quit, grant_restart, demote_restart;
1713         int deadlk;
1714
1715         quit = 0;
1716  restart:
1717         grant_restart = 0;
1718         demote_restart = 0;
1719         hi = DLM_LOCK_IV;
1720
1721         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                 demoted = is_demoted(lkb);
1723                 deadlk = 0;
1724
1725                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                         grant_lock_pending(r, lkb);
1727                         grant_restart = 1;
1728                         continue;
1729                 }
1730
1731                 if (!demoted && is_demoted(lkb)) {
1732                         log_print("WARN: pending demoted %x node %d %s",
1733                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                         demote_restart = 1;
1735                         continue;
1736                 }
1737
1738                 if (deadlk) {
1739                         log_print("WARN: pending deadlock %x node %d %s",
1740                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                         dlm_dump_rsb(r);
1742                         continue;
1743                 }
1744
1745                 hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                         *cw = 1;
1749         }
1750
1751         if (grant_restart)
1752                 goto restart;
1753         if (demote_restart && !quit) {
1754                 quit = 1;
1755                 goto restart;
1756         }
1757
1758         return max_t(int, high, hi);
1759 }
1760
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763         struct dlm_lkb *lkb, *s;
1764
1765         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                 if (can_be_granted(r, lkb, 0, NULL))
1767                         grant_lock_pending(r, lkb);
1768                 else {
1769                         high = max_t(int, lkb->lkb_rqmode, high);
1770                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                 *cw = 1;
1772                 }
1773         }
1774
1775         return high;
1776 }
1777
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                 if (gr->lkb_highbast < DLM_LOCK_EX)
1787                         return 1;
1788                 return 0;
1789         }
1790
1791         if (gr->lkb_highbast < high &&
1792             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                 return 1;
1794         return 0;
1795 }
1796
1797 static void grant_pending_locks(struct dlm_rsb *r)
1798 {
1799         struct dlm_lkb *lkb, *s;
1800         int high = DLM_LOCK_IV;
1801         int cw = 0;
1802
1803         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805         high = grant_pending_convert(r, high, &cw);
1806         high = grant_pending_wait(r, high, &cw);
1807
1808         if (high == DLM_LOCK_IV)
1809                 return;
1810
1811         /*
1812          * If there are locks left on the wait/convert queue then send blocking
1813          * ASTs to granted locks based on the largest requested mode (high)
1814          * found above.
1815          */
1816
1817         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                         if (cw && high == DLM_LOCK_PR &&
1820                             lkb->lkb_grmode == DLM_LOCK_PR)
1821                                 queue_bast(r, lkb, DLM_LOCK_CW);
1822                         else
1823                                 queue_bast(r, lkb, high);
1824                         lkb->lkb_highbast = high;
1825                 }
1826         }
1827 }
1828
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                 if (gr->lkb_highbast < DLM_LOCK_EX)
1834                         return 1;
1835                 return 0;
1836         }
1837
1838         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                 return 1;
1840         return 0;
1841 }
1842
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                             struct dlm_lkb *lkb)
1845 {
1846         struct dlm_lkb *gr;
1847
1848         list_for_each_entry(gr, head, lkb_statequeue) {
1849                 /* skip self when sending basts to convertqueue */
1850                 if (gr == lkb)
1851                         continue;
1852                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1853                         queue_bast(r, gr, lkb->lkb_rqmode);
1854                         gr->lkb_highbast = lkb->lkb_rqmode;
1855                 }
1856         }
1857 }
1858
1859 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1860 {
1861         send_bast_queue(r, &r->res_grantqueue, lkb);
1862 }
1863
1864 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1865 {
1866         send_bast_queue(r, &r->res_grantqueue, lkb);
1867         send_bast_queue(r, &r->res_convertqueue, lkb);
1868 }
1869
1870 /* set_master(r, lkb) -- set the master nodeid of a resource
1871
1872    The purpose of this function is to set the nodeid field in the given
1873    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1874    known, it can just be copied to the lkb and the function will return
1875    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1876    before it can be copied to the lkb.
1877
1878    When the rsb nodeid is being looked up remotely, the initial lkb
1879    causing the lookup is kept on the ls_waiters list waiting for the
1880    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1881    on the rsb's res_lookup list until the master is verified.
1882
1883    Return values:
1884    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1885    1: the rsb master is not available and the lkb has been placed on
1886       a wait queue
1887 */
1888
1889 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1890 {
1891         struct dlm_ls *ls = r->res_ls;
1892         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1893
1894         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1895                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1896                 r->res_first_lkid = lkb->lkb_id;
1897                 lkb->lkb_nodeid = r->res_nodeid;
1898                 return 0;
1899         }
1900
1901         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1902                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1903                 return 1;
1904         }
1905
1906         if (r->res_nodeid == 0) {
1907                 lkb->lkb_nodeid = 0;
1908                 return 0;
1909         }
1910
1911         if (r->res_nodeid > 0) {
1912                 lkb->lkb_nodeid = r->res_nodeid;
1913                 return 0;
1914         }
1915
1916         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1917
1918         dir_nodeid = dlm_dir_nodeid(r);
1919
1920         if (dir_nodeid != our_nodeid) {
1921                 r->res_first_lkid = lkb->lkb_id;
1922                 send_lookup(r, lkb);
1923                 return 1;
1924         }
1925
1926         for (i = 0; i < 2; i++) {
1927                 /* It's possible for dlm_scand to remove an old rsb for
1928                    this same resource from the toss list, us to create
1929                    a new one, look up the master locally, and find it
1930                    already exists just before dlm_scand does the
1931                    dir_remove() on the previous rsb. */
1932
1933                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1934                                        r->res_length, &ret_nodeid);
1935                 if (!error)
1936                         break;
1937                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1938                 schedule();
1939         }
1940         if (error && error != -EEXIST)
1941                 return error;
1942
1943         if (ret_nodeid == our_nodeid) {
1944                 r->res_first_lkid = 0;
1945                 r->res_nodeid = 0;
1946                 lkb->lkb_nodeid = 0;
1947         } else {
1948                 r->res_first_lkid = lkb->lkb_id;
1949                 r->res_nodeid = ret_nodeid;
1950                 lkb->lkb_nodeid = ret_nodeid;
1951         }
1952         return 0;
1953 }
1954
1955 static void process_lookup_list(struct dlm_rsb *r)
1956 {
1957         struct dlm_lkb *lkb, *safe;
1958
1959         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1960                 list_del_init(&lkb->lkb_rsb_lookup);
1961                 _request_lock(r, lkb);
1962                 schedule();
1963         }
1964 }
1965
1966 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1967
1968 static void confirm_master(struct dlm_rsb *r, int error)
1969 {
1970         struct dlm_lkb *lkb;
1971
1972         if (!r->res_first_lkid)
1973                 return;
1974
1975         switch (error) {
1976         case 0:
1977         case -EINPROGRESS:
1978                 r->res_first_lkid = 0;
1979                 process_lookup_list(r);
1980                 break;
1981
1982         case -EAGAIN:
1983         case -EBADR:
1984         case -ENOTBLK:
1985                 /* the remote request failed and won't be retried (it was
1986                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1987                    lkb the first_lkid */
1988
1989                 r->res_first_lkid = 0;
1990
1991                 if (!list_empty(&r->res_lookup)) {
1992                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1993                                          lkb_rsb_lookup);
1994                         list_del_init(&lkb->lkb_rsb_lookup);
1995                         r->res_first_lkid = lkb->lkb_id;
1996                         _request_lock(r, lkb);
1997                 }
1998                 break;
1999
2000         default:
2001                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2002         }
2003 }
2004
2005 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2006                          int namelen, unsigned long timeout_cs,
2007                          void (*ast) (void *astparam),
2008                          void *astparam,
2009                          void (*bast) (void *astparam, int mode),
2010                          struct dlm_args *args)
2011 {
2012         int rv = -EINVAL;
2013
2014         /* check for invalid arg usage */
2015
2016         if (mode < 0 || mode > DLM_LOCK_EX)
2017                 goto out;
2018
2019         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2020                 goto out;
2021
2022         if (flags & DLM_LKF_CANCEL)
2023                 goto out;
2024
2025         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2026                 goto out;
2027
2028         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2029                 goto out;
2030
2031         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2032                 goto out;
2033
2034         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2035                 goto out;
2036
2037         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2038                 goto out;
2039
2040         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2041                 goto out;
2042
2043         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2044                 goto out;
2045
2046         if (!ast || !lksb)
2047                 goto out;
2048
2049         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2050                 goto out;
2051
2052         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2053                 goto out;
2054
2055         /* these args will be copied to the lkb in validate_lock_args,
2056            it cannot be done now because when converting locks, fields in
2057            an active lkb cannot be modified before locking the rsb */
2058
2059         args->flags = flags;
2060         args->astfn = ast;
2061         args->astparam = astparam;
2062         args->bastfn = bast;
2063         args->timeout = timeout_cs;
2064         args->mode = mode;
2065         args->lksb = lksb;
2066         rv = 0;
2067  out:
2068         return rv;
2069 }
2070
2071 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2072 {
2073         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2074                       DLM_LKF_FORCEUNLOCK))
2075                 return -EINVAL;
2076
2077         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2078                 return -EINVAL;
2079
2080         args->flags = flags;
2081         args->astparam = astarg;
2082         return 0;
2083 }
2084
2085 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2086                               struct dlm_args *args)
2087 {
2088         int rv = -EINVAL;
2089
2090         if (args->flags & DLM_LKF_CONVERT) {
2091                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2092                         goto out;
2093
2094                 if (args->flags & DLM_LKF_QUECVT &&
2095                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2096                         goto out;
2097
2098                 rv = -EBUSY;
2099                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2100                         goto out;
2101
2102                 if (lkb->lkb_wait_type)
2103                         goto out;
2104
2105                 if (is_overlap(lkb))
2106                         goto out;
2107         }
2108
2109         lkb->lkb_exflags = args->flags;
2110         lkb->lkb_sbflags = 0;
2111         lkb->lkb_astfn = args->astfn;
2112         lkb->lkb_astparam = args->astparam;
2113         lkb->lkb_bastfn = args->bastfn;
2114         lkb->lkb_rqmode = args->mode;
2115         lkb->lkb_lksb = args->lksb;
2116         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2117         lkb->lkb_ownpid = (int) current->pid;
2118         lkb->lkb_timeout_cs = args->timeout;
2119         rv = 0;
2120  out:
2121         if (rv)
2122                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2123                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2124                           lkb->lkb_status, lkb->lkb_wait_type,
2125                           lkb->lkb_resource->res_name);
2126         return rv;
2127 }
2128
2129 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2130    for success */
2131
2132 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2133    because there may be a lookup in progress and it's valid to do
2134    cancel/unlockf on it */
2135
2136 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2137 {
2138         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2139         int rv = -EINVAL;
2140
2141         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2142                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2143                 dlm_print_lkb(lkb);
2144                 goto out;
2145         }
2146
2147         /* an lkb may still exist even though the lock is EOL'ed due to a
2148            cancel, unlock or failed noqueue request; an app can't use these
2149            locks; return same error as if the lkid had not been found at all */
2150
2151         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2152                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2153                 rv = -ENOENT;
2154                 goto out;
2155         }
2156
2157         /* an lkb may be waiting for an rsb lookup to complete where the
2158            lookup was initiated by another lock */
2159
2160         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2161                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2162                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2163                         list_del_init(&lkb->lkb_rsb_lookup);
2164                         queue_cast(lkb->lkb_resource, lkb,
2165                                    args->flags & DLM_LKF_CANCEL ?
2166                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2167                         unhold_lkb(lkb); /* undoes create_lkb() */
2168                 }
2169                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2170                 rv = -EBUSY;
2171                 goto out;
2172         }
2173
2174         /* cancel not allowed with another cancel/unlock in progress */
2175
2176         if (args->flags & DLM_LKF_CANCEL) {
2177                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2178                         goto out;
2179
2180                 if (is_overlap(lkb))
2181                         goto out;
2182
2183                 /* don't let scand try to do a cancel */
2184                 del_timeout(lkb);
2185
2186                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2187                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2188                         rv = -EBUSY;
2189                         goto out;
2190                 }
2191
2192                 /* there's nothing to cancel */
2193                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2194                     !lkb->lkb_wait_type) {
2195                         rv = -EBUSY;
2196                         goto out;
2197                 }
2198
2199                 switch (lkb->lkb_wait_type) {
2200                 case DLM_MSG_LOOKUP:
2201                 case DLM_MSG_REQUEST:
2202                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2203                         rv = -EBUSY;
2204                         goto out;
2205                 case DLM_MSG_UNLOCK:
2206                 case DLM_MSG_CANCEL:
2207                         goto out;
2208                 }
2209                 /* add_to_waiters() will set OVERLAP_CANCEL */
2210                 goto out_ok;
2211         }
2212
2213         /* do we need to allow a force-unlock if there's a normal unlock
2214            already in progress?  in what conditions could the normal unlock
2215            fail such that we'd want to send a force-unlock to be sure? */
2216
2217         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2218                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2219                         goto out;
2220
2221                 if (is_overlap_unlock(lkb))
2222                         goto out;
2223
2224                 /* don't let scand try to do a cancel */
2225                 del_timeout(lkb);
2226
2227                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2228                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2229                         rv = -EBUSY;
2230                         goto out;
2231                 }
2232
2233                 switch (lkb->lkb_wait_type) {
2234                 case DLM_MSG_LOOKUP:
2235                 case DLM_MSG_REQUEST:
2236                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2237                         rv = -EBUSY;
2238                         goto out;
2239                 case DLM_MSG_UNLOCK:
2240                         goto out;
2241                 }
2242                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2243                 goto out_ok;
2244         }
2245
2246         /* normal unlock not allowed if there's any op in progress */
2247         rv = -EBUSY;
2248         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2249                 goto out;
2250
2251  out_ok:
2252         /* an overlapping op shouldn't blow away exflags from other op */
2253         lkb->lkb_exflags |= args->flags;
2254         lkb->lkb_sbflags = 0;
2255         lkb->lkb_astparam = args->astparam;
2256         rv = 0;
2257  out:
2258         if (rv)
2259                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2260                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2261                           args->flags, lkb->lkb_wait_type,
2262                           lkb->lkb_resource->res_name);
2263         return rv;
2264 }
2265
2266 /*
2267  * Four stage 4 varieties:
2268  * do_request(), do_convert(), do_unlock(), do_cancel()
2269  * These are called on the master node for the given lock and
2270  * from the central locking logic.
2271  */
2272
2273 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2274 {
2275         int error = 0;
2276
2277         if (can_be_granted(r, lkb, 1, NULL)) {
2278                 grant_lock(r, lkb);
2279                 queue_cast(r, lkb, 0);
2280                 goto out;
2281         }
2282
2283         if (can_be_queued(lkb)) {
2284                 error = -EINPROGRESS;
2285                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2286                 add_timeout(lkb);
2287                 goto out;
2288         }
2289
2290         error = -EAGAIN;
2291         queue_cast(r, lkb, -EAGAIN);
2292  out:
2293         return error;
2294 }
2295
2296 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2297                                int error)
2298 {
2299         switch (error) {
2300         case -EAGAIN:
2301                 if (force_blocking_asts(lkb))
2302                         send_blocking_asts_all(r, lkb);
2303                 break;
2304         case -EINPROGRESS:
2305                 send_blocking_asts(r, lkb);
2306                 break;
2307         }
2308 }
2309
2310 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2311 {
2312         int error = 0;
2313         int deadlk = 0;
2314
2315         /* changing an existing lock may allow others to be granted */
2316
2317         if (can_be_granted(r, lkb, 1, &deadlk)) {
2318                 grant_lock(r, lkb);
2319                 queue_cast(r, lkb, 0);
2320                 goto out;
2321         }
2322
2323         /* can_be_granted() detected that this lock would block in a conversion
2324            deadlock, so we leave it on the granted queue and return EDEADLK in
2325            the ast for the convert. */
2326
2327         if (deadlk) {
2328                 /* it's left on the granted queue */
2329                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2330                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2331                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2332                 revert_lock(r, lkb);
2333                 queue_cast(r, lkb, -EDEADLK);
2334                 error = -EDEADLK;
2335                 goto out;
2336         }
2337
2338         /* is_demoted() means the can_be_granted() above set the grmode
2339            to NL, and left us on the granted queue.  This auto-demotion
2340            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2341            now grantable.  We have to try to grant other converting locks
2342            before we try again to grant this one. */
2343
2344         if (is_demoted(lkb)) {
2345                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2346                 if (_can_be_granted(r, lkb, 1)) {
2347                         grant_lock(r, lkb);
2348                         queue_cast(r, lkb, 0);
2349                         goto out;
2350                 }
2351                 /* else fall through and move to convert queue */
2352         }
2353
2354         if (can_be_queued(lkb)) {
2355                 error = -EINPROGRESS;
2356                 del_lkb(r, lkb);
2357                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2358                 add_timeout(lkb);
2359                 goto out;
2360         }
2361
2362         error = -EAGAIN;
2363         queue_cast(r, lkb, -EAGAIN);
2364  out:
2365         return error;
2366 }
2367
2368 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2369                                int error)
2370 {
2371         switch (error) {
2372         case 0:
2373                 grant_pending_locks(r);
2374                 /* grant_pending_locks also sends basts */
2375                 break;
2376         case -EAGAIN:
2377                 if (force_blocking_asts(lkb))
2378                         send_blocking_asts_all(r, lkb);
2379                 break;
2380         case -EINPROGRESS:
2381                 send_blocking_asts(r, lkb);
2382                 break;
2383         }
2384 }
2385
2386 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2387 {
2388         remove_lock(r, lkb);
2389         queue_cast(r, lkb, -DLM_EUNLOCK);
2390         return -DLM_EUNLOCK;
2391 }
2392
2393 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2394                               int error)
2395 {
2396         grant_pending_locks(r);
2397 }
2398
2399 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2400  
2401 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2402 {
2403         int error;
2404
2405         error = revert_lock(r, lkb);
2406         if (error) {
2407                 queue_cast(r, lkb, -DLM_ECANCEL);
2408                 return -DLM_ECANCEL;
2409         }
2410         return 0;
2411 }
2412
2413 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2414                               int error)
2415 {
2416         if (error)
2417                 grant_pending_locks(r);
2418 }
2419
2420 /*
2421  * Four stage 3 varieties:
2422  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2423  */
2424
2425 /* add a new lkb to a possibly new rsb, called by requesting process */
2426
2427 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2428 {
2429         int error;
2430
2431         /* set_master: sets lkb nodeid from r */
2432
2433         error = set_master(r, lkb);
2434         if (error < 0)
2435                 goto out;
2436         if (error) {
2437                 error = 0;
2438                 goto out;
2439         }
2440
2441         if (is_remote(r)) {
2442                 /* receive_request() calls do_request() on remote node */
2443                 error = send_request(r, lkb);
2444         } else {
2445                 error = do_request(r, lkb);
2446                 /* for remote locks the request_reply is sent
2447                    between do_request and do_request_effects */
2448                 do_request_effects(r, lkb, error);
2449         }
2450  out:
2451         return error;
2452 }
2453
2454 /* change some property of an existing lkb, e.g. mode */
2455
2456 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2457 {
2458         int error;
2459
2460         if (is_remote(r)) {
2461                 /* receive_convert() calls do_convert() on remote node */
2462                 error = send_convert(r, lkb);
2463         } else {
2464                 error = do_convert(r, lkb);
2465                 /* for remote locks the convert_reply is sent
2466                    between do_convert and do_convert_effects */
2467                 do_convert_effects(r, lkb, error);
2468         }
2469
2470         return error;
2471 }
2472
2473 /* remove an existing lkb from the granted queue */
2474
2475 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2476 {
2477         int error;
2478
2479         if (is_remote(r)) {
2480                 /* receive_unlock() calls do_unlock() on remote node */
2481                 error = send_unlock(r, lkb);
2482         } else {
2483                 error = do_unlock(r, lkb);
2484                 /* for remote locks the unlock_reply is sent
2485                    between do_unlock and do_unlock_effects */
2486                 do_unlock_effects(r, lkb, error);
2487         }
2488
2489         return error;
2490 }
2491
2492 /* remove an existing lkb from the convert or wait queue */
2493
2494 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2495 {
2496         int error;
2497
2498         if (is_remote(r)) {
2499                 /* receive_cancel() calls do_cancel() on remote node */
2500                 error = send_cancel(r, lkb);
2501         } else {
2502                 error = do_cancel(r, lkb);
2503                 /* for remote locks the cancel_reply is sent
2504                    between do_cancel and do_cancel_effects */
2505                 do_cancel_effects(r, lkb, error);
2506         }
2507
2508         return error;
2509 }
2510
2511 /*
2512  * Four stage 2 varieties:
2513  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2514  */
2515
2516 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2517                         int len, struct dlm_args *args)
2518 {
2519         struct dlm_rsb *r;
2520         int error;
2521
2522         error = validate_lock_args(ls, lkb, args);
2523         if (error)
2524                 goto out;
2525
2526         error = find_rsb(ls, name, len, R_CREATE, &r);
2527         if (error)
2528                 goto out;
2529
2530         lock_rsb(r);
2531
2532         attach_lkb(r, lkb);
2533         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2534
2535         error = _request_lock(r, lkb);
2536
2537         unlock_rsb(r);
2538         put_rsb(r);
2539
2540  out:
2541         return error;
2542 }
2543
2544 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2545                         struct dlm_args *args)
2546 {
2547         struct dlm_rsb *r;
2548         int error;
2549
2550         r = lkb->lkb_resource;
2551
2552         hold_rsb(r);
2553         lock_rsb(r);
2554
2555         error = validate_lock_args(ls, lkb, args);
2556         if (error)
2557                 goto out;
2558
2559         error = _convert_lock(r, lkb);
2560  out:
2561         unlock_rsb(r);
2562         put_rsb(r);
2563         return error;
2564 }
2565
2566 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2567                        struct dlm_args *args)
2568 {
2569         struct dlm_rsb *r;
2570         int error;
2571
2572         r = lkb->lkb_resource;
2573
2574         hold_rsb(r);
2575         lock_rsb(r);
2576
2577         error = validate_unlock_args(lkb, args);
2578         if (error)
2579                 goto out;
2580
2581         error = _unlock_lock(r, lkb);
2582  out:
2583         unlock_rsb(r);
2584         put_rsb(r);
2585         return error;
2586 }
2587
2588 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2589                        struct dlm_args *args)
2590 {
2591         struct dlm_rsb *r;
2592         int error;
2593
2594         r = lkb->lkb_resource;
2595
2596         hold_rsb(r);
2597         lock_rsb(r);
2598
2599         error = validate_unlock_args(lkb, args);
2600         if (error)
2601                 goto out;
2602
2603         error = _cancel_lock(r, lkb);
2604  out:
2605         unlock_rsb(r);
2606         put_rsb(r);
2607         return error;
2608 }
2609
2610 /*
2611  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2612  */
2613
2614 int dlm_lock(dlm_lockspace_t *lockspace,
2615              int mode,
2616              struct dlm_lksb *lksb,
2617              uint32_t flags,
2618              void *name,
2619              unsigned int namelen,
2620              uint32_t parent_lkid,
2621              void (*ast) (void *astarg),
2622              void *astarg,
2623              void (*bast) (void *astarg, int mode))
2624 {
2625         struct dlm_ls *ls;
2626         struct dlm_lkb *lkb;
2627         struct dlm_args args;
2628         int error, convert = flags & DLM_LKF_CONVERT;
2629
2630         ls = dlm_find_lockspace_local(lockspace);
2631         if (!ls)
2632                 return -EINVAL;
2633
2634         dlm_lock_recovery(ls);
2635
2636         if (convert)
2637                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2638         else
2639                 error = create_lkb(ls, &lkb);
2640
2641         if (error)
2642                 goto out;
2643
2644         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2645                               astarg, bast, &args);
2646         if (error)
2647                 goto out_put;
2648
2649         if (convert)
2650                 error = convert_lock(ls, lkb, &args);
2651         else
2652                 error = request_lock(ls, lkb, name, namelen, &args);
2653
2654         if (error == -EINPROGRESS)
2655                 error = 0;
2656  out_put:
2657         if (convert || error)
2658                 __put_lkb(ls, lkb);
2659         if (error == -EAGAIN || error == -EDEADLK)
2660                 error = 0;
2661  out:
2662         dlm_unlock_recovery(ls);
2663         dlm_put_lockspace(ls);
2664         return error;
2665 }
2666
2667 int dlm_unlock(dlm_lockspace_t *lockspace,
2668                uint32_t lkid,
2669                uint32_t flags,
2670                struct dlm_lksb *lksb,
2671                void *astarg)
2672 {
2673         struct dlm_ls *ls;
2674         struct dlm_lkb *lkb;
2675         struct dlm_args args;
2676         int error;
2677
2678         ls = dlm_find_lockspace_local(lockspace);
2679         if (!ls)
2680                 return -EINVAL;
2681
2682         dlm_lock_recovery(ls);
2683
2684         error = find_lkb(ls, lkid, &lkb);
2685         if (error)
2686                 goto out;
2687
2688         error = set_unlock_args(flags, astarg, &args);
2689         if (error)
2690                 goto out_put;
2691
2692         if (flags & DLM_LKF_CANCEL)
2693                 error = cancel_lock(ls, lkb, &args);
2694         else
2695                 error = unlock_lock(ls, lkb, &args);
2696
2697         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2698                 error = 0;
2699         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2700                 error = 0;
2701  out_put:
2702         dlm_put_lkb(lkb);
2703  out:
2704         dlm_unlock_recovery(ls);
2705         dlm_put_lockspace(ls);
2706         return error;
2707 }
2708
2709 /*
2710  * send/receive routines for remote operations and replies
2711  *
2712  * send_args
2713  * send_common
2714  * send_request                 receive_request
2715  * send_convert                 receive_convert
2716  * send_unlock                  receive_unlock
2717  * send_cancel                  receive_cancel
2718  * send_grant                   receive_grant
2719  * send_bast                    receive_bast
2720  * send_lookup                  receive_lookup
2721  * send_remove                  receive_remove
2722  *
2723  *                              send_common_reply
2724  * receive_request_reply        send_request_reply
2725  * receive_convert_reply        send_convert_reply
2726  * receive_unlock_reply         send_unlock_reply
2727  * receive_cancel_reply         send_cancel_reply
2728  * receive_lookup_reply         send_lookup_reply
2729  */
2730
2731 static int _create_message(struct dlm_ls *ls, int mb_len,
2732                            int to_nodeid, int mstype,
2733                            struct dlm_message **ms_ret,
2734                            struct dlm_mhandle **mh_ret)
2735 {
2736         struct dlm_message *ms;
2737         struct dlm_mhandle *mh;
2738         char *mb;
2739
2740         /* get_buffer gives us a message handle (mh) that we need to
2741            pass into lowcomms_commit and a message buffer (mb) that we
2742            write our data into */
2743
2744         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2745         if (!mh)
2746                 return -ENOBUFS;
2747
2748         memset(mb, 0, mb_len);
2749
2750         ms = (struct dlm_message *) mb;
2751
2752         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2753         ms->m_header.h_lockspace = ls->ls_global_id;
2754         ms->m_header.h_nodeid = dlm_our_nodeid();
2755         ms->m_header.h_length = mb_len;
2756         ms->m_header.h_cmd = DLM_MSG;
2757
2758         ms->m_type = mstype;
2759
2760         *mh_ret = mh;
2761         *ms_ret = ms;
2762         return 0;
2763 }
2764
2765 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2766                           int to_nodeid, int mstype,
2767                           struct dlm_message **ms_ret,
2768                           struct dlm_mhandle **mh_ret)
2769 {
2770         int mb_len = sizeof(struct dlm_message);
2771
2772         switch (mstype) {
2773         case DLM_MSG_REQUEST:
2774         case DLM_MSG_LOOKUP:
2775         case DLM_MSG_REMOVE:
2776                 mb_len += r->res_length;
2777                 break;
2778         case DLM_MSG_CONVERT:
2779         case DLM_MSG_UNLOCK:
2780         case DLM_MSG_REQUEST_REPLY:
2781         case DLM_MSG_CONVERT_REPLY:
2782         case DLM_MSG_GRANT:
2783                 if (lkb && lkb->lkb_lvbptr)
2784                         mb_len += r->res_ls->ls_lvblen;
2785                 break;
2786         }
2787
2788         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2789                                ms_ret, mh_ret);
2790 }
2791
2792 /* further lowcomms enhancements or alternate implementations may make
2793    the return value from this function useful at some point */
2794
2795 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2796 {
2797         dlm_message_out(ms);
2798         dlm_lowcomms_commit_buffer(mh);
2799         return 0;
2800 }
2801
2802 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2803                       struct dlm_message *ms)
2804 {
2805         ms->m_nodeid   = lkb->lkb_nodeid;
2806         ms->m_pid      = lkb->lkb_ownpid;
2807         ms->m_lkid     = lkb->lkb_id;
2808         ms->m_remid    = lkb->lkb_remid;
2809         ms->m_exflags  = lkb->lkb_exflags;
2810         ms->m_sbflags  = lkb->lkb_sbflags;
2811         ms->m_flags    = lkb->lkb_flags;
2812         ms->m_lvbseq   = lkb->lkb_lvbseq;
2813         ms->m_status   = lkb->lkb_status;
2814         ms->m_grmode   = lkb->lkb_grmode;
2815         ms->m_rqmode   = lkb->lkb_rqmode;
2816         ms->m_hash     = r->res_hash;
2817
2818         /* m_result and m_bastmode are set from function args,
2819            not from lkb fields */
2820
2821         if (lkb->lkb_bastfn)
2822                 ms->m_asts |= AST_BAST;
2823         if (lkb->lkb_astfn)
2824                 ms->m_asts |= AST_COMP;
2825
2826         /* compare with switch in create_message; send_remove() doesn't
2827            use send_args() */
2828
2829         switch (ms->m_type) {
2830         case DLM_MSG_REQUEST:
2831         case DLM_MSG_LOOKUP:
2832                 memcpy(ms->m_extra, r->res_name, r->res_length);
2833                 break;
2834         case DLM_MSG_CONVERT:
2835         case DLM_MSG_UNLOCK:
2836         case DLM_MSG_REQUEST_REPLY:
2837         case DLM_MSG_CONVERT_REPLY:
2838         case DLM_MSG_GRANT:
2839                 if (!lkb->lkb_lvbptr)
2840                         break;
2841                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2842                 break;
2843         }
2844 }
2845
2846 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2847 {
2848         struct dlm_message *ms;
2849         struct dlm_mhandle *mh;
2850         int to_nodeid, error;
2851
2852         error = add_to_waiters(lkb, mstype);
2853         if (error)
2854                 return error;
2855
2856         to_nodeid = r->res_nodeid;
2857
2858         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2859         if (error)
2860                 goto fail;
2861
2862         send_args(r, lkb, ms);
2863
2864         error = send_message(mh, ms);
2865         if (error)
2866                 goto fail;
2867         return 0;
2868
2869  fail:
2870         remove_from_waiters(lkb, msg_reply_type(mstype));
2871         return error;
2872 }
2873
2874 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2875 {
2876         return send_common(r, lkb, DLM_MSG_REQUEST);
2877 }
2878
2879 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2880 {
2881         int error;
2882
2883         error = send_common(r, lkb, DLM_MSG_CONVERT);
2884
2885         /* down conversions go without a reply from the master */
2886         if (!error && down_conversion(lkb)) {
2887                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2888                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2889                 r->res_ls->ls_stub_ms.m_result = 0;
2890                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2891                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2892         }
2893
2894         return error;
2895 }
2896
2897 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2898    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2899    that the master is still correct. */
2900
2901 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902 {
2903         return send_common(r, lkb, DLM_MSG_UNLOCK);
2904 }
2905
2906 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2907 {
2908         return send_common(r, lkb, DLM_MSG_CANCEL);
2909 }
2910
2911 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2912 {
2913         struct dlm_message *ms;
2914         struct dlm_mhandle *mh;
2915         int to_nodeid, error;
2916
2917         to_nodeid = lkb->lkb_nodeid;
2918
2919         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2920         if (error)
2921                 goto out;
2922
2923         send_args(r, lkb, ms);
2924
2925         ms->m_result = 0;
2926
2927         error = send_message(mh, ms);
2928  out:
2929         return error;
2930 }
2931
2932 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2933 {
2934         struct dlm_message *ms;
2935         struct dlm_mhandle *mh;
2936         int to_nodeid, error;
2937
2938         to_nodeid = lkb->lkb_nodeid;
2939
2940         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2941         if (error)
2942                 goto out;
2943
2944         send_args(r, lkb, ms);
2945
2946         ms->m_bastmode = mode;
2947
2948         error = send_message(mh, ms);
2949  out:
2950         return error;
2951 }
2952
2953 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2954 {
2955         struct dlm_message *ms;
2956         struct dlm_mhandle *mh;
2957         int to_nodeid, error;
2958
2959         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2960         if (error)
2961                 return error;
2962
2963         to_nodeid = dlm_dir_nodeid(r);
2964
2965         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2966         if (error)
2967                 goto fail;
2968
2969         send_args(r, lkb, ms);
2970
2971         error = send_message(mh, ms);
2972         if (error)
2973                 goto fail;
2974         return 0;
2975
2976  fail:
2977         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2978         return error;
2979 }
2980
2981 static int send_remove(struct dlm_rsb *r)
2982 {
2983         struct dlm_message *ms;
2984         struct dlm_mhandle *mh;
2985         int to_nodeid, error;
2986
2987         to_nodeid = dlm_dir_nodeid(r);
2988
2989         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2990         if (error)
2991                 goto out;
2992
2993         memcpy(ms->m_extra, r->res_name, r->res_length);
2994         ms->m_hash = r->res_hash;
2995
2996         error = send_message(mh, ms);
2997  out:
2998         return error;
2999 }
3000
3001 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3002                              int mstype, int rv)
3003 {
3004         struct dlm_message *ms;
3005         struct dlm_mhandle *mh;
3006         int to_nodeid, error;
3007
3008         to_nodeid = lkb->lkb_nodeid;
3009
3010         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3011         if (error)
3012                 goto out;
3013
3014         send_args(r, lkb, ms);
3015
3016         ms->m_result = rv;
3017
3018         error = send_message(mh, ms);
3019  out:
3020         return error;
3021 }
3022
3023 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3024 {
3025         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3026 }
3027
3028 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3029 {
3030         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3031 }
3032
3033 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3034 {
3035         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3036 }
3037
3038 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3039 {
3040         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3041 }
3042
3043 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3044                              int ret_nodeid, int rv)
3045 {
3046         struct dlm_rsb *r = &ls->ls_stub_rsb;
3047         struct dlm_message *ms;
3048         struct dlm_mhandle *mh;
3049         int error, nodeid = ms_in->m_header.h_nodeid;
3050
3051         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3052         if (error)
3053                 goto out;
3054
3055         ms->m_lkid = ms_in->m_lkid;
3056         ms->m_result = rv;
3057         ms->m_nodeid = ret_nodeid;
3058
3059         error = send_message(mh, ms);
3060  out:
3061         return error;
3062 }
3063
3064 /* which args we save from a received message depends heavily on the type
3065    of message, unlike the send side where we can safely send everything about
3066    the lkb for any type of message */
3067
3068 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3069 {
3070         lkb->lkb_exflags = ms->m_exflags;
3071         lkb->lkb_sbflags = ms->m_sbflags;
3072         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3073                          (ms->m_flags & 0x0000FFFF);
3074 }
3075
3076 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3077 {
3078         lkb->lkb_sbflags = ms->m_sbflags;
3079         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3080                          (ms->m_flags & 0x0000FFFF);
3081 }
3082
3083 static int receive_extralen(struct dlm_message *ms)
3084 {
3085         return (ms->m_header.h_length - sizeof(struct dlm_message));
3086 }
3087
3088 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3089                        struct dlm_message *ms)
3090 {
3091         int len;
3092
3093         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3094                 if (!lkb->lkb_lvbptr)
3095                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3096                 if (!lkb->lkb_lvbptr)
3097                         return -ENOMEM;
3098                 len = receive_extralen(ms);
3099                 if (len > DLM_RESNAME_MAXLEN)
3100                         len = DLM_RESNAME_MAXLEN;
3101                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3102         }
3103         return 0;
3104 }
3105
3106 static void fake_bastfn(void *astparam, int mode)
3107 {
3108         log_print("fake_bastfn should not be called");
3109 }
3110
3111 static void fake_astfn(void *astparam)
3112 {
3113         log_print("fake_astfn should not be called");
3114 }
3115
3116 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3117                                 struct dlm_message *ms)
3118 {
3119         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3120         lkb->lkb_ownpid = ms->m_pid;
3121         lkb->lkb_remid = ms->m_lkid;
3122         lkb->lkb_grmode = DLM_LOCK_IV;
3123         lkb->lkb_rqmode = ms->m_rqmode;
3124
3125         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3126         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3127
3128         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3129                 /* lkb was just created so there won't be an lvb yet */
3130                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3131                 if (!lkb->lkb_lvbptr)
3132                         return -ENOMEM;
3133         }
3134
3135         return 0;
3136 }
3137
3138 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3139                                 struct dlm_message *ms)
3140 {
3141         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3142                 return -EBUSY;
3143
3144         if (receive_lvb(ls, lkb, ms))
3145                 return -ENOMEM;
3146
3147         lkb->lkb_rqmode = ms->m_rqmode;
3148         lkb->lkb_lvbseq = ms->m_lvbseq;
3149
3150         return 0;
3151 }
3152
3153 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3154                                struct dlm_message *ms)
3155 {
3156         if (receive_lvb(ls, lkb, ms))
3157                 return -ENOMEM;
3158         return 0;
3159 }
3160
3161 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3162    uses to send a reply and that the remote end uses to process the reply. */
3163
3164 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3165 {
3166         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3167         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3168         lkb->lkb_remid = ms->m_lkid;
3169 }
3170
3171 /* This is called after the rsb is locked so that we can safely inspect
3172    fields in the lkb. */
3173
3174 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3175 {
3176         int from = ms->m_header.h_nodeid;
3177         int error = 0;
3178
3179         switch (ms->m_type) {
3180         case DLM_MSG_CONVERT:
3181         case DLM_MSG_UNLOCK:
3182         case DLM_MSG_CANCEL:
3183                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3184                         error = -EINVAL;
3185                 break;
3186
3187         case DLM_MSG_CONVERT_REPLY:
3188         case DLM_MSG_UNLOCK_REPLY:
3189         case DLM_MSG_CANCEL_REPLY:
3190         case DLM_MSG_GRANT:
3191         case DLM_MSG_BAST:
3192                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3193                         error = -EINVAL;
3194                 break;
3195
3196         case DLM_MSG_REQUEST_REPLY:
3197                 if (!is_process_copy(lkb))
3198                         error = -EINVAL;
3199                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3200                         error = -EINVAL;
3201                 break;
3202
3203         default:
3204                 error = -EINVAL;
3205         }
3206
3207         if (error)
3208                 log_error(lkb->lkb_resource->res_ls,
3209                           "ignore invalid message %d from %d %x %x %x %d",
3210                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3211                           lkb->lkb_flags, lkb->lkb_nodeid);
3212         return error;
3213 }
3214
3215 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3216 {
3217         struct dlm_lkb *lkb;
3218         struct dlm_rsb *r;
3219         int error, namelen;
3220
3221         error = create_lkb(ls, &lkb);
3222         if (error)
3223                 goto fail;
3224
3225         receive_flags(lkb, ms);
3226         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3227         error = receive_request_args(ls, lkb, ms);
3228         if (error) {
3229                 __put_lkb(ls, lkb);
3230                 goto fail;
3231         }
3232
3233         namelen = receive_extralen(ms);
3234
3235         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3236         if (error) {
3237                 __put_lkb(ls, lkb);
3238                 goto fail;
3239         }
3240
3241         lock_rsb(r);
3242
3243         attach_lkb(r, lkb);
3244         error = do_request(r, lkb);
3245         send_request_reply(r, lkb, error);
3246         do_request_effects(r, lkb, error);
3247
3248         unlock_rsb(r);
3249         put_rsb(r);
3250
3251         if (error == -EINPROGRESS)
3252                 error = 0;
3253         if (error)
3254                 dlm_put_lkb(lkb);
3255         return;
3256
3257  fail:
3258         setup_stub_lkb(ls, ms);
3259         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3260 }
3261
3262 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3263 {
3264         struct dlm_lkb *lkb;
3265         struct dlm_rsb *r;
3266         int error, reply = 1;
3267
3268         error = find_lkb(ls, ms->m_remid, &lkb);
3269         if (error)
3270                 goto fail;
3271
3272         r = lkb->lkb_resource;
3273
3274         hold_rsb(r);
3275         lock_rsb(r);
3276
3277         error = validate_message(lkb, ms);
3278         if (error)
3279                 goto out;
3280
3281         receive_flags(lkb, ms);
3282
3283         error = receive_convert_args(ls, lkb, ms);
3284         if (error) {
3285                 send_convert_reply(r, lkb, error);
3286                 goto out;
3287         }
3288
3289         reply = !down_conversion(lkb);
3290
3291         error = do_convert(r, lkb);
3292         if (reply)
3293                 send_convert_reply(r, lkb, error);
3294         do_convert_effects(r, lkb, error);
3295  out:
3296         unlock_rsb(r);
3297         put_rsb(r);
3298         dlm_put_lkb(lkb);
3299         return;
3300
3301  fail:
3302         setup_stub_lkb(ls, ms);
3303         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3304 }
3305
3306 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3307 {
3308         struct dlm_lkb *lkb;
3309         struct dlm_rsb *r;
3310         int error;
3311
3312         error = find_lkb(ls, ms->m_remid, &lkb);
3313         if (error)
3314                 goto fail;
3315
3316         r = lkb->lkb_resource;
3317
3318         hold_rsb(r);
3319         lock_rsb(r);
3320
3321         error = validate_message(lkb, ms);
3322         if (error)
3323                 goto out;
3324
3325         receive_flags(lkb, ms);
3326
3327         error = receive_unlock_args(ls, lkb, ms);
3328         if (error) {
3329                 send_unlock_reply(r, lkb, error);
3330                 goto out;
3331         }
3332
3333         error = do_unlock(r, lkb);
3334         send_unlock_reply(r, lkb, error);
3335         do_unlock_effects(r, lkb, error);
3336  out:
3337         unlock_rsb(r);
3338         put_rsb(r);
3339         dlm_put_lkb(lkb);
3340         return;
3341
3342  fail:
3343         setup_stub_lkb(ls, ms);
3344         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3345 }
3346
3347 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3348 {
3349         struct dlm_lkb *lkb;
3350         struct dlm_rsb *r;
3351         int error;
3352
3353         error = find_lkb(ls, ms->m_remid, &lkb);
3354         if (error)
3355                 goto fail;
3356
3357         receive_flags(lkb, ms);
3358
3359         r = lkb->lkb_resource;
3360
3361         hold_rsb(r);
3362         lock_rsb(r);
3363
3364         error = validate_message(lkb, ms);
3365         if (error)
3366                 goto out;
3367
3368         error = do_cancel(r, lkb);
3369         send_cancel_reply(r, lkb, error);
3370         do_cancel_effects(r, lkb, error);
3371  out:
3372         unlock_rsb(r);
3373         put_rsb(r);
3374         dlm_put_lkb(lkb);
3375         return;
3376
3377  fail:
3378         setup_stub_lkb(ls, ms);
3379         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3380 }
3381
3382 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3383 {
3384         struct dlm_lkb *lkb;
3385         struct dlm_rsb *r;
3386         int error;
3387
3388         error = find_lkb(ls, ms->m_remid, &lkb);
3389         if (error) {
3390                 log_debug(ls, "receive_grant from %d no lkb %x",
3391                           ms->m_header.h_nodeid, ms->m_remid);
3392                 return;
3393         }
3394
3395         r = lkb->lkb_resource;
3396
3397         hold_rsb(r);
3398         lock_rsb(r);
3399
3400         error = validate_message(lkb, ms);
3401         if (error)
3402                 goto out;
3403
3404         receive_flags_reply(lkb, ms);
3405         if (is_altmode(lkb))
3406                 munge_altmode(lkb, ms);
3407         grant_lock_pc(r, lkb, ms);
3408         queue_cast(r, lkb, 0);
3409  out:
3410         unlock_rsb(r);
3411         put_rsb(r);
3412         dlm_put_lkb(lkb);
3413 }
3414
3415 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3416 {
3417         struct dlm_lkb *lkb;
3418         struct dlm_rsb *r;
3419         int error;
3420
3421         error = find_lkb(ls, ms->m_remid, &lkb);
3422         if (error) {
3423                 log_debug(ls, "receive_bast from %d no lkb %x",
3424                           ms->m_header.h_nodeid, ms->m_remid);
3425                 return;
3426         }
3427
3428         r = lkb->lkb_resource;
3429
3430         hold_rsb(r);
3431         lock_rsb(r);
3432
3433         error = validate_message(lkb, ms);
3434         if (error)
3435                 goto out;
3436
3437         queue_bast(r, lkb, ms->m_bastmode);
3438  out:
3439         unlock_rsb(r);
3440         put_rsb(r);
3441         dlm_put_lkb(lkb);
3442 }
3443
3444 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3445 {
3446         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3447
3448         from_nodeid = ms->m_header.h_nodeid;
3449         our_nodeid = dlm_our_nodeid();
3450
3451         len = receive_extralen(ms);
3452
3453         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3454         if (dir_nodeid != our_nodeid) {
3455                 log_error(ls, "lookup dir_nodeid %d from %d",
3456                           dir_nodeid, from_nodeid);
3457                 error = -EINVAL;
3458                 ret_nodeid = -1;
3459                 goto out;
3460         }
3461
3462         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3463
3464         /* Optimization: we're master so treat lookup as a request */
3465         if (!error && ret_nodeid == our_nodeid) {
3466                 receive_request(ls, ms);
3467                 return;
3468         }
3469  out:
3470         send_lookup_reply(ls, ms, ret_nodeid, error);
3471 }
3472
3473 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3474 {
3475         int len, dir_nodeid, from_nodeid;
3476
3477         from_nodeid = ms->m_header.h_nodeid;
3478
3479         len = receive_extralen(ms);
3480
3481         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3482         if (dir_nodeid != dlm_our_nodeid()) {
3483                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3484                           dir_nodeid, from_nodeid);
3485                 return;
3486         }
3487
3488         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3489 }
3490
3491 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3492 {
3493         do_purge(ls, ms->m_nodeid, ms->m_pid);
3494 }
3495
3496 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3497 {
3498         struct dlm_lkb *lkb;
3499         struct dlm_rsb *r;
3500         int error, mstype, result;
3501
3502         error = find_lkb(ls, ms->m_remid, &lkb);
3503         if (error) {
3504                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3505                           ms->m_header.h_nodeid, ms->m_remid);
3506                 return;
3507         }
3508
3509         r = lkb->lkb_resource;
3510         hold_rsb(r);
3511         lock_rsb(r);
3512
3513         error = validate_message(lkb, ms);
3514         if (error)
3515                 goto out;
3516
3517         mstype = lkb->lkb_wait_type;
3518         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3519         if (error)
3520                 goto out;
3521
3522         /* Optimization: the dir node was also the master, so it took our
3523            lookup as a request and sent request reply instead of lookup reply */
3524         if (mstype == DLM_MSG_LOOKUP) {
3525                 r->res_nodeid = ms->m_header.h_nodeid;
3526                 lkb->lkb_nodeid = r->res_nodeid;
3527         }
3528
3529         /* this is the value returned from do_request() on the master */
3530         result = ms->m_result;
3531
3532         switch (result) {
3533         case -EAGAIN:
3534                 /* request would block (be queued) on remote master */
3535                 queue_cast(r, lkb, -EAGAIN);
3536                 confirm_master(r, -EAGAIN);
3537                 unhold_lkb(lkb); /* undoes create_lkb() */
3538                 break;
3539
3540         case -EINPROGRESS:
3541         case 0:
3542                 /* request was queued or granted on remote master */
3543                 receive_flags_reply(lkb, ms);
3544                 lkb->lkb_remid = ms->m_lkid;
3545                 if (is_altmode(lkb))
3546                         munge_altmode(lkb, ms);
3547                 if (result) {
3548                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3549                         add_timeout(lkb);
3550                 } else {
3551                         grant_lock_pc(r, lkb, ms);
3552                         queue_cast(r, lkb, 0);
3553                 }
3554                 confirm_master(r, result);
3555                 break;
3556
3557         case -EBADR:
3558         case -ENOTBLK:
3559                 /* find_rsb failed to find rsb or rsb wasn't master */
3560                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3561                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3562                 r->res_nodeid = -1;
3563                 lkb->lkb_nodeid = -1;
3564
3565                 if (is_overlap(lkb)) {
3566                         /* we'll ignore error in cancel/unlock reply */
3567                         queue_cast_overlap(r, lkb);
3568                         confirm_master(r, result);
3569                         unhold_lkb(lkb); /* undoes create_lkb() */
3570                 } else
3571                         _request_lock(r, lkb);
3572                 break;
3573
3574         default:
3575                 log_error(ls, "receive_request_reply %x error %d",
3576                           lkb->lkb_id, result);
3577         }
3578
3579         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3580                 log_debug(ls, "receive_request_reply %x result %d unlock",
3581                           lkb->lkb_id, result);
3582                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3583                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3584                 send_unlock(r, lkb);
3585         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3586                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3587                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3588                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3589                 send_cancel(r, lkb);
3590         } else {
3591                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3592                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3593         }
3594  out:
3595         unlock_rsb(r);
3596         put_rsb(r);
3597         dlm_put_lkb(lkb);
3598 }
3599
3600 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3601                                     struct dlm_message *ms)
3602 {
3603         /* this is the value returned from do_convert() on the master */
3604         switch (ms->m_result) {
3605         case -EAGAIN:
3606                 /* convert would block (be queued) on remote master */
3607                 queue_cast(r, lkb, -EAGAIN);
3608                 break;
3609
3610         case -EDEADLK:
3611                 receive_flags_reply(lkb, ms);
3612                 revert_lock_pc(r, lkb);
3613                 queue_cast(r, lkb, -EDEADLK);
3614                 break;
3615
3616         case -EINPROGRESS:
3617                 /* convert was queued on remote master */
3618                 receive_flags_reply(lkb, ms);
3619                 if (is_demoted(lkb))
3620                         munge_demoted(lkb, ms);
3621                 del_lkb(r, lkb);
3622                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3623                 add_timeout(lkb);
3624                 break;
3625
3626         case 0:
3627                 /* convert was granted on remote master */
3628                 receive_flags_reply(lkb, ms);
3629                 if (is_demoted(lkb))
3630                         munge_demoted(lkb, ms);
3631                 grant_lock_pc(r, lkb, ms);
3632                 queue_cast(r, lkb, 0);
3633                 break;
3634
3635         default:
3636                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3637                           lkb->lkb_id, ms->m_result);
3638         }
3639 }
3640
3641 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3642 {
3643         struct dlm_rsb *r = lkb->lkb_resource;
3644         int error;
3645
3646         hold_rsb(r);
3647         lock_rsb(r);
3648
3649         error = validate_message(lkb, ms);
3650         if (error)
3651                 goto out;
3652
3653         /* stub reply can happen with waiters_mutex held */
3654         error = remove_from_waiters_ms(lkb, ms);
3655         if (error)
3656                 goto out;
3657
3658         __receive_convert_reply(r, lkb, ms);
3659  out:
3660         unlock_rsb(r);
3661         put_rsb(r);
3662 }
3663
3664 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3665 {
3666         struct dlm_lkb *lkb;
3667         int error;
3668
3669         error = find_lkb(ls, ms->m_remid, &lkb);
3670         if (error) {
3671                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3672                           ms->m_header.h_nodeid, ms->m_remid);
3673                 return;
3674         }
3675
3676         _receive_convert_reply(lkb, ms);
3677         dlm_put_lkb(lkb);
3678 }
3679
3680 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3681 {
3682         struct dlm_rsb *r = lkb->lkb_resource;
3683         int error;
3684
3685         hold_rsb(r);
3686         lock_rsb(r);
3687
3688         error = validate_message(lkb, ms);
3689         if (error)
3690                 goto out;
3691
3692         /* stub reply can happen with waiters_mutex held */
3693         error = remove_from_waiters_ms(lkb, ms);
3694         if (error)
3695                 goto out;
3696
3697         /* this is the value returned from do_unlock() on the master */
3698
3699         switch (ms->m_result) {
3700         case -DLM_EUNLOCK:
3701                 receive_flags_reply(lkb, ms);
3702                 remove_lock_pc(r, lkb);
3703                 queue_cast(r, lkb, -DLM_EUNLOCK);
3704                 break;
3705         case -ENOENT:
3706                 break;
3707         default:
3708                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3709                           lkb->lkb_id, ms->m_result);
3710         }
3711  out:
3712         unlock_rsb(r);
3713         put_rsb(r);
3714 }
3715
3716 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3717 {
3718         struct dlm_lkb *lkb;
3719         int error;
3720
3721         error = find_lkb(ls, ms->m_remid, &lkb);
3722         if (error) {
3723                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3724                           ms->m_header.h_nodeid, ms->m_remid);
3725                 return;
3726         }
3727
3728         _receive_unlock_reply(lkb, ms);
3729         dlm_put_lkb(lkb);
3730 }
3731
3732 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3733 {
3734         struct dlm_rsb *r = lkb->lkb_resource;
3735         int error;
3736
3737         hold_rsb(r);
3738         lock_rsb(r);
3739
3740         error = validate_message(lkb, ms);
3741         if (error)
3742                 goto out;
3743
3744         /* stub reply can happen with waiters_mutex held */
3745         error = remove_from_waiters_ms(lkb, ms);
3746         if (error)
3747                 goto out;
3748
3749         /* this is the value returned from do_cancel() on the master */
3750
3751         switch (ms->m_result) {
3752         case -DLM_ECANCEL:
3753                 receive_flags_reply(lkb, ms);
3754                 revert_lock_pc(r, lkb);
3755                 queue_cast(r, lkb, -DLM_ECANCEL);
3756                 break;
3757         case 0:
3758                 break;
3759         default:
3760                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3761                           lkb->lkb_id, ms->m_result);
3762         }
3763  out:
3764         unlock_rsb(r);
3765         put_rsb(r);
3766 }
3767
3768 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3769 {
3770         struct dlm_lkb *lkb;
3771         int error;
3772
3773         error = find_lkb(ls, ms->m_remid, &lkb);
3774         if (error) {
3775                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3776                           ms->m_header.h_nodeid, ms->m_remid);
3777                 return;
3778         }
3779
3780         _receive_cancel_reply(lkb, ms);
3781         dlm_put_lkb(lkb);
3782 }
3783
3784 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3785 {
3786         struct dlm_lkb *lkb;
3787         struct dlm_rsb *r;
3788         int error, ret_nodeid;
3789
3790         error = find_lkb(ls, ms->m_lkid, &lkb);
3791         if (error) {
3792                 log_error(ls, "receive_lookup_reply no lkb");
3793                 return;
3794         }
3795
3796         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3797            FIXME: will a non-zero error ever be returned? */
3798
3799         r = lkb->lkb_resource;
3800         hold_rsb(r);
3801         lock_rsb(r);
3802
3803         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3804         if (error)
3805                 goto out;
3806
3807         ret_nodeid = ms->m_nodeid;
3808         if (ret_nodeid == dlm_our_nodeid()) {
3809                 r->res_nodeid = 0;
3810                 ret_nodeid = 0;
3811                 r->res_first_lkid = 0;
3812         } else {
3813                 /* set_master() will copy res_nodeid to lkb_nodeid */
3814                 r->res_nodeid = ret_nodeid;
3815         }
3816
3817         if (is_overlap(lkb)) {
3818                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3819                           lkb->lkb_id, lkb->lkb_flags);
3820                 queue_cast_overlap(r, lkb);
3821                 unhold_lkb(lkb); /* undoes create_lkb() */
3822                 goto out_list;
3823         }
3824
3825         _request_lock(r, lkb);
3826
3827  out_list:
3828         if (!ret_nodeid)
3829                 process_lookup_list(r);
3830  out:
3831         unlock_rsb(r);
3832         put_rsb(r);
3833         dlm_put_lkb(lkb);
3834 }
3835
3836 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3837 {
3838         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3839                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3840                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3841                           ms->m_remid, ms->m_result);
3842                 return;
3843         }
3844
3845         switch (ms->m_type) {
3846
3847         /* messages sent to a master node */
3848
3849         case DLM_MSG_REQUEST:
3850                 receive_request(ls, ms);
3851                 break;
3852
3853         case DLM_MSG_CONVERT:
3854                 receive_convert(ls, ms);
3855                 break;
3856
3857         case DLM_MSG_UNLOCK:
3858                 receive_unlock(ls, ms);
3859                 break;
3860
3861         case DLM_MSG_CANCEL:
3862                 receive_cancel(ls, ms);
3863                 break;
3864
3865         /* messages sent from a master node (replies to above) */
3866
3867         case DLM_MSG_REQUEST_REPLY:
3868                 receive_request_reply(ls, ms);
3869                 break;
3870
3871         case DLM_MSG_CONVERT_REPLY:
3872                 receive_convert_reply(ls, ms);
3873                 break;
3874
3875         case DLM_MSG_UNLOCK_REPLY:
3876                 receive_unlock_reply(ls, ms);
3877                 break;
3878
3879         case DLM_MSG_CANCEL_REPLY:
3880                 receive_cancel_reply(ls, ms);
3881                 break;
3882
3883         /* messages sent from a master node (only two types of async msg) */
3884
3885         case DLM_MSG_GRANT:
3886                 receive_grant(ls, ms);
3887                 break;
3888
3889         case DLM_MSG_BAST:
3890                 receive_bast(ls, ms);
3891                 break;
3892
3893         /* messages sent to a dir node */
3894
3895         case DLM_MSG_LOOKUP:
3896                 receive_lookup(ls, ms);
3897                 break;
3898
3899         case DLM_MSG_REMOVE:
3900                 receive_remove(ls, ms);
3901                 break;
3902
3903         /* messages sent from a dir node (remove has no reply) */
3904
3905         case DLM_MSG_LOOKUP_REPLY:
3906                 receive_lookup_reply(ls, ms);
3907                 break;
3908
3909         /* other messages */
3910
3911         case DLM_MSG_PURGE:
3912                 receive_purge(ls, ms);
3913                 break;
3914
3915         default:
3916                 log_error(ls, "unknown message type %d", ms->m_type);
3917         }
3918
3919         dlm_astd_wake();
3920 }
3921
3922 /* If the lockspace is in recovery mode (locking stopped), then normal
3923    messages are saved on the requestqueue for processing after recovery is
3924    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3925    messages off the requestqueue before we process new ones. This occurs right
3926    after recovery completes when we transition from saving all messages on
3927    requestqueue, to processing all the saved messages, to processing new
3928    messages as they arrive. */
3929
3930 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3931                                 int nodeid)
3932 {
3933         if (dlm_locking_stopped(ls)) {
3934                 dlm_add_requestqueue(ls, nodeid, ms);
3935         } else {
3936                 dlm_wait_requestqueue(ls);
3937                 _receive_message(ls, ms);
3938         }
3939 }
3940
3941 /* This is called by dlm_recoverd to process messages that were saved on
3942    the requestqueue. */
3943
3944 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3945 {
3946         _receive_message(ls, ms);
3947 }
3948
3949 /* This is called by the midcomms layer when something is received for
3950    the lockspace.  It could be either a MSG (normal message sent as part of
3951    standard locking activity) or an RCOM (recovery message sent as part of
3952    lockspace recovery). */
3953
3954 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3955 {
3956         struct dlm_header *hd = &p->header;
3957         struct dlm_ls *ls;
3958         int type = 0;
3959
3960         switch (hd->h_cmd) {
3961         case DLM_MSG:
3962                 dlm_message_in(&p->message);
3963                 type = p->message.m_type;
3964                 break;
3965         case DLM_RCOM:
3966                 dlm_rcom_in(&p->rcom);
3967                 type = p->rcom.rc_type;
3968                 break;
3969         default:
3970                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3971                 return;
3972         }
3973
3974         if (hd->h_nodeid != nodeid) {
3975                 log_print("invalid h_nodeid %d from %d lockspace %x",
3976                           hd->h_nodeid, nodeid, hd->h_lockspace);
3977                 return;
3978         }
3979
3980         ls = dlm_find_lockspace_global(hd->h_lockspace);
3981         if (!ls) {
3982                 if (dlm_config.ci_log_debug)
3983                         log_print("invalid lockspace %x from %d cmd %d type %d",
3984                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3985
3986                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3987                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3988                 return;
3989         }
3990
3991         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3992            be inactive (in this ls) before transitioning to recovery mode */
3993
3994         down_read(&ls->ls_recv_active);
3995         if (hd->h_cmd == DLM_MSG)
3996                 dlm_receive_message(ls, &p->message, nodeid);
3997         else
3998                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3999         up_read(&ls->ls_recv_active);
4000
4001         dlm_put_lockspace(ls);
4002 }
4003
4004 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4005 {
4006         if (middle_conversion(lkb)) {
4007                 hold_lkb(lkb);
4008                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4009                 ls->ls_stub_ms.m_result = -EINPROGRESS;
4010                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4011                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4012                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
4013
4014                 /* Same special case as in receive_rcom_lock_args() */
4015                 lkb->lkb_grmode = DLM_LOCK_IV;
4016                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4017                 unhold_lkb(lkb);
4018
4019         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4020                 lkb->lkb_flags |= DLM_IFL_RESEND;
4021         }
4022
4023         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4024            conversions are async; there's no reply from the remote master */
4025 }
4026
4027 /* A waiting lkb needs recovery if the master node has failed, or
4028    the master node is changing (only when no directory is used) */
4029
4030 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4031 {
4032         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4033                 return 1;
4034
4035         if (!dlm_no_directory(ls))
4036                 return 0;
4037
4038         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4039                 return 1;
4040
4041         return 0;
4042 }
4043
4044 /* Recovery for locks that are waiting for replies from nodes that are now
4045    gone.  We can just complete unlocks and cancels by faking a reply from the
4046    dead node.  Requests and up-conversions we flag to be resent after
4047    recovery.  Down-conversions can just be completed with a fake reply like
4048    unlocks.  Conversions between PR and CW need special attention. */
4049
4050 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4051 {
4052         struct dlm_lkb *lkb, *safe;
4053         int wait_type, stub_unlock_result, stub_cancel_result;
4054
4055         mutex_lock(&ls->ls_waiters_mutex);
4056
4057         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4058                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4059                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4060
4061                 /* all outstanding lookups, regardless of destination  will be
4062                    resent after recovery is done */
4063
4064                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4065                         lkb->lkb_flags |= DLM_IFL_RESEND;
4066                         continue;
4067                 }
4068
4069                 if (!waiter_needs_recovery(ls, lkb))
4070                         continue;
4071
4072                 wait_type = lkb->lkb_wait_type;
4073                 stub_unlock_result = -DLM_EUNLOCK;
4074                 stub_cancel_result = -DLM_ECANCEL;
4075
4076                 /* Main reply may have been received leaving a zero wait_type,
4077                    but a reply for the overlapping op may not have been
4078                    received.  In that case we need to fake the appropriate
4079                    reply for the overlap op. */
4080
4081                 if (!wait_type) {
4082                         if (is_overlap_cancel(lkb)) {
4083                                 wait_type = DLM_MSG_CANCEL;
4084                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4085                                         stub_cancel_result = 0;
4086                         }
4087                         if (is_overlap_unlock(lkb)) {
4088                                 wait_type = DLM_MSG_UNLOCK;
4089                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4090                                         stub_unlock_result = -ENOENT;
4091                         }
4092
4093                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4094                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4095                                   stub_cancel_result, stub_unlock_result);
4096                 }
4097
4098                 switch (wait_type) {
4099
4100                 case DLM_MSG_REQUEST:
4101                         lkb->lkb_flags |= DLM_IFL_RESEND;
4102                         break;
4103
4104                 case DLM_MSG_CONVERT:
4105                         recover_convert_waiter(ls, lkb);
4106                         break;
4107
4108                 case DLM_MSG_UNLOCK:
4109                         hold_lkb(lkb);
4110                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4111                         ls->ls_stub_ms.m_result = stub_unlock_result;
4112                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4113                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4114                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4115                         dlm_put_lkb(lkb);
4116                         break;
4117
4118                 case DLM_MSG_CANCEL:
4119                         hold_lkb(lkb);
4120                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4121                         ls->ls_stub_ms.m_result = stub_cancel_result;
4122                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4123                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4124                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4125                         dlm_put_lkb(lkb);
4126                         break;
4127
4128                 default:
4129                         log_error(ls, "invalid lkb wait_type %d %d",
4130                                   lkb->lkb_wait_type, wait_type);
4131                 }
4132                 schedule();
4133         }
4134         mutex_unlock(&ls->ls_waiters_mutex);
4135 }
4136
4137 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4138 {
4139         struct dlm_lkb *lkb;
4140         int found = 0;
4141
4142         mutex_lock(&ls->ls_waiters_mutex);
4143         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4144                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4145                         hold_lkb(lkb);
4146                         found = 1;
4147                         break;
4148                 }
4149         }
4150         mutex_unlock(&ls->ls_waiters_mutex);
4151
4152         if (!found)
4153                 lkb = NULL;
4154         return lkb;
4155 }
4156
4157 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4158    master or dir-node for r.  Processing the lkb may result in it being placed
4159    back on waiters. */
4160
4161 /* We do this after normal locking has been enabled and any saved messages
4162    (in requestqueue) have been processed.  We should be confident that at
4163    this point we won't get or process a reply to any of these waiting
4164    operations.  But, new ops may be coming in on the rsbs/locks here from
4165    userspace or remotely. */
4166
4167 /* there may have been an overlap unlock/cancel prior to recovery or after
4168    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4169    overlap flag would just have been set and nothing new sent.  we can be
4170    confident here than any replies to either the initial op or overlap ops
4171    prior to recovery have been received. */
4172
4173 int dlm_recover_waiters_post(struct dlm_ls *ls)
4174 {
4175         struct dlm_lkb *lkb;
4176         struct dlm_rsb *r;
4177         int error = 0, mstype, err, oc, ou;
4178
4179         while (1) {
4180                 if (dlm_locking_stopped(ls)) {
4181                         log_debug(ls, "recover_waiters_post aborted");
4182                         error = -EINTR;
4183                         break;
4184                 }
4185
4186                 lkb = find_resend_waiter(ls);
4187                 if (!lkb)
4188                         break;
4189
4190                 r = lkb->lkb_resource;
4191                 hold_rsb(r);
4192                 lock_rsb(r);
4193
4194                 mstype = lkb->lkb_wait_type;
4195                 oc = is_overlap_cancel(lkb);
4196                 ou = is_overlap_unlock(lkb);
4197                 err = 0;
4198
4199                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4200                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4201
4202                 /* At this point we assume that we won't get a reply to any
4203                    previous op or overlap op on this lock.  First, do a big
4204                    remove_from_waiters() for all previous ops. */
4205
4206                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4207                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4208                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4209                 lkb->lkb_wait_type = 0;
4210                 lkb->lkb_wait_count = 0;
4211                 mutex_lock(&ls->ls_waiters_mutex);
4212                 list_del_init(&lkb->lkb_wait_reply);
4213                 mutex_unlock(&ls->ls_waiters_mutex);
4214                 unhold_lkb(lkb); /* for waiters list */
4215
4216                 if (oc || ou) {
4217                         /* do an unlock or cancel instead of resending */
4218                         switch (mstype) {
4219                         case DLM_MSG_LOOKUP:
4220                         case DLM_MSG_REQUEST:
4221                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4222                                                         -DLM_ECANCEL);
4223                                 unhold_lkb(lkb); /* undoes create_lkb() */
4224                                 break;
4225                         case DLM_MSG_CONVERT:
4226                                 if (oc) {
4227                                         queue_cast(r, lkb, -DLM_ECANCEL);
4228                                 } else {
4229                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4230                                         _unlock_lock(r, lkb);
4231                                 }
4232                                 break;
4233                         default:
4234                                 err = 1;
4235                         }
4236                 } else {
4237                         switch (mstype) {
4238                         case DLM_MSG_LOOKUP:
4239                         case DLM_MSG_REQUEST:
4240                                 _request_lock(r, lkb);
4241                                 if (is_master(r))
4242                                         confirm_master(r, 0);
4243                                 break;
4244                         case DLM_MSG_CONVERT:
4245                                 _convert_lock(r, lkb);
4246                                 break;
4247                         default:
4248                                 err = 1;
4249                         }
4250                 }
4251
4252                 if (err)
4253                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4254                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4255                 unlock_rsb(r);
4256                 put_rsb(r);
4257                 dlm_put_lkb(lkb);
4258         }
4259
4260         return error;
4261 }
4262
4263 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4264                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4265 {
4266         struct dlm_ls *ls = r->res_ls;
4267         struct dlm_lkb *lkb, *safe;
4268
4269         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4270                 if (test(ls, lkb)) {
4271                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4272                         del_lkb(r, lkb);
4273                         /* this put should free the lkb */
4274                         if (!dlm_put_lkb(lkb))
4275                                 log_error(ls, "purged lkb not released");
4276                 }
4277         }
4278 }
4279
4280 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4281 {
4282         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4283 }
4284
4285 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4286 {
4287         return is_master_copy(lkb);
4288 }
4289
4290 static void purge_dead_locks(struct dlm_rsb *r)
4291 {
4292         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4293         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4294         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4295 }
4296
4297 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4298 {
4299         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4300         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4301         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4302 }
4303
4304 /* Get rid of locks held by nodes that are gone. */
4305
4306 int dlm_purge_locks(struct dlm_ls *ls)
4307 {
4308         struct dlm_rsb *r;
4309
4310         log_debug(ls, "dlm_purge_locks");
4311
4312         down_write(&ls->ls_root_sem);
4313         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4314                 hold_rsb(r);
4315                 lock_rsb(r);
4316                 if (is_master(r))
4317                         purge_dead_locks(r);
4318                 unlock_rsb(r);
4319                 unhold_rsb(r);
4320
4321                 schedule();
4322         }
4323         up_write(&ls->ls_root_sem);
4324
4325         return 0;
4326 }
4327
4328 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4329 {
4330         struct dlm_rsb *r, *r_ret = NULL;
4331
4332         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4333         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4334                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4335                         continue;
4336                 hold_rsb(r);
4337                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4338                 r_ret = r;
4339                 break;
4340         }
4341         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4342         return r_ret;
4343 }
4344
4345 void dlm_grant_after_purge(struct dlm_ls *ls)
4346 {
4347         struct dlm_rsb *r;
4348         int bucket = 0;
4349
4350         while (1) {
4351                 r = find_purged_rsb(ls, bucket);
4352                 if (!r) {
4353                         if (bucket == ls->ls_rsbtbl_size - 1)
4354                                 break;
4355                         bucket++;
4356                         continue;
4357                 }
4358                 lock_rsb(r);
4359                 if (is_master(r)) {
4360                         grant_pending_locks(r);
4361                         confirm_master(r, 0);
4362                 }
4363                 unlock_rsb(r);
4364                 put_rsb(r);
4365                 schedule();
4366         }
4367 }
4368
4369 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4370                                          uint32_t remid)
4371 {
4372         struct dlm_lkb *lkb;
4373
4374         list_for_each_entry(lkb, head, lkb_statequeue) {
4375                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4376                         return lkb;
4377         }
4378         return NULL;
4379 }
4380
4381 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4382                                     uint32_t remid)
4383 {
4384         struct dlm_lkb *lkb;
4385
4386         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4387         if (lkb)
4388                 return lkb;
4389         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4390         if (lkb)
4391                 return lkb;
4392         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4393         if (lkb)
4394                 return lkb;
4395         return NULL;
4396 }
4397
4398 /* needs at least dlm_rcom + rcom_lock */
4399 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4400                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4401 {
4402         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4403
4404         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4405         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4406         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4407         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4408         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4409         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4410         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4411         lkb->lkb_rqmode = rl->rl_rqmode;
4412         lkb->lkb_grmode = rl->rl_grmode;
4413         /* don't set lkb_status because add_lkb wants to itself */
4414
4415         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4416         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4417
4418         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4419                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4420                          sizeof(struct rcom_lock);
4421                 if (lvblen > ls->ls_lvblen)
4422                         return -EINVAL;
4423                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4424                 if (!lkb->lkb_lvbptr)
4425                         return -ENOMEM;
4426                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4427         }
4428
4429         /* Conversions between PR and CW (middle modes) need special handling.
4430            The real granted mode of these converting locks cannot be determined
4431            until all locks have been rebuilt on the rsb (recover_conversion) */
4432
4433         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4434             middle_conversion(lkb)) {
4435                 rl->rl_status = DLM_LKSTS_CONVERT;
4436                 lkb->lkb_grmode = DLM_LOCK_IV;
4437                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4438         }
4439
4440         return 0;
4441 }
4442
4443 /* This lkb may have been recovered in a previous aborted recovery so we need
4444    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4445    If so we just send back a standard reply.  If not, we create a new lkb with
4446    the given values and send back our lkid.  We send back our lkid by sending
4447    back the rcom_lock struct we got but with the remid field filled in. */
4448
4449 /* needs at least dlm_rcom + rcom_lock */
4450 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4451 {
4452         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4453         struct dlm_rsb *r;
4454         struct dlm_lkb *lkb;
4455         int error;
4456
4457         if (rl->rl_parent_lkid) {
4458                 error = -EOPNOTSUPP;
4459                 goto out;
4460         }
4461
4462         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4463                          R_MASTER, &r);
4464         if (error)
4465                 goto out;
4466
4467         lock_rsb(r);
4468
4469         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4470         if (lkb) {
4471                 error = -EEXIST;
4472                 goto out_remid;
4473         }
4474
4475         error = create_lkb(ls, &lkb);
4476         if (error)
4477                 goto out_unlock;
4478
4479         error = receive_rcom_lock_args(ls, lkb, r, rc);
4480         if (error) {
4481                 __put_lkb(ls, lkb);
4482                 goto out_unlock;
4483         }
4484
4485         attach_lkb(r, lkb);
4486         add_lkb(r, lkb, rl->rl_status);
4487         error = 0;
4488
4489  out_remid:
4490         /* this is the new value returned to the lock holder for
4491            saving in its process-copy lkb */
4492         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4493
4494  out_unlock:
4495         unlock_rsb(r);
4496         put_rsb(r);
4497  out:
4498         if (error)
4499                 log_debug(ls, "recover_master_copy %d %x", error,
4500                           le32_to_cpu(rl->rl_lkid));
4501         rl->rl_result = cpu_to_le32(error);
4502         return error;
4503 }
4504
4505 /* needs at least dlm_rcom + rcom_lock */
4506 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4507 {
4508         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4509         struct dlm_rsb *r;
4510         struct dlm_lkb *lkb;
4511         int error;
4512
4513         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4514         if (error) {
4515                 log_error(ls, "recover_process_copy no lkid %x",
4516                                 le32_to_cpu(rl->rl_lkid));
4517                 return error;
4518         }
4519
4520         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4521
4522         error = le32_to_cpu(rl->rl_result);
4523
4524         r = lkb->lkb_resource;
4525         hold_rsb(r);
4526         lock_rsb(r);
4527
4528         switch (error) {
4529         case -EBADR:
4530                 /* There's a chance the new master received our lock before
4531                    dlm_recover_master_reply(), this wouldn't happen if we did
4532                    a barrier between recover_masters and recover_locks. */
4533                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4534                           (unsigned long)r, r->res_name);
4535                 dlm_send_rcom_lock(r, lkb);
4536                 goto out;
4537         case -EEXIST:
4538                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4539                 /* fall through */
4540         case 0:
4541                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4542                 break;
4543         default:
4544                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4545                           error, lkb->lkb_id);
4546         }
4547
4548         /* an ack for dlm_recover_locks() which waits for replies from
4549            all the locks it sends to new masters */
4550         dlm_recovered_lock(r);
4551  out:
4552         unlock_rsb(r);
4553         put_rsb(r);
4554         dlm_put_lkb(lkb);
4555
4556         return 0;
4557 }
4558
4559 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4560                      int mode, uint32_t flags, void *name, unsigned int namelen,
4561                      unsigned long timeout_cs)
4562 {
4563         struct dlm_lkb *lkb;
4564         struct dlm_args args;
4565         int error;
4566
4567         dlm_lock_recovery(ls);
4568
4569         error = create_lkb(ls, &lkb);
4570         if (error) {
4571                 kfree(ua);
4572                 goto out;
4573         }
4574
4575         if (flags & DLM_LKF_VALBLK) {
4576                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4577                 if (!ua->lksb.sb_lvbptr) {
4578                         kfree(ua);
4579                         __put_lkb(ls, lkb);
4580                         error = -ENOMEM;
4581                         goto out;
4582                 }
4583         }
4584
4585         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4586            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4587            lock and that lkb_astparam is the dlm_user_args structure. */
4588
4589         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4590                               fake_astfn, ua, fake_bastfn, &args);
4591         lkb->lkb_flags |= DLM_IFL_USER;
4592         ua->old_mode = DLM_LOCK_IV;
4593
4594         if (error) {
4595                 __put_lkb(ls, lkb);
4596                 goto out;
4597         }
4598
4599         error = request_lock(ls, lkb, name, namelen, &args);
4600
4601         switch (error) {
4602         case 0:
4603                 break;
4604         case -EINPROGRESS:
4605                 error = 0;
4606                 break;
4607         case -EAGAIN:
4608                 error = 0;
4609                 /* fall through */
4610         default:
4611                 __put_lkb(ls, lkb);
4612                 goto out;
4613         }
4614
4615         /* add this new lkb to the per-process list of locks */
4616         spin_lock(&ua->proc->locks_spin);
4617         hold_lkb(lkb);
4618         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4619         spin_unlock(&ua->proc->locks_spin);
4620  out:
4621         dlm_unlock_recovery(ls);
4622         return error;
4623 }
4624
4625 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4626                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4627                      unsigned long timeout_cs)
4628 {
4629         struct dlm_lkb *lkb;
4630         struct dlm_args args;
4631         struct dlm_user_args *ua;
4632         int error;
4633
4634         dlm_lock_recovery(ls);
4635
4636         error = find_lkb(ls, lkid, &lkb);
4637         if (error)
4638                 goto out;
4639
4640         /* user can change the params on its lock when it converts it, or
4641            add an lvb that didn't exist before */
4642
4643         ua = lkb->lkb_ua;
4644
4645         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4646                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4647                 if (!ua->lksb.sb_lvbptr) {
4648                         error = -ENOMEM;
4649                         goto out_put;
4650                 }
4651         }
4652         if (lvb_in && ua->lksb.sb_lvbptr)
4653                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4654
4655         ua->xid = ua_tmp->xid;
4656         ua->castparam = ua_tmp->castparam;
4657         ua->castaddr = ua_tmp->castaddr;
4658         ua->bastparam = ua_tmp->bastparam;
4659         ua->bastaddr = ua_tmp->bastaddr;
4660         ua->user_lksb = ua_tmp->user_lksb;
4661         ua->old_mode = lkb->lkb_grmode;
4662
4663         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4664                               fake_astfn, ua, fake_bastfn, &args);
4665         if (error)
4666                 goto out_put;
4667
4668         error = convert_lock(ls, lkb, &args);
4669
4670         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4671                 error = 0;
4672  out_put:
4673         dlm_put_lkb(lkb);
4674  out:
4675         dlm_unlock_recovery(ls);
4676         kfree(ua_tmp);
4677         return error;
4678 }
4679
4680 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4681                     uint32_t flags, uint32_t lkid, char *lvb_in)
4682 {
4683         struct dlm_lkb *lkb;
4684         struct dlm_args args;
4685         struct dlm_user_args *ua;
4686         int error;
4687
4688         dlm_lock_recovery(ls);
4689
4690         error = find_lkb(ls, lkid, &lkb);
4691         if (error)
4692                 goto out;
4693
4694         ua = lkb->lkb_ua;
4695
4696         if (lvb_in && ua->lksb.sb_lvbptr)
4697                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4698         if (ua_tmp->castparam)
4699                 ua->castparam = ua_tmp->castparam;
4700         ua->user_lksb = ua_tmp->user_lksb;
4701
4702         error = set_unlock_args(flags, ua, &args);
4703         if (error)
4704                 goto out_put;
4705
4706         error = unlock_lock(ls, lkb, &args);
4707
4708         if (error == -DLM_EUNLOCK)
4709                 error = 0;
4710         /* from validate_unlock_args() */
4711         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4712                 error = 0;
4713         if (error)
4714                 goto out_put;
4715
4716         spin_lock(&ua->proc->locks_spin);
4717         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4718         if (!list_empty(&lkb->lkb_ownqueue))
4719                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4720         spin_unlock(&ua->proc->locks_spin);
4721  out_put:
4722         dlm_put_lkb(lkb);
4723  out:
4724         dlm_unlock_recovery(ls);
4725         kfree(ua_tmp);
4726         return error;
4727 }
4728
4729 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4730                     uint32_t flags, uint32_t lkid)
4731 {
4732         struct dlm_lkb *lkb;
4733         struct dlm_args args;
4734         struct dlm_user_args *ua;
4735         int error;
4736
4737         dlm_lock_recovery(ls);
4738
4739         error = find_lkb(ls, lkid, &lkb);
4740         if (error)
4741                 goto out;
4742
4743         ua = lkb->lkb_ua;
4744         if (ua_tmp->castparam)
4745                 ua->castparam = ua_tmp->castparam;
4746         ua->user_lksb = ua_tmp->user_lksb;
4747
4748         error = set_unlock_args(flags, ua, &args);
4749         if (error)
4750                 goto out_put;
4751
4752         error = cancel_lock(ls, lkb, &args);
4753
4754         if (error == -DLM_ECANCEL)
4755                 error = 0;
4756         /* from validate_unlock_args() */
4757         if (error == -EBUSY)
4758                 error = 0;
4759  out_put:
4760         dlm_put_lkb(lkb);
4761  out:
4762         dlm_unlock_recovery(ls);
4763         kfree(ua_tmp);
4764         return error;
4765 }
4766
4767 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4768 {
4769         struct dlm_lkb *lkb;
4770         struct dlm_args args;
4771         struct dlm_user_args *ua;
4772         struct dlm_rsb *r;
4773         int error;
4774
4775         dlm_lock_recovery(ls);
4776
4777         error = find_lkb(ls, lkid, &lkb);
4778         if (error)
4779                 goto out;
4780
4781         ua = lkb->lkb_ua;
4782
4783         error = set_unlock_args(flags, ua, &args);
4784         if (error)
4785                 goto out_put;
4786
4787         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4788
4789         r = lkb->lkb_resource;
4790         hold_rsb(r);
4791         lock_rsb(r);
4792
4793         error = validate_unlock_args(lkb, &args);
4794         if (error)
4795                 goto out_r;
4796         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4797
4798         error = _cancel_lock(r, lkb);
4799  out_r:
4800         unlock_rsb(r);
4801         put_rsb(r);
4802
4803         if (error == -DLM_ECANCEL)
4804                 error = 0;
4805         /* from validate_unlock_args() */
4806         if (error == -EBUSY)
4807                 error = 0;
4808  out_put:
4809         dlm_put_lkb(lkb);
4810  out:
4811         dlm_unlock_recovery(ls);
4812         return error;
4813 }
4814
4815 /* lkb's that are removed from the waiters list by revert are just left on the
4816    orphans list with the granted orphan locks, to be freed by purge */
4817
4818 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4819 {
4820         struct dlm_args args;
4821         int error;
4822
4823         hold_lkb(lkb);
4824         mutex_lock(&ls->ls_orphans_mutex);
4825         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4826         mutex_unlock(&ls->ls_orphans_mutex);
4827
4828         set_unlock_args(0, lkb->lkb_ua, &args);
4829
4830         error = cancel_lock(ls, lkb, &args);
4831         if (error == -DLM_ECANCEL)
4832                 error = 0;
4833         return error;
4834 }
4835
4836 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4837    Regardless of what rsb queue the lock is on, it's removed and freed. */
4838
4839 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4840 {
4841         struct dlm_args args;
4842         int error;
4843
4844         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4845
4846         error = unlock_lock(ls, lkb, &args);
4847         if (error == -DLM_EUNLOCK)
4848                 error = 0;
4849         return error;
4850 }
4851
4852 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4853    (which does lock_rsb) due to deadlock with receiving a message that does
4854    lock_rsb followed by dlm_user_add_ast() */
4855
4856 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4857                                      struct dlm_user_proc *proc)
4858 {
4859         struct dlm_lkb *lkb = NULL;
4860
4861         mutex_lock(&ls->ls_clear_proc_locks);
4862         if (list_empty(&proc->locks))
4863                 goto out;
4864
4865         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4866         list_del_init(&lkb->lkb_ownqueue);
4867
4868         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4869                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4870         else
4871                 lkb->lkb_flags |= DLM_IFL_DEAD;
4872  out:
4873         mutex_unlock(&ls->ls_clear_proc_locks);
4874         return lkb;
4875 }
4876
4877 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4878    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4879    which we clear here. */
4880
4881 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4882    list, and no more device_writes should add lkb's to proc->locks list; so we
4883    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4884    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4885    them ourself. */
4886
4887 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4888 {
4889         struct dlm_lkb *lkb, *safe;
4890
4891         dlm_lock_recovery(ls);
4892
4893         while (1) {
4894                 lkb = del_proc_lock(ls, proc);
4895                 if (!lkb)
4896                         break;
4897                 del_timeout(lkb);
4898                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4899                         orphan_proc_lock(ls, lkb);
4900                 else
4901                         unlock_proc_lock(ls, lkb);
4902
4903                 /* this removes the reference for the proc->locks list
4904                    added by dlm_user_request, it may result in the lkb
4905                    being freed */
4906
4907                 dlm_put_lkb(lkb);
4908         }
4909
4910         mutex_lock(&ls->ls_clear_proc_locks);
4911
4912         /* in-progress unlocks */
4913         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4914                 list_del_init(&lkb->lkb_ownqueue);
4915                 lkb->lkb_flags |= DLM_IFL_DEAD;
4916                 dlm_put_lkb(lkb);
4917         }
4918
4919         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4920                 lkb->lkb_ast_type = 0;
4921                 list_del(&lkb->lkb_astqueue);
4922                 dlm_put_lkb(lkb);
4923         }
4924
4925         mutex_unlock(&ls->ls_clear_proc_locks);
4926         dlm_unlock_recovery(ls);
4927 }
4928
4929 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4930 {
4931         struct dlm_lkb *lkb, *safe;
4932
4933         while (1) {
4934                 lkb = NULL;
4935                 spin_lock(&proc->locks_spin);
4936                 if (!list_empty(&proc->locks)) {
4937                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4938                                          lkb_ownqueue);
4939                         list_del_init(&lkb->lkb_ownqueue);
4940                 }
4941                 spin_unlock(&proc->locks_spin);
4942
4943                 if (!lkb)
4944                         break;
4945
4946                 lkb->lkb_flags |= DLM_IFL_DEAD;
4947                 unlock_proc_lock(ls, lkb);
4948                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4949         }
4950
4951         spin_lock(&proc->locks_spin);
4952         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4953                 list_del_init(&lkb->lkb_ownqueue);
4954                 lkb->lkb_flags |= DLM_IFL_DEAD;
4955                 dlm_put_lkb(lkb);
4956         }
4957         spin_unlock(&proc->locks_spin);
4958
4959         spin_lock(&proc->asts_spin);
4960         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4961                 list_del(&lkb->lkb_astqueue);
4962                 dlm_put_lkb(lkb);
4963         }
4964         spin_unlock(&proc->asts_spin);
4965 }
4966
4967 /* pid of 0 means purge all orphans */
4968
4969 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4970 {
4971         struct dlm_lkb *lkb, *safe;
4972
4973         mutex_lock(&ls->ls_orphans_mutex);
4974         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4975                 if (pid && lkb->lkb_ownpid != pid)
4976                         continue;
4977                 unlock_proc_lock(ls, lkb);
4978                 list_del_init(&lkb->lkb_ownqueue);
4979                 dlm_put_lkb(lkb);
4980         }
4981         mutex_unlock(&ls->ls_orphans_mutex);
4982 }
4983
4984 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4985 {
4986         struct dlm_message *ms;
4987         struct dlm_mhandle *mh;
4988         int error;
4989
4990         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4991                                 DLM_MSG_PURGE, &ms, &mh);
4992         if (error)
4993                 return error;
4994         ms->m_nodeid = nodeid;
4995         ms->m_pid = pid;
4996
4997         return send_message(mh, ms);
4998 }
4999
5000 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5001                    int nodeid, int pid)
5002 {
5003         int error = 0;
5004
5005         if (nodeid != dlm_our_nodeid()) {
5006                 error = send_purge(ls, nodeid, pid);
5007         } else {
5008                 dlm_lock_recovery(ls);
5009                 if (pid == current->pid)
5010                         purge_proc_locks(ls, proc);
5011                 else
5012                         do_purge(ls, nodeid, pid);
5013                 dlm_unlock_recovery(ls);
5014         }
5015         return error;
5016 }
5017