/*
   drbd.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
   from Logicworks, Inc. for making SDP replication support possible.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/version.h>
#include <linux/drbd.h>
#include <asm/uaccess.h>
#include <asm/types.h>
#include <net/sock.h>
#include <linux/ctype.h>
#include <linux/smp_lock.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/proc_fs.h>
#include <linux/init.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/reboot.h>
#include <linux/notifier.h>
#include <linux/kthread.h>

#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>

#include <linux/drbd_limits.h>
#include "drbd_int.h"
#include "drbd_tracing.h"
#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */

#include "drbd_vli.h"

struct after_state_chg_work {
        struct drbd_work w;
        union drbd_state os;
        union drbd_state ns;
        enum chg_state_flags flags;
        struct completion *done;
};

int drbdd_init(struct drbd_thread *);
int drbd_worker(struct drbd_thread *);
int drbd_asender(struct drbd_thread *);

int drbd_init(void);
static int drbd_open(struct block_device *bdev, fmode_t mode);
static int drbd_release(struct gendisk *gd, fmode_t mode);
static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                           union drbd_state ns, enum chg_state_flags flags);
static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
static void md_sync_timer_fn(unsigned long data);
static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);

DEFINE_TRACE(drbd_unplug);
DEFINE_TRACE(drbd_uuid);
DEFINE_TRACE(drbd_ee);
DEFINE_TRACE(drbd_packet);
DEFINE_TRACE(drbd_md_io);
DEFINE_TRACE(drbd_epoch);
DEFINE_TRACE(drbd_netlink);
DEFINE_TRACE(drbd_actlog);
DEFINE_TRACE(drbd_bio);
DEFINE_TRACE(_drbd_resync);
DEFINE_TRACE(drbd_req);

MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
              "Lars Ellenberg <lars@linbit.com>");
MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
MODULE_VERSION(REL_VERSION);
MODULE_LICENSE("GPL");
MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);

#include <linux/moduleparam.h>
/* allow_open_on_secondary */
MODULE_PARM_DESC(allow_oos, "DONT USE!");
/* thanks to these macros, if compiled into the kernel (not as a module),
 * this becomes the boot parameter drbd.minor_count */
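/* illustrative example: booting with drbd.minor_count=8 caps the number of drbd devices at 8 */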
module_param(minor_count, uint, 0444);
module_param(disable_sendpage, bool, 0644);
module_param(allow_oos, bool, 0);
module_param(cn_idx, uint, 0444);
module_param(proc_details, int, 0644);

#ifdef CONFIG_DRBD_FAULT_INJECTION
int enable_faults;
int fault_rate;
static int fault_count;
int fault_devs;
/* bitmap of enabled faults */
module_param(enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
module_param(fault_rate, int, 0664);
/* count of faults inserted */
module_param(fault_count, int, 0664);
/* bitmap of devices to insert faults on */
module_param(fault_devs, int, 0644);
#endif

/* module parameter, defined */
unsigned int minor_count = 32;
int disable_sendpage;
int allow_oos;
unsigned int cn_idx = CN_IDX_DRBD;
int proc_details;       /* Detail level in /proc/drbd */

/* Module parameter for setting the user mode helper program
 * to run. Default is /sbin/drbdadm */
char usermode_helper[80] = "/sbin/drbdadm";

module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);

/* in 2.6.x, our device mapping and config info contains our virtual gendisks
 * as member "struct gendisk *vdisk;"
 */
struct drbd_conf **minor_table;

struct kmem_cache *drbd_request_cache;
struct kmem_cache *drbd_ee_cache;       /* epoch entries */
struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
mempool_t *drbd_request_mempool;
mempool_t *drbd_ee_mempool;

/* I do not use a standard mempool, because:
   1) I want to hand out the pre-allocated objects first.
   2) I want to be able to interrupt sleeping allocation with a signal.
   Note: This is a singly linked list, the next pointer is the private
         member of struct page.
 */
struct page *drbd_pp_pool;
spinlock_t   drbd_pp_lock;
int          drbd_pp_vacant;
wait_queue_head_t drbd_pp_wait;

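/* allow bursts of at most 5 rate-limited messages per 5*HZ jiffies (5 seconds) */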
DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);

static struct block_device_operations drbd_ops = {
        .owner =   THIS_MODULE,
        .open =    drbd_open,
        .release = drbd_release,
};

#define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))

#ifdef __CHECKER__
/* When checking with sparse, and this is an inline function, sparse will
   give tons of false positives. When this is a real function, sparse works.
 */
int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
{
        int io_allowed;

        atomic_inc(&mdev->local_cnt);
        io_allowed = (mdev->state.disk >= mins);
        if (!io_allowed) {
                if (atomic_dec_and_test(&mdev->local_cnt))
                        wake_up(&mdev->misc_wait);
        }
        return io_allowed;
}

#endif

/**
 * DOC: The transfer log
 *
 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
 * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
 * of the list. There is always at least one &struct drbd_tl_epoch object.
 *
 * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
 * attached.
 */
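/* Roughly (epochs are chained via ->next, each carries its own request list):
 *
 *   mdev->oldest_tle                                  mdev->newest_tle
 *        |                                                  |
 *        v                                                  v
 *     [epoch] --next--> [epoch] --next--> ... --next--> [epoch] --next--> NULL
 *        |                 |                               |
 *     requests          requests                        requests
 */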
static int tl_init(struct drbd_conf *mdev)
{
        struct drbd_tl_epoch *b;

        /* during device minor initialization, we may well use GFP_KERNEL */
        b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
        if (!b)
                return 0;
        INIT_LIST_HEAD(&b->requests);
        INIT_LIST_HEAD(&b->w.list);
        b->next = NULL;
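        /* arbitrary non-zero initial barrier number; 0 is special-cased, see _tl_add_barrier() */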
        b->br_number = 4711;
        b->n_req = 0;
        b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */

        mdev->oldest_tle = b;
        mdev->newest_tle = b;
        INIT_LIST_HEAD(&mdev->out_of_sequence_requests);

        mdev->tl_hash = NULL;
        mdev->tl_hash_s = 0;

        return 1;
}

static void tl_cleanup(struct drbd_conf *mdev)
{
        D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
        D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
        kfree(mdev->oldest_tle);
        mdev->oldest_tle = NULL;
        kfree(mdev->unused_spare_tle);
        mdev->unused_spare_tle = NULL;
        kfree(mdev->tl_hash);
        mdev->tl_hash = NULL;
        mdev->tl_hash_s = 0;
}

/**
 * _tl_add_barrier() - Adds a barrier to the transfer log
 * @mdev:       DRBD device.
 * @new:        Barrier to be added before the current head of the TL.
 *
 * The caller must hold the req_lock.
 */
void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
{
        struct drbd_tl_epoch *newest_before;

        INIT_LIST_HEAD(&new->requests);
        INIT_LIST_HEAD(&new->w.list);
        new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
        new->next = NULL;
        new->n_req = 0;

        newest_before = mdev->newest_tle;
        /* never send a barrier number == 0, because that is special-cased
         * when using TCQ for our write ordering code */
        new->br_number = (newest_before->br_number+1) ?: 1;
        if (mdev->newest_tle != new) {
                mdev->newest_tle->next = new;
                mdev->newest_tle = new;
        }
}

/**
 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
 * @mdev:       DRBD device.
 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
 * @set_size:   Expected number of requests before that barrier.
 *
 * In case the passed barrier_nr or set_size does not match the oldest
 * &struct drbd_tl_epoch object, this function will cause a termination
 * of the connection.
 */
void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
                       unsigned int set_size)
{
        struct drbd_tl_epoch *b, *nob; /* next old barrier */
        struct list_head *le, *tle;
        struct drbd_request *r;

        spin_lock_irq(&mdev->req_lock);

        b = mdev->oldest_tle;

        /* first some paranoia code */
        if (b == NULL) {
                dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
                        barrier_nr);
                goto bail;
        }
        if (b->br_number != barrier_nr) {
                dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
                        barrier_nr, b->br_number);
                goto bail;
        }
        if (b->n_req != set_size) {
                dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
                        barrier_nr, set_size, b->n_req);
                goto bail;
        }

        /* Clean up list of requests processed during current epoch */
        list_for_each_safe(le, tle, &b->requests) {
                r = list_entry(le, struct drbd_request, tl_requests);
                _req_mod(r, barrier_acked);
        }
        /* There could be requests on the list waiting for completion
           of the write to the local disk. To avoid corruption of the
           slab's data structures we have to remove the list's head.

           Also there could have been a barrier ack out of sequence, overtaking
           the write acks - which would be a bug and violate write ordering.
           To not deadlock in case we lose connection while such requests are
           still pending, we need some way to find them for the
           _req_mod(connection_lost_while_pending).

           These have been list_move'd to the out_of_sequence_requests list in
           _req_mod(, barrier_acked) above.
           */
        list_del_init(&b->requests);

        nob = b->next;
        if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
                _tl_add_barrier(mdev, b);
                if (nob)
                        mdev->oldest_tle = nob;
                /* if nob == NULL b was the only barrier, and becomes the new
                   barrier. Therefore mdev->oldest_tle points already to b */
        } else {
                D_ASSERT(nob != NULL);
                mdev->oldest_tle = nob;
                kfree(b);
        }

        spin_unlock_irq(&mdev->req_lock);
        dec_ap_pending(mdev);

        return;

bail:
        spin_unlock_irq(&mdev->req_lock);
        drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
}


/**
 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
 * @mdev:       DRBD device.
 *
 * This is called after the connection to the peer was lost. The storage covered
 * by the requests on the transfer log gets marked as out of sync. Called from the
 * receiver thread and the worker thread.
 */
void tl_clear(struct drbd_conf *mdev)
{
        struct drbd_tl_epoch *b, *tmp;
        struct list_head *le, *tle;
        struct drbd_request *r;
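        /* pick a fresh random barrier number for the epoch object that gets recycled below */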
        int new_initial_bnr = net_random();

        spin_lock_irq(&mdev->req_lock);

        b = mdev->oldest_tle;
        while (b) {
                list_for_each_safe(le, tle, &b->requests) {
                        r = list_entry(le, struct drbd_request, tl_requests);
                        /* It would be nice to complete outside of spinlock.
                         * But this is easier for now. */
                        _req_mod(r, connection_lost_while_pending);
                }
                tmp = b->next;

                /* there could still be requests on that ring list,
                 * in case local io is still pending */
                list_del(&b->requests);

                /* dec_ap_pending corresponding to queue_barrier.
                 * the newest barrier may not have been queued yet,
                 * in which case w.cb is still NULL. */
                if (b->w.cb != NULL)
                        dec_ap_pending(mdev);

                if (b == mdev->newest_tle) {
                        /* recycle, but reinit! */
                        D_ASSERT(tmp == NULL);
                        INIT_LIST_HEAD(&b->requests);
                        INIT_LIST_HEAD(&b->w.list);
                        b->w.cb = NULL;
                        b->br_number = new_initial_bnr;
                        b->n_req = 0;

                        mdev->oldest_tle = b;
                        break;
                }
                kfree(b);
                b = tmp;
        }

        /* we expect this list to be empty. */
        D_ASSERT(list_empty(&mdev->out_of_sequence_requests));

        /* but just in case, clean it up anyways! */
        list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
                r = list_entry(le, struct drbd_request, tl_requests);
                /* It would be nice to complete outside of spinlock.
                 * But this is easier for now. */
                _req_mod(r, connection_lost_while_pending);
        }

        /* ensure bit indicating barrier is required is clear */
        clear_bit(CREATE_BARRIER, &mdev->flags);

        spin_unlock_irq(&mdev->req_lock);
}

/**
 * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
 * @mdev:       DRBD device.
 * @os:         old (current) state.
 * @ns:         new (wanted) state.
 */
static int cl_wide_st_chg(struct drbd_conf *mdev,
                          union drbd_state os, union drbd_state ns)
{
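        /* cluster-wide: becoming Primary, starting a resync (either side),
         * detaching the local disk, disconnecting from a connected peer,
         * or starting online verify; everything else is decided locally */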
        return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
                 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
                  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
                  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
                  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
                (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
                (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
}

int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
                      union drbd_state mask, union drbd_state val)
{
        unsigned long flags;
        union drbd_state os, ns;
        int rv;

        spin_lock_irqsave(&mdev->req_lock, flags);
        os = mdev->state;
        ns.i = (os.i & ~mask.i) | val.i;
        rv = _drbd_set_state(mdev, ns, f, NULL);
        ns = mdev->state;
        spin_unlock_irqrestore(&mdev->req_lock, flags);

        return rv;
}

/**
 * drbd_force_state() - Impose a change which happens outside our control on our state
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 */
void drbd_force_state(struct drbd_conf *mdev,
        union drbd_state mask, union drbd_state val)
{
        drbd_change_state(mdev, CS_HARD, mask, val);
}

static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
static int is_valid_state_transition(struct drbd_conf *,
                                     union drbd_state, union drbd_state);
static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
                                       union drbd_state ns, int *warn_sync_abort);
int drbd_send_state_req(struct drbd_conf *,
                        union drbd_state, union drbd_state);

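/* wait_event() condition for drbd_req_state(): returns 0 (keep waiting) while the
 * cluster-wide change is still undecided, a nonzero SS_* code once it is settled. */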
static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
                                    union drbd_state mask, union drbd_state val)
{
        union drbd_state os, ns;
        unsigned long flags;
        int rv;

        if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
                return SS_CW_SUCCESS;

        if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
                return SS_CW_FAILED_BY_PEER;

        rv = 0;
        spin_lock_irqsave(&mdev->req_lock, flags);
        os = mdev->state;
        ns.i = (os.i & ~mask.i) | val.i;
        ns = sanitize_state(mdev, os, ns, NULL);

        if (!cl_wide_st_chg(mdev, os, ns))
                rv = SS_CW_NO_NEED;
        if (!rv) {
                rv = is_valid_state(mdev, ns);
                if (rv == SS_SUCCESS) {
                        rv = is_valid_state_transition(mdev, ns, os);
                        if (rv == SS_SUCCESS)
                                rv = 0; /* cont waiting, otherwise fail. */
                }
        }
        spin_unlock_irqrestore(&mdev->req_lock, flags);

        return rv;
}

/**
 * drbd_req_state() - Perform a possibly cluster-wide state change
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 * @f:          flags
 *
 * Should not be called directly, use drbd_request_state() or
 * _drbd_request_state().
 */
static int drbd_req_state(struct drbd_conf *mdev,
                          union drbd_state mask, union drbd_state val,
                          enum chg_state_flags f)
{
        struct completion done;
        unsigned long flags;
        union drbd_state os, ns;
        int rv;

        init_completion(&done);

        if (f & CS_SERIALIZE)
                mutex_lock(&mdev->state_mutex);

        spin_lock_irqsave(&mdev->req_lock, flags);
        os = mdev->state;
        ns.i = (os.i & ~mask.i) | val.i;
        ns = sanitize_state(mdev, os, ns, NULL);

        if (cl_wide_st_chg(mdev, os, ns)) {
                rv = is_valid_state(mdev, ns);
                if (rv == SS_SUCCESS)
                        rv = is_valid_state_transition(mdev, ns, os);
                spin_unlock_irqrestore(&mdev->req_lock, flags);

                if (rv < SS_SUCCESS) {
                        if (f & CS_VERBOSE)
                                print_st_err(mdev, os, ns, rv);
                        goto abort;
                }

                drbd_state_lock(mdev);
                if (!drbd_send_state_req(mdev, mask, val)) {
                        drbd_state_unlock(mdev);
                        rv = SS_CW_FAILED_BY_PEER;
                        if (f & CS_VERBOSE)
                                print_st_err(mdev, os, ns, rv);
                        goto abort;
                }

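                /* wait for the peer's reply; _req_st_cond() returns 0
                 * for as long as the cluster-wide request is undecided */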
                wait_event(mdev->state_wait,
                        (rv = _req_st_cond(mdev, mask, val)));

                if (rv < SS_SUCCESS) {
                        drbd_state_unlock(mdev);
                        if (f & CS_VERBOSE)
                                print_st_err(mdev, os, ns, rv);
                        goto abort;
                }
                spin_lock_irqsave(&mdev->req_lock, flags);
                os = mdev->state;
                ns.i = (os.i & ~mask.i) | val.i;
                rv = _drbd_set_state(mdev, ns, f, &done);
                drbd_state_unlock(mdev);
        } else {
                rv = _drbd_set_state(mdev, ns, f, &done);
        }

        spin_unlock_irqrestore(&mdev->req_lock, flags);

        if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
                D_ASSERT(current != mdev->worker.task);
                wait_for_completion(&done);
        }

abort:
        if (f & CS_SERIALIZE)
                mutex_unlock(&mdev->state_mutex);

        return rv;
}

/**
 * _drbd_request_state() - Request a state change (with flags)
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 * @f:          flags
 *
 * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
 * flag, or when logging of failed state change requests is not desired.
 */
int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
                        union drbd_state val,   enum chg_state_flags f)
{
        int rv;

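        /* retry for as long as the state engine reports a transient state */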
        wait_event(mdev->state_wait,
                   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);

        return rv;
}

static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
{
        dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
            name,
            drbd_conn_str(ns.conn),
            drbd_role_str(ns.role),
            drbd_role_str(ns.peer),
            drbd_disk_str(ns.disk),
            drbd_disk_str(ns.pdsk),
            ns.susp ? 's' : 'r',
            ns.aftr_isp ? 'a' : '-',
            ns.peer_isp ? 'p' : '-',
            ns.user_isp ? 'u' : '-'
            );
}

void print_st_err(struct drbd_conf *mdev,
        union drbd_state os, union drbd_state ns, int err)
{
        if (err == SS_IN_TRANSIENT_STATE)
                return;
        dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
        print_st(mdev, " state", os);
        print_st(mdev, "wanted", ns);
}


#define drbd_peer_str drbd_role_str
#define drbd_pdsk_str drbd_disk_str

#define drbd_susp_str(A)     ((A) ? "1" : "0")
#define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
#define drbd_peer_isp_str(A) ((A) ? "1" : "0")
#define drbd_user_isp_str(A) ((A) ? "1" : "0")

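/* If state field A changed between os and ns, append "A( old -> new ) " to the
 * message buffer pointed to by pbp; used by __drbd_set_state() below. */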
#define PSC(A) \
        ({ if (ns.A != os.A) { \
                pbp += sprintf(pbp, #A "( %s -> %s ) ", \
                              drbd_##A##_str(os.A), \
                              drbd_##A##_str(ns.A)); \
        } })

/**
 * is_valid_state() - Returns an SS_ error code if ns is not valid
 * @mdev:       DRBD device.
 * @ns:         State to consider.
 */
static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
{
        /* See drbd_state_sw_errors in drbd_strings.c */

        enum drbd_fencing_p fp;
        int rv = SS_SUCCESS;

        fp = FP_DONT_CARE;
        if (get_ldev(mdev)) {
                fp = mdev->ldev->dc.fencing;
                put_ldev(mdev);
        }

        if (get_net_conf(mdev)) {
                if (!mdev->net_conf->two_primaries &&
                    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
                        rv = SS_TWO_PRIMARIES;
                put_net_conf(mdev);
        }

        if (rv <= 0)
                /* already found a reason to abort */;
        else if (ns.role == R_SECONDARY && mdev->open_cnt)
                rv = SS_DEVICE_IN_USE;

        else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
                rv = SS_NO_UP_TO_DATE_DISK;

        else if (fp >= FP_RESOURCE &&
                 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
                rv = SS_PRIMARY_NOP;

        else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
                rv = SS_NO_UP_TO_DATE_DISK;

        else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
                rv = SS_NO_LOCAL_DISK;

        else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
                rv = SS_NO_REMOTE_DISK;

        else if ((ns.conn == C_CONNECTED ||
                  ns.conn == C_WF_BITMAP_S ||
                  ns.conn == C_SYNC_SOURCE ||
                  ns.conn == C_PAUSED_SYNC_S) &&
                  ns.disk == D_OUTDATED)
                rv = SS_CONNECTED_OUTDATES;

        else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
                 (mdev->sync_conf.verify_alg[0] == 0))
                rv = SS_NO_VERIFY_ALG;

        else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
                  mdev->agreed_pro_version < 88)
                rv = SS_NOT_SUPPORTED;

        return rv;
}

/**
 * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
 * @mdev:       DRBD device.
 * @ns:         new state.
 * @os:         old state.
 */
static int is_valid_state_transition(struct drbd_conf *mdev,
                                     union drbd_state ns, union drbd_state os)
{
        int rv = SS_SUCCESS;

        if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
            os.conn > C_CONNECTED)
                rv = SS_RESYNC_RUNNING;

        if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
                rv = SS_ALREADY_STANDALONE;

        if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
                rv = SS_IS_DISKLESS;

        if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
                rv = SS_NO_NET_CONFIG;

        if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
                rv = SS_LOWER_THAN_OUTDATED;

        if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
                rv = SS_IN_TRANSIENT_STATE;

        if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
                rv = SS_IN_TRANSIENT_STATE;

        if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
                rv = SS_NEED_CONNECTION;

        if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
            ns.conn != os.conn && os.conn > C_CONNECTED)
                rv = SS_RESYNC_RUNNING;

        if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
            os.conn < C_CONNECTED)
                rv = SS_NEED_CONNECTION;

        return rv;
}

/**
 * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
 * @mdev:       DRBD device.
 * @os:         old state.
 * @ns:         new state.
 * @warn_sync_abort:
 *
 * When we lose the connection, we have to set the state of the peer's disk (pdsk)
 * to D_UNKNOWN. This rule and many more along those lines are in this function.
 */
static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
                                       union drbd_state ns, int *warn_sync_abort)
{
        enum drbd_fencing_p fp;

        fp = FP_DONT_CARE;
        if (get_ldev(mdev)) {
                fp = mdev->ldev->dc.fencing;
                put_ldev(mdev);
        }

        /* Disallow Network errors to configure a device's network part */
        if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
            os.conn <= C_DISCONNECTING)
                ns.conn = os.conn;

        /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
        if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
            ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
                ns.conn = os.conn;

        /* After C_DISCONNECTING only C_STANDALONE may follow */
        if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
                ns.conn = os.conn;

        if (ns.conn < C_CONNECTED) {
                ns.peer_isp = 0;
                ns.peer = R_UNKNOWN;
                if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
                        ns.pdsk = D_UNKNOWN;
        }

        /* Clear the aftr_isp when becoming unconfigured */
        if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
                ns.aftr_isp = 0;

        if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
                ns.pdsk = D_UNKNOWN;

        /* Abort resync if a disk fails/detaches */
        if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
            (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
                if (warn_sync_abort)
                        *warn_sync_abort = 1;
                ns.conn = C_CONNECTED;
        }

        if (ns.conn >= C_CONNECTED &&
            ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
             (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
                switch (ns.conn) {
                case C_WF_BITMAP_T:
                case C_PAUSED_SYNC_T:
                        ns.disk = D_OUTDATED;
                        break;
                case C_CONNECTED:
                case C_WF_BITMAP_S:
                case C_SYNC_SOURCE:
                case C_PAUSED_SYNC_S:
                        ns.disk = D_UP_TO_DATE;
                        break;
                case C_SYNC_TARGET:
                        ns.disk = D_INCONSISTENT;
                        dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
                        break;
                }
                if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
                        dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
        }

        if (ns.conn >= C_CONNECTED &&
            (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
                switch (ns.conn) {
                case C_CONNECTED:
                case C_WF_BITMAP_T:
                case C_PAUSED_SYNC_T:
                case C_SYNC_TARGET:
                        ns.pdsk = D_UP_TO_DATE;
                        break;
                case C_WF_BITMAP_S:
                case C_PAUSED_SYNC_S:
                        ns.pdsk = D_OUTDATED;
                        break;
                case C_SYNC_SOURCE:
                        ns.pdsk = D_INCONSISTENT;
                        dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
                        break;
                }
                if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
                        dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
        }

        /* Connection breaks down before we finished "Negotiating" */
        if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
            get_ldev_if_state(mdev, D_NEGOTIATING)) {
                if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
                        ns.disk = mdev->new_state_tmp.disk;
                        ns.pdsk = mdev->new_state_tmp.pdsk;
                } else {
                        dev_alert(DEV, "Connection lost while negotiating, no data!\n");
                        ns.disk = D_DISKLESS;
                        ns.pdsk = D_UNKNOWN;
                }
                put_ldev(mdev);
        }

        if (fp == FP_STONITH &&
            (ns.role == R_PRIMARY &&
             ns.conn < C_CONNECTED &&
             ns.pdsk > D_OUTDATED))
                ns.susp = 1;

        if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
                if (ns.conn == C_SYNC_SOURCE)
                        ns.conn = C_PAUSED_SYNC_S;
                if (ns.conn == C_SYNC_TARGET)
                        ns.conn = C_PAUSED_SYNC_T;
        } else {
                if (ns.conn == C_PAUSED_SYNC_S)
                        ns.conn = C_SYNC_SOURCE;
                if (ns.conn == C_PAUSED_SYNC_T)
                        ns.conn = C_SYNC_TARGET;
        }

        return ns;
}

/* helper for __drbd_set_state */
static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
{
        if (cs == C_VERIFY_T) {
                /* starting online verify from an arbitrary position
                 * does not fit well into the existing protocol.
                 * on C_VERIFY_T, we initialize ov_left and friends
                 * implicitly in receive_DataRequest once the
                 * first P_OV_REQUEST is received */
                mdev->ov_start_sector = ~(sector_t)0;
        } else {
                unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
                if (bit >= mdev->rs_total)
                        mdev->ov_start_sector =
                                BM_BIT_TO_SECT(mdev->rs_total - 1);
                mdev->ov_position = mdev->ov_start_sector;
        }
}

/**
 * __drbd_set_state() - Set a new DRBD state
 * @mdev:       DRBD device.
 * @ns:         new state.
 * @flags:      Flags
 * @done:       Optional completion, that will get completed after the after_state_ch() finished
 *
 * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
 */
int __drbd_set_state(struct drbd_conf *mdev,
                    union drbd_state ns, enum chg_state_flags flags,
                    struct completion *done)
{
        union drbd_state os;
        int rv = SS_SUCCESS;
        int warn_sync_abort = 0;
        struct after_state_chg_work *ascw;

        os = mdev->state;

        ns = sanitize_state(mdev, os, ns, &warn_sync_abort);

        if (ns.i == os.i)
                return SS_NOTHING_TO_DO;

        if (!(flags & CS_HARD)) {
                /*  pre-state-change checks ; only look at ns  */
                /* See drbd_state_sw_errors in drbd_strings.c */

                rv = is_valid_state(mdev, ns);
                if (rv < SS_SUCCESS) {
                        /* If the old state was illegal as well, then let
                           this happen...*/

                        if (is_valid_state(mdev, os) == rv) {
                                dev_err(DEV, "Considering state change from bad state. "
                                    "Error would be: '%s'\n",
                                    drbd_set_st_err_str(rv));
                                print_st(mdev, "old", os);
                                print_st(mdev, "new", ns);
                                rv = is_valid_state_transition(mdev, ns, os);
                        }
                } else
                        rv = is_valid_state_transition(mdev, ns, os);
        }

        if (rv < SS_SUCCESS) {
                if (flags & CS_VERBOSE)
                        print_st_err(mdev, os, ns, rv);
                return rv;
        }

        if (warn_sync_abort)
                dev_warn(DEV, "Resync aborted.\n");

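        /* build a one-line summary of all state fields that changed */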
        {
                char *pbp, pb[300];
                pbp = pb;
                *pbp = 0;
                PSC(role);
                PSC(peer);
                PSC(conn);
                PSC(disk);
                PSC(pdsk);
                PSC(susp);
                PSC(aftr_isp);
                PSC(peer_isp);
                PSC(user_isp);
                dev_info(DEV, "%s\n", pb);
        }

        /* solve the race between becoming unconfigured,
         * worker doing the cleanup, and
         * admin reconfiguring us:
         * on (re)configure, first set CONFIG_PENDING,
         * then wait for a potentially exiting worker,
         * start the worker, and schedule one no_op.
         * then proceed with configuration.
         */
        if (ns.disk == D_DISKLESS &&
            ns.conn == C_STANDALONE &&
            ns.role == R_SECONDARY &&
            !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
                set_bit(DEVICE_DYING, &mdev->flags);

        mdev->state.i = ns.i;
        wake_up(&mdev->misc_wait);
        wake_up(&mdev->state_wait);

        /*   post-state-change actions   */
        if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
                set_bit(STOP_SYNC_TIMER, &mdev->flags);
                mod_timer(&mdev->resync_timer, jiffies);
        }

        /* aborted verify run. log the last position */
        if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
            ns.conn < C_CONNECTED) {
                mdev->ov_start_sector =
                        BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
                dev_info(DEV, "Online Verify reached sector %llu\n",
                        (unsigned long long)mdev->ov_start_sector);
        }

        if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
            (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
                dev_info(DEV, "Syncer continues.\n");
                mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
                if (ns.conn == C_SYNC_TARGET) {
                        if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
                                mod_timer(&mdev->resync_timer, jiffies);
                        /* This if (!test_bit) is only needed for the case
                           that a device that has ceased to use its timer,
                           i.e. is already in drbd_resync_finished(), gets
                           paused and resumed. */
                }
        }

        if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
            (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
                dev_info(DEV, "Resync suspended\n");
                mdev->rs_mark_time = jiffies;
                if (ns.conn == C_PAUSED_SYNC_T)
                        set_bit(STOP_SYNC_TIMER, &mdev->flags);
        }

        if (os.conn == C_CONNECTED &&
            (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
                mdev->ov_position = 0;
                mdev->rs_total =
                mdev->rs_mark_left = drbd_bm_bits(mdev);
                if (mdev->agreed_pro_version >= 90)
                        set_ov_position(mdev, ns.conn);
                else
                        mdev->ov_start_sector = 0;
                mdev->ov_left = mdev->rs_total
                              - BM_SECT_TO_BIT(mdev->ov_position);
                mdev->rs_start     =
                mdev->rs_mark_time = jiffies;
                mdev->ov_last_oos_size = 0;
                mdev->ov_last_oos_start = 0;

                if (ns.conn == C_VERIFY_S) {
                        dev_info(DEV, "Starting Online Verify from sector %llu\n",
                                        (unsigned long long)mdev->ov_position);
                        mod_timer(&mdev->resync_timer, jiffies);
                }
        }

        if (get_ldev(mdev)) {
                u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
                                                 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
                                                 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);

                if (test_bit(CRASHED_PRIMARY, &mdev->flags))
                        mdf |= MDF_CRASHED_PRIMARY;
                if (mdev->state.role == R_PRIMARY ||
                    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
                        mdf |= MDF_PRIMARY_IND;
                if (mdev->state.conn > C_WF_REPORT_PARAMS)
                        mdf |= MDF_CONNECTED_IND;
                if (mdev->state.disk > D_INCONSISTENT)
                        mdf |= MDF_CONSISTENT;
                if (mdev->state.disk > D_OUTDATED)
                        mdf |= MDF_WAS_UP_TO_DATE;
                if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
                        mdf |= MDF_PEER_OUT_DATED;
                if (mdf != mdev->ldev->md.flags) {
                        mdev->ldev->md.flags = mdf;
                        drbd_md_mark_dirty(mdev);
                }
                if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
                        drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
                put_ldev(mdev);
        }

        /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
        if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
            os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
                set_bit(CONSIDER_RESYNC, &mdev->flags);

        /* Receiver should clean up itself */
        if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
                drbd_thread_stop_nowait(&mdev->receiver);

        /* Now the receiver finished cleaning up itself, it should die */
        if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
                drbd_thread_stop_nowait(&mdev->receiver);

        /* Upon network failure, we need to restart the receiver. */
        if (os.conn > C_TEAR_DOWN &&
            ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
                drbd_thread_restart_nowait(&mdev->receiver);

        ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
        if (ascw) {
                ascw->os = os;
                ascw->ns = ns;
                ascw->flags = flags;
                ascw->w.cb = w_after_state_ch;
                ascw->done = done;
                drbd_queue_work(&mdev->data.work, &ascw->w);
        } else {
                dev_warn(DEV, "Could not kmalloc an ascw\n");
        }

        return rv;
}

static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
        struct after_state_chg_work *ascw =
                container_of(w, struct after_state_chg_work, w);
        after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
        if (ascw->flags & CS_WAIT_COMPLETE) {
                D_ASSERT(ascw->done != NULL);
                complete(ascw->done);
        }
        kfree(ascw);

        return 1;
}

static void abw_start_sync(struct drbd_conf *mdev, int rv)
{
        if (rv) {
                dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
                _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
                return;
        }

        switch (mdev->state.conn) {
        case C_STARTING_SYNC_T:
                _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
                break;
        case C_STARTING_SYNC_S:
                drbd_start_resync(mdev, C_SYNC_SOURCE);
                break;
        }
}

/**
 * after_state_ch() - Perform after state change actions that may sleep
 * @mdev:       DRBD device.
 * @os:         old state.
 * @ns:         new state.
 * @flags:      Flags
 */
static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                           union drbd_state ns, enum chg_state_flags flags)
{
        enum drbd_fencing_p fp;

        if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
                clear_bit(CRASHED_PRIMARY, &mdev->flags);
                if (mdev->p_uuid)
                        mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
        }

        fp = FP_DONT_CARE;
        if (get_ldev(mdev)) {
                fp = mdev->ldev->dc.fencing;
                put_ldev(mdev);
        }

        /* Inform userspace about the change... */
        drbd_bcast_state(mdev, ns);

        if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
            (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
                drbd_khelper(mdev, "pri-on-incon-degr");

        /* Here we have the actions that are performed after a
           state change. This function might sleep */

        if (fp == FP_STONITH && ns.susp) {
                /* case1: The outdate peer handler is successful:
                 * case2: The connection was established again: */
                if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
                    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
                        tl_clear(mdev);
                        spin_lock_irq(&mdev->req_lock);
                        _drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
                        spin_unlock_irq(&mdev->req_lock);
                }
        }
        /* Do not change the order of the if above and the two below... */
        if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
                drbd_send_uuids(mdev);
                drbd_send_state(mdev);
        }
        if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
                drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");

        /* Lost contact to peer's copy of the data */
        if ((os.pdsk >= D_INCONSISTENT &&
             os.pdsk != D_UNKNOWN &&
             os.pdsk != D_OUTDATED)
        &&  (ns.pdsk < D_INCONSISTENT ||
             ns.pdsk == D_UNKNOWN ||
             ns.pdsk == D_OUTDATED)) {
                kfree(mdev->p_uuid);
                mdev->p_uuid = NULL;
                if (get_ldev(mdev)) {
                        if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
                            mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
                                drbd_uuid_new_current(mdev);
                                drbd_send_uuids(mdev);
                        }
                        put_ldev(mdev);
                }
        }

        if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
                if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
                        drbd_uuid_new_current(mdev);

                /* D_DISKLESS Peer becomes secondary */
                if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
                        drbd_al_to_on_disk_bm(mdev);
                put_ldev(mdev);
        }

        /* Last part of the attaching process ... */
        if (ns.conn >= C_CONNECTED &&
            os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
                kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
                mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
                drbd_send_sizes(mdev, 0);  /* to start sync... */
                drbd_send_uuids(mdev);
                drbd_send_state(mdev);
        }

        /* We want to pause/continue resync, tell peer. */
        if (ns.conn >= C_CONNECTED &&
             ((os.aftr_isp != ns.aftr_isp) ||
              (os.user_isp != ns.user_isp)))
                drbd_send_state(mdev);

        /* In case one of the isp bits got set, suspend other devices. */
        if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
            (ns.aftr_isp || ns.peer_isp || ns.user_isp))
                suspend_other_sg(mdev);

        /* Make sure the peer gets informed about any state changes
           (ISP bits) that happened while we were in WFReportParams. */
        if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
                drbd_send_state(mdev);

        /* We are in the process of starting a full sync... */
        if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
            (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
                drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");

        /* We are invalidating ourselves... */
        if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
            os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
                drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");

        if (os.disk > D_FAILED && ns.disk == D_FAILED) {
                enum drbd_io_error_p eh;

                eh = EP_PASS_ON;
                if (get_ldev_if_state(mdev, D_FAILED)) {
                        eh = mdev->ldev->dc.on_io_error;
                        put_ldev(mdev);
                }

                drbd_rs_cancel_all(mdev);
                /* since get_ldev() only works as long as disk>=D_INCONSISTENT,
                   and it is D_DISKLESS here, local_cnt can only go down, it can
                   not increase... It will reach zero */
                wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
                mdev->rs_total = 0;
                mdev->rs_failed = 0;
                atomic_set(&mdev->rs_pending_cnt, 0);

                spin_lock_irq(&mdev->req_lock);
                _drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
                spin_unlock_irq(&mdev->req_lock);

                if (eh == EP_CALL_HELPER)
                        drbd_khelper(mdev, "local-io-error");
        }

        if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {

                if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
                        if (drbd_send_state(mdev))
                                dev_warn(DEV, "Notified peer that my disk is broken.\n");
                        else
                                dev_err(DEV, "Sending state in drbd_io_error() failed\n");
                }

                lc_destroy(mdev->resync);
                mdev->resync = NULL;
                lc_destroy(mdev->act_log);
                mdev->act_log = NULL;
                __no_warn(local,
                        drbd_free_bc(mdev->ldev);
                        mdev->ldev = NULL;);

                if (mdev->md_io_tmpp)
                        __free_page(mdev->md_io_tmpp);
        }

        /* Disks got bigger while they were detached */
        if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
            test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
                if (ns.conn == C_CONNECTED)
                        resync_after_online_grow(mdev);
        }

        /* A resync finished or aborted, wake paused devices... */
        if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
            (os.peer_isp && !ns.peer_isp) ||
            (os.user_isp && !ns.user_isp))
                resume_next_sg(mdev);

        /* Upon network configuration, we need to start the receiver */
        if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
                drbd_thread_start(&mdev->receiver);

        /* Terminate worker thread if we are unconfigured - it will be
           restarted as needed... */
        if (ns.disk == D_DISKLESS &&
            ns.conn == C_STANDALONE &&
            ns.role == R_SECONDARY) {
                if (os.aftr_isp != ns.aftr_isp)
                        resume_next_sg(mdev);
                /* set in __drbd_set_state, unless CONFIG_PENDING was set */
                if (test_bit(DEVICE_DYING, &mdev->flags))
                        drbd_thread_stop_nowait(&mdev->worker);
        }

        drbd_md_sync(mdev);
}


static int drbd_thread_setup(void *arg)
{
        struct drbd_thread *thi = (struct drbd_thread *) arg;
        struct drbd_conf *mdev = thi->mdev;
        unsigned long flags;
        int retval;

restart:
        retval = thi->function(thi);

        spin_lock_irqsave(&thi->t_lock, flags);

1373         /* if the receiver has been "Exiting", the last thing it did
1374          * was set the conn state to "StandAlone",
1375          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1376          * and receiver thread will be "started".
1377          * drbd_thread_start needs to set "Restarting" in that case.
1378          * t_state check and assignment needs to be within the same spinlock,
1379          * so either thread_start sees Exiting, and can remap to Restarting,
1380          * or thread_start sees None, and can proceed as normal.
1381          */
1382
1383         if (thi->t_state == Restarting) {
1384                 dev_info(DEV, "Restarting %s\n", current->comm);
1385                 thi->t_state = Running;
1386                 spin_unlock_irqrestore(&thi->t_lock, flags);
1387                 goto restart;
1388         }
1389
1390         thi->task = NULL;
1391         thi->t_state = None;
1392         smp_mb();
1393         complete(&thi->stop);
1394         spin_unlock_irqrestore(&thi->t_lock, flags);
1395
1396         dev_info(DEV, "Terminating %s\n", current->comm);
1397
1398         /* Release mod reference taken when thread was started */
1399         module_put(THIS_MODULE);
1400         return retval;
1401 }
1402
1403 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1404                       int (*func) (struct drbd_thread *))
1405 {
1406         spin_lock_init(&thi->t_lock);
1407         thi->task    = NULL;
1408         thi->t_state = None;
1409         thi->function = func;
1410         thi->mdev = mdev;
1411 }
1412
1413 int drbd_thread_start(struct drbd_thread *thi)
1414 {
1415         struct drbd_conf *mdev = thi->mdev;
1416         struct task_struct *nt;
1417         unsigned long flags;
1418
1419         const char *me =
1420                 thi == &mdev->receiver ? "receiver" :
1421                 thi == &mdev->asender  ? "asender"  :
1422                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1423
1424         /* is used from state engine doing drbd_thread_stop_nowait,
1425          * while holding the req lock irqsave */
1426         spin_lock_irqsave(&thi->t_lock, flags);
1427
1428         switch (thi->t_state) {
1429         case None:
1430                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1431                                 me, current->comm, current->pid);
1432
1433                 /* Get ref on module for thread - this is released when thread exits */
1434                 if (!try_module_get(THIS_MODULE)) {
1435                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1436                         spin_unlock_irqrestore(&thi->t_lock, flags);
1437                         return FALSE;
1438                 }
1439
1440                 init_completion(&thi->stop);
1441                 D_ASSERT(thi->task == NULL);
1442                 thi->reset_cpu_mask = 1;
1443                 thi->t_state = Running;
1444                 spin_unlock_irqrestore(&thi->t_lock, flags);
1445                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1446
1447                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1448                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1449
1450                 if (IS_ERR(nt)) {
1451                         dev_err(DEV, "Couldn't start thread\n");
1452
1453                         module_put(THIS_MODULE);
1454                         return FALSE;
1455                 }
1456                 spin_lock_irqsave(&thi->t_lock, flags);
1457                 thi->task = nt;
1458                 thi->t_state = Running;
1459                 spin_unlock_irqrestore(&thi->t_lock, flags);
1460                 wake_up_process(nt);
1461                 break;
1462         case Exiting:
1463                 thi->t_state = Restarting;
1464                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1465                                 me, current->comm, current->pid);
1466                 /* fall through */
1467         case Running:
1468         case Restarting:
1469         default:
1470                 spin_unlock_irqrestore(&thi->t_lock, flags);
1471                 break;
1472         }
1473
1474         return TRUE;
1475 }
1476
1477
1478 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1479 {
1480         unsigned long flags;
1481
1482         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1483
1484         /* may be called from state engine, holding the req lock irqsave */
1485         spin_lock_irqsave(&thi->t_lock, flags);
1486
1487         if (thi->t_state == None) {
1488                 spin_unlock_irqrestore(&thi->t_lock, flags);
1489                 if (restart)
1490                         drbd_thread_start(thi);
1491                 return;
1492         }
1493
1494         if (thi->t_state != ns) {
1495                 if (thi->task == NULL) {
1496                         spin_unlock_irqrestore(&thi->t_lock, flags);
1497                         return;
1498                 }
1499
1500                 thi->t_state = ns;
1501                 smp_mb();
1502                 init_completion(&thi->stop);
1503                 if (thi->task != current)
1504                         force_sig(DRBD_SIGKILL, thi->task);
1505
1506         }
1507
1508         spin_unlock_irqrestore(&thi->t_lock, flags);
1509
1510         if (wait)
1511                 wait_for_completion(&thi->stop);
1512 }
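
/*
 * Illustrative sketch, kept under #if 0 and not part of the driver: the
 * intended lifecycle of a drbd_thread as implemented by drbd_thread_setup(),
 * drbd_thread_start() and _drbd_thread_stop() above.  The two functions
 * below are hypothetical and exist only to show the calling pattern.
 */
#if 0
static int example_thread_fn(struct drbd_thread *thi)
{
        /* when this function returns, drbd_thread_setup() restarts it if
         * t_state is Restarting, otherwise the kthread terminates */
        return 0;
}

static void example_thread_lifecycle(struct drbd_conf *mdev,
                                     struct drbd_thread *thi)
{
        drbd_thread_init(mdev, thi, example_thread_fn);
        if (drbd_thread_start(thi))     /* None -> Running, spawns the kthread */
                _drbd_thread_stop(thi, FALSE, TRUE); /* set Exiting, wait for it */
}
#endif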
1513
1514 #ifdef CONFIG_SMP
1515 /**
1516  * drbd_calc_cpu_mask() - Generate this device's CPU mask, spreading devices over all CPUs
1517  * @mdev:       DRBD device.
1518  *
1519  * Forces all threads of a device onto the same CPU. This is beneficial for
1520  * DRBD's performance. May be overridden by the user's configuration.
1521  */
1522 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1523 {
1524         int ord, cpu;
1525
1526         /* user override. */
1527         if (cpumask_weight(mdev->cpu_mask))
1528                 return;
1529
1530         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1531         for_each_online_cpu(cpu) {
1532                 if (ord-- == 0) {
1533                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1534                         return;
1535                 }
1536         }
1537         /* should not be reached */
1538         cpumask_setall(mdev->cpu_mask);
1539 }
1540
1541 /**
1542  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1543  * @mdev:       DRBD device.
1544  *
1545  * Call this in the "main loop" of _all_ threads; no mutex is needed, since current
1546  * won't die prematurely.
1547  */
1548 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1549 {
1550         struct task_struct *p = current;
1551         struct drbd_thread *thi =
1552                 p == mdev->asender.task  ? &mdev->asender  :
1553                 p == mdev->receiver.task ? &mdev->receiver :
1554                 p == mdev->worker.task   ? &mdev->worker   :
1555                 NULL;
1556         ERR_IF(thi == NULL)
1557                 return;
1558         if (!thi->reset_cpu_mask)
1559                 return;
1560         thi->reset_cpu_mask = 0;
1561         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1562 }
1563 #endif
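
/*
 * Illustrative sketch, kept under #if 0 and not part of the driver: how a
 * thread main loop is expected to call drbd_thread_current_set_cpu(), as
 * described in the comment above.  The loop body is hypothetical; on
 * !CONFIG_SMP builds the call is presumably provided as a no-op elsewhere
 * (drbd_int.h).
 */
#if 0
static int example_main_loop(struct drbd_thread *thi)
{
        struct drbd_conf *mdev = thi->mdev;

        while (get_t_state(thi) == Running) {
                drbd_thread_current_set_cpu(mdev);
                /* ... handle one unit of work for this device ... */
        }
        return 0;
}
#endif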
1564
1565 /* the appropriate socket mutex must be held already */
1566 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1567                           enum drbd_packets cmd, struct p_header *h,
1568                           size_t size, unsigned msg_flags)
1569 {
1570         int sent, ok;
1571
1572         ERR_IF(!h) return FALSE;
1573         ERR_IF(!size) return FALSE;
1574
1575         h->magic   = BE_DRBD_MAGIC;
1576         h->command = cpu_to_be16(cmd);
1577         h->length  = cpu_to_be16(size-sizeof(struct p_header));
1578
1579         trace_drbd_packet(mdev, sock, 0, (void *)h, __FILE__, __LINE__);
1580         sent = drbd_send(mdev, sock, h, size, msg_flags);
1581
1582         ok = (sent == size);
1583         if (!ok)
1584                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1585                     cmdname(cmd), (int)size, sent);
1586         return ok;
1587 }
1588
1589 /* don't pass the socket. we may only look at it
1590  * when we hold the appropriate socket mutex.
1591  */
1592 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1593                   enum drbd_packets cmd, struct p_header *h, size_t size)
1594 {
1595         int ok = 0;
1596         struct socket *sock;
1597
1598         if (use_data_socket) {
1599                 mutex_lock(&mdev->data.mutex);
1600                 sock = mdev->data.socket;
1601         } else {
1602                 mutex_lock(&mdev->meta.mutex);
1603                 sock = mdev->meta.socket;
1604         }
1605
1606         /* drbd_disconnect() could have called drbd_free_sock()
1607          * while we were waiting in down()... */
1608         if (likely(sock != NULL))
1609                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1610
1611         if (use_data_socket)
1612                 mutex_unlock(&mdev->data.mutex);
1613         else
1614                 mutex_unlock(&mdev->meta.mutex);
1615         return ok;
1616 }
1617
1618 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1619                    size_t size)
1620 {
1621         struct p_header h;
1622         int ok;
1623
1624         h.magic   = BE_DRBD_MAGIC;
1625         h.command = cpu_to_be16(cmd);
1626         h.length  = cpu_to_be16(size);
1627
1628         if (!drbd_get_data_sock(mdev))
1629                 return 0;
1630
1631         trace_drbd_packet(mdev, mdev->data.socket, 0, (void *)&h, __FILE__, __LINE__);
1632
1633         ok = (sizeof(h) ==
1634                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1635         ok = ok && (size ==
1636                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1637
1638         drbd_put_data_sock(mdev);
1639
1640         return ok;
1641 }
1642
1643 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1644 {
1645         struct p_rs_param_89 *p;
1646         struct socket *sock;
1647         int size, rv;
1648         const int apv = mdev->agreed_pro_version;
1649
1650         size = apv <= 87 ? sizeof(struct p_rs_param)
1651                 : apv == 88 ? sizeof(struct p_rs_param)
1652                         + strlen(mdev->sync_conf.verify_alg) + 1
1653                 : /* 89 */    sizeof(struct p_rs_param_89);
1654
1655         /* used from admin command context and receiver/worker context.
1656          * to avoid kmalloc, grab the socket right here,
1657          * then use the pre-allocated sbuf there */
1658         mutex_lock(&mdev->data.mutex);
1659         sock = mdev->data.socket;
1660
1661         if (likely(sock != NULL)) {
1662                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1663
1664                 p = &mdev->data.sbuf.rs_param_89;
1665
1666                 /* initialize verify_alg and csums_alg */
1667                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1668
1669                 p->rate = cpu_to_be32(sc->rate);
1670
1671                 if (apv >= 88)
1672                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1673                 if (apv >= 89)
1674                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1675
1676                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1677         } else
1678                 rv = 0; /* not ok */
1679
1680         mutex_unlock(&mdev->data.mutex);
1681
1682         return rv;
1683 }
1684
1685 int drbd_send_protocol(struct drbd_conf *mdev)
1686 {
1687         struct p_protocol *p;
1688         int size, rv;
1689
1690         size = sizeof(struct p_protocol);
1691
1692         if (mdev->agreed_pro_version >= 87)
1693                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1694
1695         /* we must not recurse into our own queue,
1696          * as that is blocked during handshake */
1697         p = kmalloc(size, GFP_NOIO);
1698         if (p == NULL)
1699                 return 0;
1700
1701         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1702         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1703         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1704         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1705         p->want_lose     = cpu_to_be32(mdev->net_conf->want_lose);
1706         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1707
1708         if (mdev->agreed_pro_version >= 87)
1709                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1710
1711         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1712                            (struct p_header *)p, size);
1713         kfree(p);
1714         return rv;
1715 }
1716
1717 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1718 {
1719         struct p_uuids p;
1720         int i;
1721
1722         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1723                 return 1;
1724
1725         for (i = UI_CURRENT; i < UI_SIZE; i++)
1726                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1727
1728         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1729         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1730         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1731         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1732         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1733         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1734
1735         put_ldev(mdev);
1736
1737         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1738                              (struct p_header *)&p, sizeof(p));
1739 }
1740
1741 int drbd_send_uuids(struct drbd_conf *mdev)
1742 {
1743         return _drbd_send_uuids(mdev, 0);
1744 }
1745
1746 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1747 {
1748         return _drbd_send_uuids(mdev, 8);
1749 }
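
/*
 * Illustrative summary, kept under #if 0 and not part of the driver, of the
 * uuid_flags bits assembled in _drbd_send_uuids() above.  Bits 1, 2 and 4
 * follow directly from that function; the meaning of bit 8 is inferred from
 * the name of drbd_send_uuids_skip_initial_sync() and is therefore an
 * assumption.  The enum and its names are purely for illustration.
 */
#if 0
enum example_uuid_flags {
        EX_UUID_FLAG_WANT_LOSE          = 1, /* net_conf->want_lose is set */
        EX_UUID_FLAG_CRASHED_PRIMARY    = 2, /* CRASHED_PRIMARY flag was set */
        EX_UUID_FLAG_INCONSISTENT       = 4, /* new_state_tmp.disk == D_INCONSISTENT */
        EX_UUID_FLAG_SKIP_INITIAL_SYNC  = 8, /* assumed: tells the peer to skip the initial sync */
};
#endif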
1750
1751
1752 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1753 {
1754         struct p_rs_uuid p;
1755
1756         p.uuid = cpu_to_be64(val);
1757
1758         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1759                              (struct p_header *)&p, sizeof(p));
1760 }
1761
1762 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply)
1763 {
1764         struct p_sizes p;
1765         sector_t d_size, u_size;
1766         int q_order_type;
1767         int ok;
1768
1769         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1770                 D_ASSERT(mdev->ldev->backing_bdev);
1771                 d_size = drbd_get_max_capacity(mdev->ldev);
1772                 u_size = mdev->ldev->dc.disk_size;
1773                 q_order_type = drbd_queue_order_type(mdev);
1774                 p.queue_order_type = cpu_to_be32(drbd_queue_order_type(mdev));
1775                 put_ldev(mdev);
1776         } else {
1777                 d_size = 0;
1778                 u_size = 0;
1779                 q_order_type = QUEUE_ORDERED_NONE;
1780         }
1781
1782         p.d_size = cpu_to_be64(d_size);
1783         p.u_size = cpu_to_be64(u_size);
1784         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1785         p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1786         p.queue_order_type = cpu_to_be32(q_order_type);
1787
1788         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1789                            (struct p_header *)&p, sizeof(p));
1790         return ok;
1791 }
1792
1793 /**
1794  * drbd_send_state() - Sends the drbd state to the peer
1795  * @mdev:       DRBD device.
1796  */
1797 int drbd_send_state(struct drbd_conf *mdev)
1798 {
1799         struct socket *sock;
1800         struct p_state p;
1801         int ok = 0;
1802
1803         /* Grab state lock so we won't send state if we're in the middle
1804          * of a cluster-wide state change on another thread */
1805         drbd_state_lock(mdev);
1806
1807         mutex_lock(&mdev->data.mutex);
1808
1809         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1810         sock = mdev->data.socket;
1811
1812         if (likely(sock != NULL)) {
1813                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
1814                                     (struct p_header *)&p, sizeof(p), 0);
1815         }
1816
1817         mutex_unlock(&mdev->data.mutex);
1818
1819         drbd_state_unlock(mdev);
1820         return ok;
1821 }
1822
1823 int drbd_send_state_req(struct drbd_conf *mdev,
1824         union drbd_state mask, union drbd_state val)
1825 {
1826         struct p_req_state p;
1827
1828         p.mask    = cpu_to_be32(mask.i);
1829         p.val     = cpu_to_be32(val.i);
1830
1831         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1832                              (struct p_header *)&p, sizeof(p));
1833 }
1834
1835 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1836 {
1837         struct p_req_state_reply p;
1838
1839         p.retcode    = cpu_to_be32(retcode);
1840
1841         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1842                              (struct p_header *)&p, sizeof(p));
1843 }
1844
1845 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1846         struct p_compressed_bm *p,
1847         struct bm_xfer_ctx *c)
1848 {
1849         struct bitstream bs;
1850         unsigned long plain_bits;
1851         unsigned long tmp;
1852         unsigned long rl;
1853         unsigned len;
1854         unsigned toggle;
1855         int bits;
1856
1857         /* may we use this feature? */
1858         if ((mdev->sync_conf.use_rle == 0) ||
1859             (mdev->agreed_pro_version < 90))
1860                 return 0;
1861
1862         if (c->bit_offset >= c->bm_bits)
1863                 return 0; /* nothing to do. */
1864
1865         /* use at most this many bytes */
1866         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1867         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1868         /* plain bits covered in this code string */
1869         plain_bits = 0;
1870
1871         /* p->encoding & 0x80 stores whether the first run is of set bits.
1872          * bit offset is implicit.
1873          * start with toggle == 2 to be able to tell the first iteration */
1874         toggle = 2;
1875
1876         /* see how many plain bits we can stuff into one packet
1877          * using RLE and VLI. */
1878         do {
1879                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1880                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1881                 if (tmp == -1UL)
1882                         tmp = c->bm_bits;
1883                 rl = tmp - c->bit_offset;
1884
1885                 if (toggle == 2) { /* first iteration */
1886                         if (rl == 0) {
1887                                 /* the first checked bit was set,
1888                                  * store start value, */
1889                                 DCBP_set_start(p, 1);
1890                                 /* but skip encoding of zero run length */
1891                                 toggle = !toggle;
1892                                 continue;
1893                         }
1894                         DCBP_set_start(p, 0);
1895                 }
1896
1897                 /* paranoia: catch zero runlength.
1898                  * can only happen if the bitmap is modified while we scan it. */
1899                 if (rl == 0) {
1900                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1901                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1902                         return -1;
1903                 }
1904
1905                 bits = vli_encode_bits(&bs, rl);
1906                 if (bits == -ENOBUFS) /* buffer full */
1907                         break;
1908                 if (bits <= 0) {
1909                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1910                         return 0;
1911                 }
1912
1913                 toggle = !toggle;
1914                 plain_bits += rl;
1915                 c->bit_offset = tmp;
1916         } while (c->bit_offset < c->bm_bits);
1917
1918         len = bs.cur.b - p->code + !!bs.cur.bit;
1919
1920         if (plain_bits < (len << 3)) {
1921                 /* incompressible with this method.
1922                  * we need to rewind both word and bit position. */
1923                 c->bit_offset -= plain_bits;
1924                 bm_xfer_ctx_bit_to_word_offset(c);
1925                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1926                 return 0;
1927         }
1928
1929         /* RLE + VLI was able to compress it just fine.
1930          * update c->word_offset. */
1931         bm_xfer_ctx_bit_to_word_offset(c);
1932
1933         /* store pad_bits */
1934         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1935
1936         return len;
1937 }
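
/*
 * Worked example for the compressibility check above (the numbers are
 * illustrative, not from the source): if the VLI code string uses
 * len = 5 bytes and the encoded runs cover plain_bits = 4096 bitmap bits,
 * then 4096 >= 5 * 8 and the compressed representation is kept.  If the
 * same 5 bytes covered only 30 plain bits, 30 < 40, so bit_offset is
 * rewound and the caller falls back to sending plain bitmap words.
 */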
1938
1939 enum { OK, FAILED, DONE }
1940 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1941         struct p_header *h, struct bm_xfer_ctx *c)
1942 {
1943         struct p_compressed_bm *p = (void*)h;
1944         unsigned long num_words;
1945         int len;
1946         int ok;
1947
1948         len = fill_bitmap_rle_bits(mdev, p, c);
1949
1950         if (len < 0)
1951                 return FAILED;
1952
1953         if (len) {
1954                 DCBP_set_code(p, RLE_VLI_Bits);
1955                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1956                         sizeof(*p) + len, 0);
1957
1958                 c->packets[0]++;
1959                 c->bytes[0] += sizeof(*p) + len;
1960
1961                 if (c->bit_offset >= c->bm_bits)
1962                         len = 0; /* DONE */
1963         } else {
1964                 /* was not compressible.
1965                  * send a buffer full of plain text bits instead. */
1966                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1967                 len = num_words * sizeof(long);
1968                 if (len)
1969                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1970                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1971                                    h, sizeof(struct p_header) + len, 0);
1972                 c->word_offset += num_words;
1973                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1974
1975                 c->packets[1]++;
1976                 c->bytes[1] += sizeof(struct p_header) + len;
1977
1978                 if (c->bit_offset > c->bm_bits)
1979                         c->bit_offset = c->bm_bits;
1980         }
1981         ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1982
1983         if (ok == DONE)
1984                 INFO_bm_xfer_stats(mdev, "send", c);
1985         return ok;
1986 }
1987
1988 /* See the comment at receive_bitmap() */
1989 int _drbd_send_bitmap(struct drbd_conf *mdev)
1990 {
1991         struct bm_xfer_ctx c;
1992         struct p_header *p;
1993         int ret;
1994
1995         ERR_IF(!mdev->bitmap) return FALSE;
1996
1997         /* maybe we should use some per thread scratch page,
1998          * and allocate that during initial device creation? */
1999         p = (struct p_header *) __get_free_page(GFP_NOIO);
2000         if (!p) {
2001                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2002                 return FALSE;
2003         }
2004
2005         if (get_ldev(mdev)) {
2006                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2007                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2008                         drbd_bm_set_all(mdev);
2009                         if (drbd_bm_write(mdev)) {
2010                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2011                                  * but otherwise process as per normal - need to tell other
2012                                  * side that a full resync is required! */
2013                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2014                         } else {
2015                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2016                                 drbd_md_sync(mdev);
2017                         }
2018                 }
2019                 put_ldev(mdev);
2020         }
2021
2022         c = (struct bm_xfer_ctx) {
2023                 .bm_bits = drbd_bm_bits(mdev),
2024                 .bm_words = drbd_bm_words(mdev),
2025         };
2026
2027         do {
2028                 ret = send_bitmap_rle_or_plain(mdev, p, &c);
2029         } while (ret == OK);
2030
2031         free_page((unsigned long) p);
2032         return (ret == DONE);
2033 }
2034
2035 int drbd_send_bitmap(struct drbd_conf *mdev)
2036 {
2037         int err;
2038
2039         if (!drbd_get_data_sock(mdev))
2040                 return -1;
2041         err = !_drbd_send_bitmap(mdev);
2042         drbd_put_data_sock(mdev);
2043         return err;
2044 }
2045
2046 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2047 {
2048         int ok;
2049         struct p_barrier_ack p;
2050
2051         p.barrier  = barrier_nr;
2052         p.set_size = cpu_to_be32(set_size);
2053
2054         if (mdev->state.conn < C_CONNECTED)
2055                 return FALSE;
2056         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2057                         (struct p_header *)&p, sizeof(p));
2058         return ok;
2059 }
2060
2061 /**
2062  * _drbd_send_ack() - Sends an ack packet
2063  * @mdev:       DRBD device.
2064  * @cmd:        Packet command code.
2065  * @sector:     sector, needs to be in big endian byte order
2066  * @blksize:    size in bytes, needs to be in big endian byte order
2067  * @block_id:   Id, big endian byte order
2068  */
2069 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2070                           u64 sector,
2071                           u32 blksize,
2072                           u64 block_id)
2073 {
2074         int ok;
2075         struct p_block_ack p;
2076
2077         p.sector   = sector;
2078         p.block_id = block_id;
2079         p.blksize  = blksize;
2080         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2081
2082         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2083                 return FALSE;
2084         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2085                                 (struct p_header *)&p, sizeof(p));
2086         return ok;
2087 }
2088
2089 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2090                      struct p_data *dp)
2091 {
2092         const int header_size = sizeof(struct p_data)
2093                               - sizeof(struct p_header);
2094         int data_size  = ((struct p_header *)dp)->length - header_size;
2095
2096         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2097                               dp->block_id);
2098 }
2099
2100 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2101                      struct p_block_req *rp)
2102 {
2103         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2104 }
2105
2106 /**
2107  * drbd_send_ack() - Sends an ack packet
2108  * @mdev:       DRBD device.
2109  * @cmd:        Packet command code.
2110  * @e:          Epoch entry.
2111  */
2112 int drbd_send_ack(struct drbd_conf *mdev,
2113         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2114 {
2115         return _drbd_send_ack(mdev, cmd,
2116                               cpu_to_be64(e->sector),
2117                               cpu_to_be32(e->size),
2118                               e->block_id);
2119 }
2120
2121 /* This function misuses the block_id field to signal if the blocks
2122  * are in sync or not. */
2123 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2124                      sector_t sector, int blksize, u64 block_id)
2125 {
2126         return _drbd_send_ack(mdev, cmd,
2127                               cpu_to_be64(sector),
2128                               cpu_to_be32(blksize),
2129                               cpu_to_be64(block_id));
2130 }
2131
2132 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2133                        sector_t sector, int size, u64 block_id)
2134 {
2135         int ok;
2136         struct p_block_req p;
2137
2138         p.sector   = cpu_to_be64(sector);
2139         p.block_id = block_id;
2140         p.blksize  = cpu_to_be32(size);
2141
2142         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2143                                 (struct p_header *)&p, sizeof(p));
2144         return ok;
2145 }
2146
2147 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2148                             sector_t sector, int size,
2149                             void *digest, int digest_size,
2150                             enum drbd_packets cmd)
2151 {
2152         int ok;
2153         struct p_block_req p;
2154
2155         p.sector   = cpu_to_be64(sector);
2156         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2157         p.blksize  = cpu_to_be32(size);
2158
2159         p.head.magic   = BE_DRBD_MAGIC;
2160         p.head.command = cpu_to_be16(cmd);
2161         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2162
2163         mutex_lock(&mdev->data.mutex);
2164
2165         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2166         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2167
2168         mutex_unlock(&mdev->data.mutex);
2169
2170         return ok;
2171 }
2172
2173 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2174 {
2175         int ok;
2176         struct p_block_req p;
2177
2178         p.sector   = cpu_to_be64(sector);
2179         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2180         p.blksize  = cpu_to_be32(size);
2181
2182         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2183                            (struct p_header *)&p, sizeof(p));
2184         return ok;
2185 }
2186
2187 /* called on sndtimeo
2188  * returns FALSE if we should retry,
2189  * TRUE if we think the connection is dead
2190  */
2191 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2192 {
2193         int drop_it;
2194         /* long elapsed = (long)(jiffies - mdev->last_received); */
2195
2196         drop_it =   mdev->meta.socket == sock
2197                 || !mdev->asender.task
2198                 || get_t_state(&mdev->asender) != Running
2199                 || mdev->state.conn < C_CONNECTED;
2200
2201         if (drop_it)
2202                 return TRUE;
2203
2204         drop_it = !--mdev->ko_count;
2205         if (!drop_it) {
2206                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2207                        current->comm, current->pid, mdev->ko_count);
2208                 request_ping(mdev);
2209         }
2210
2211         return drop_it; /* && (mdev->state == R_PRIMARY) */
2212 }
2213
2214 /* The idea of sendpage seems to be to put some kind of reference
2215  * to the page into the skb, and to hand it over to the NIC. In
2216  * this process get_page() gets called.
2217  *
2218  * As soon as the page was really sent over the network put_page()
2219  * gets called by some part of the network layer. [ NIC driver? ]
2220  *
2221  * [ get_page() / put_page() increment/decrement the count. If count
2222  *   reaches 0 the page will be freed. ]
2223  *
2224  * This works nicely with pages from FSs.
2225  * But this means that in protocol A we might signal IO completion too early!
2226  *
2227  * In order not to corrupt data during a resync we must make sure
2228  * that we do not reuse our own buffer pages (EEs) too early; therefore
2229  * we have the net_ee list.
2230  *
2231  * XFS still seems to have problems with this: it submits pages with page_count == 0!
2232  * As a workaround, we disable sendpage on pages
2233  * with page_count == 0 or PageSlab.
2234  */
2235 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2236                    int offset, size_t size)
2237 {
2238         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2239         kunmap(page);
2240         if (sent == size)
2241                 mdev->send_cnt += size>>9;
2242         return sent == size;
2243 }
2244
2245 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2246                     int offset, size_t size)
2247 {
2248         mm_segment_t oldfs = get_fs();
2249         int sent, ok;
2250         int len = size;
2251
2252         /* e.g. XFS meta- & log-data is in slab pages, which have a
2253          * page_count of 0 and/or have PageSlab() set.
2254          * we cannot use send_page for those, as that does get_page();
2255          * put_page(); and would cause either a VM_BUG directly, or
2256          * __page_cache_release a page that would actually still be referenced
2257          * by someone, leading to some obscure delayed Oops somewhere else. */
2258         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2259                 return _drbd_no_send_page(mdev, page, offset, size);
2260
2261         drbd_update_congested(mdev);
2262         set_fs(KERNEL_DS);
2263         do {
2264                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2265                                                         offset, len,
2266                                                         MSG_NOSIGNAL);
2267                 if (sent == -EAGAIN) {
2268                         if (we_should_drop_the_connection(mdev,
2269                                                           mdev->data.socket))
2270                                 break;
2271                         else
2272                                 continue;
2273                 }
2274                 if (sent <= 0) {
2275                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2276                              __func__, (int)size, len, sent);
2277                         break;
2278                 }
2279                 len    -= sent;
2280                 offset += sent;
2281         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2282         set_fs(oldfs);
2283         clear_bit(NET_CONGESTED, &mdev->flags);
2284
2285         ok = (len == 0);
2286         if (likely(ok))
2287                 mdev->send_cnt += size>>9;
2288         return ok;
2289 }
2290
2291 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2292 {
2293         struct bio_vec *bvec;
2294         int i;
2295         __bio_for_each_segment(bvec, bio, i, 0) {
2296                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2297                                      bvec->bv_offset, bvec->bv_len))
2298                         return 0;
2299         }
2300         return 1;
2301 }
2302
2303 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2304 {
2305         struct bio_vec *bvec;
2306         int i;
2307         __bio_for_each_segment(bvec, bio, i, 0) {
2308                 if (!_drbd_send_page(mdev, bvec->bv_page,
2309                                      bvec->bv_offset, bvec->bv_len))
2310                         return 0;
2311         }
2312
2313         return 1;
2314 }
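
/*
 * Illustrative sketch, kept under #if 0 and not part of the driver: the
 * conditions under which the zero-copy path is avoided, restated as a
 * hypothetical predicate.  It only re-expresses the checks already made in
 * _drbd_send_page() and the protocol A reasoning from the comment block
 * above _drbd_no_send_page().
 */
#if 0
static int example_may_use_sendpage(struct drbd_conf *mdev, struct page *page)
{
        /* slab pages and pages with a zero refcount must not go through
         * sendpage's get_page()/put_page() */
        if (disable_sendpage || page_count(page) < 1 || PageSlab(page))
                return 0;
        /* in protocol A we signal IO completion before the peer has the data;
         * the bio pages may then be reused, so send a copy instead */
        if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
                return 0;
        return 1;
}
#endif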
2315
2316 /* Used to send write requests
2317  * R_PRIMARY -> Peer    (P_DATA)
2318  */
2319 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2320 {
2321         int ok = 1;
2322         struct p_data p;
2323         unsigned int dp_flags = 0;
2324         void *dgb;
2325         int dgs;
2326
2327         if (!drbd_get_data_sock(mdev))
2328                 return 0;
2329
2330         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2331                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2332
2333         p.head.magic   = BE_DRBD_MAGIC;
2334         p.head.command = cpu_to_be16(P_DATA);
2335         p.head.length  =
2336                 cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2337
2338         p.sector   = cpu_to_be64(req->sector);
2339         p.block_id = (unsigned long)req;
2340         p.seq_num  = cpu_to_be32(req->seq_num =
2341                                  atomic_add_return(1, &mdev->packet_seq));
2342         dp_flags = 0;
2343
2344         /* NOTE: no need to check if barriers are supported here, as we would
2345          *       not pass the test in make_request_common in that case
2346          */
2347         if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2348                 dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2349                 /* dp_flags |= DP_HARDBARRIER; */
2350         }
2351         if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2352                 dp_flags |= DP_RW_SYNC;
2353         /* for now handle SYNCIO and UNPLUG
2354          * as if they still were one and the same flag */
2355         if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2356                 dp_flags |= DP_RW_SYNC;
2357         if (mdev->state.conn >= C_SYNC_SOURCE &&
2358             mdev->state.conn <= C_PAUSED_SYNC_T)
2359                 dp_flags |= DP_MAY_SET_IN_SYNC;
2360
2361         p.dp_flags = cpu_to_be32(dp_flags);
2362         trace_drbd_packet(mdev, mdev->data.socket, 0, (void *)&p, __FILE__, __LINE__);
2363         set_bit(UNPLUG_REMOTE, &mdev->flags);
2364         ok = (sizeof(p) ==
2365                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2366         if (ok && dgs) {
2367                 dgb = mdev->int_dig_out;
2368                 drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2369                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2370         }
2371         if (ok) {
2372                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2373                         ok = _drbd_send_bio(mdev, req->master_bio);
2374                 else
2375                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2376         }
2377
2378         drbd_put_data_sock(mdev);
2379         return ok;
2380 }
2381
2382 /* answer packet, used to send data back for read requests:
2383  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2384  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2385  */
2386 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2387                     struct drbd_epoch_entry *e)
2388 {
2389         int ok;
2390         struct p_data p;
2391         void *dgb;
2392         int dgs;
2393
2394         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2395                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2396
2397         p.head.magic   = BE_DRBD_MAGIC;
2398         p.head.command = cpu_to_be16(cmd);
2399         p.head.length  =
2400                 cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2401
2402         p.sector   = cpu_to_be64(e->sector);
2403         p.block_id = e->block_id;
2404         /* p.seq_num  = 0;    No sequence numbers here.. */
2405
2406         /* Only called by our kernel thread.
2407          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2408          * in response to an admin command or module unload.
2409          */
2410         if (!drbd_get_data_sock(mdev))
2411                 return 0;
2412
2413         trace_drbd_packet(mdev, mdev->data.socket, 0, (void *)&p, __FILE__, __LINE__);
2414         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2415                                         sizeof(p), MSG_MORE);
2416         if (ok && dgs) {
2417                 dgb = mdev->int_dig_out;
2418                 drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
2419                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2420         }
2421         if (ok)
2422                 ok = _drbd_send_zc_bio(mdev, e->private_bio);
2423
2424         drbd_put_data_sock(mdev);
2425         return ok;
2426 }
2427
2428 /*
2429   drbd_send distinguishes two cases:
2430
2431   Packets sent via the data socket "sock"
2432   and packets sent via the meta data socket "msock"
2433
2434                     sock                      msock
2435   -----------------+-------------------------+------------------------------
2436   timeout           conf.timeout / 2          conf.timeout / 2
2437   timeout action    send a ping via msock     Abort communication
2438                                               and close all sockets
2439 */
2440
2441 /*
2442  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2443  */
2444 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2445               void *buf, size_t size, unsigned msg_flags)
2446 {
2447         struct kvec iov;
2448         struct msghdr msg;
2449         int rv, sent = 0;
2450
2451         if (!sock)
2452                 return -1000;
2453
2454         /* THINK  if (signal_pending) return ... ? */
2455
2456         iov.iov_base = buf;
2457         iov.iov_len  = size;
2458
2459         msg.msg_name       = NULL;
2460         msg.msg_namelen    = 0;
2461         msg.msg_control    = NULL;
2462         msg.msg_controllen = 0;
2463         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2464
2465         if (sock == mdev->data.socket) {
2466                 mdev->ko_count = mdev->net_conf->ko_count;
2467                 drbd_update_congested(mdev);
2468         }
2469         do {
2470                 /* STRANGE
2471                  * tcp_sendmsg does _not_ use its size parameter at all ?
2472                  *
2473                  * -EAGAIN on timeout, -EINTR on signal.
2474                  */
2475 /* THINK
2476  * do we need to block DRBD_SIG if sock == &meta.socket ??
2477  * otherwise wake_asender() might interrupt some send_*Ack !
2478  */
2479                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2480                 if (rv == -EAGAIN) {
2481                         if (we_should_drop_the_connection(mdev, sock))
2482                                 break;
2483                         else
2484                                 continue;
2485                 }
2486                 D_ASSERT(rv != 0);
2487                 if (rv == -EINTR) {
2488                         flush_signals(current);
2489                         rv = 0;
2490                 }
2491                 if (rv < 0)
2492                         break;
2493                 sent += rv;
2494                 iov.iov_base += rv;
2495                 iov.iov_len  -= rv;
2496         } while (sent < size);
2497
2498         if (sock == mdev->data.socket)
2499                 clear_bit(NET_CONGESTED, &mdev->flags);
2500
2501         if (rv <= 0) {
2502                 if (rv != -EAGAIN) {
2503                         dev_err(DEV, "%s_sendmsg returned %d\n",
2504                             sock == mdev->meta.socket ? "msock" : "sock",
2505                             rv);
2506                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2507                 } else
2508                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2509         }
2510
2511         return sent;
2512 }
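
/*
 * Illustrative sketch, kept under #if 0 and not part of the driver: the
 * usual pattern for sending a raw buffer on the data socket, matching the
 * sock/msock rules summarized above.  drbd_get_data_sock() and
 * drbd_put_data_sock() presumably take and release mdev->data.mutex (see
 * their use in drbd_send_cmd2() and drbd_send_dblock()).
 */
#if 0
static int example_send_on_data_socket(struct drbd_conf *mdev,
                                        void *buf, size_t size)
{
        int sent;

        if (!drbd_get_data_sock(mdev))
                return 0;
        /* drbd_send() retries on -EAGAIN until we_should_drop_the_connection()
         * decides the peer is dead; it returns the number of bytes sent */
        sent = drbd_send(mdev, mdev->data.socket, buf, size, 0);
        drbd_put_data_sock(mdev);
        return sent == (int)size;
}
#endif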
2513
2514 static int drbd_open(struct block_device *bdev, fmode_t mode)
2515 {
2516         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2517         unsigned long flags;
2518         int rv = 0;
2519
2520         spin_lock_irqsave(&mdev->req_lock, flags);
2521         /* to have a stable mdev->state.role
2522          * and no race with updating open_cnt */
2523
2524         if (mdev->state.role != R_PRIMARY) {
2525                 if (mode & FMODE_WRITE)
2526                         rv = -EROFS;
2527                 else if (!allow_oos)
2528                         rv = -EMEDIUMTYPE;
2529         }
2530
2531         if (!rv)
2532                 mdev->open_cnt++;
2533         spin_unlock_irqrestore(&mdev->req_lock, flags);
2534
2535         return rv;
2536 }
2537
2538 static int drbd_release(struct gendisk *gd, fmode_t mode)
2539 {
2540         struct drbd_conf *mdev = gd->private_data;
2541         mdev->open_cnt--;
2542         return 0;
2543 }
2544
2545 static void drbd_unplug_fn(struct request_queue *q)
2546 {
2547         struct drbd_conf *mdev = q->queuedata;
2548
2549         trace_drbd_unplug(mdev, "got unplugged");
2550
2551         /* unplug FIRST */
2552         spin_lock_irq(q->queue_lock);
2553         blk_remove_plug(q);
2554         spin_unlock_irq(q->queue_lock);
2555
2556         /* only if connected */
2557         spin_lock_irq(&mdev->req_lock);
2558         if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2559                 D_ASSERT(mdev->state.role == R_PRIMARY);
2560                 if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2561                         /* add to the data.work queue,
2562                          * unless already queued.
2563                          * XXX this might be a good addition to drbd_queue_work
2564                          * anyways, to detect "double queuing" ... */
2565                         if (list_empty(&mdev->unplug_work.list))
2566                                 drbd_queue_work(&mdev->data.work,
2567                                                 &mdev->unplug_work);
2568                 }
2569         }
2570         spin_unlock_irq(&mdev->req_lock);
2571
2572         if (mdev->state.disk >= D_INCONSISTENT)
2573                 drbd_kick_lo(mdev);
2574 }
2575
2576 static void drbd_set_defaults(struct drbd_conf *mdev)
2577 {
2578         mdev->sync_conf.after      = DRBD_AFTER_DEF;
2579         mdev->sync_conf.rate       = DRBD_RATE_DEF;
2580         mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2581         mdev->state = (union drbd_state) {
2582                 { .role = R_SECONDARY,
2583                   .peer = R_UNKNOWN,
2584                   .conn = C_STANDALONE,
2585                   .disk = D_DISKLESS,
2586                   .pdsk = D_UNKNOWN,
2587                   .susp = 0
2588                 } };
2589 }
2590
2591 void drbd_init_set_defaults(struct drbd_conf *mdev)
2592 {
2593         /* the memset(,0,) did most of this.
2594          * note: only assignments, no allocation in here */
2595
2596         drbd_set_defaults(mdev);
2597
2598         /* for now, we do NOT yet support it,
2599          * even though we start some framework
2600          * to eventually support barriers */
2601         set_bit(NO_BARRIER_SUPP, &mdev->flags);
2602
2603         atomic_set(&mdev->ap_bio_cnt, 0);
2604         atomic_set(&mdev->ap_pending_cnt, 0);
2605         atomic_set(&mdev->rs_pending_cnt, 0);
2606         atomic_set(&mdev->unacked_cnt, 0);
2607         atomic_set(&mdev->local_cnt, 0);
2608         atomic_set(&mdev->net_cnt, 0);
2609         atomic_set(&mdev->packet_seq, 0);
2610         atomic_set(&mdev->pp_in_use, 0);
2611
2612         mutex_init(&mdev->md_io_mutex);
2613         mutex_init(&mdev->data.mutex);
2614         mutex_init(&mdev->meta.mutex);
2615         sema_init(&mdev->data.work.s, 0);
2616         sema_init(&mdev->meta.work.s, 0);
2617         mutex_init(&mdev->state_mutex);
2618
2619         spin_lock_init(&mdev->data.work.q_lock);
2620         spin_lock_init(&mdev->meta.work.q_lock);
2621
2622         spin_lock_init(&mdev->al_lock);
2623         spin_lock_init(&mdev->req_lock);
2624         spin_lock_init(&mdev->peer_seq_lock);
2625         spin_lock_init(&mdev->epoch_lock);
2626
2627         INIT_LIST_HEAD(&mdev->active_ee);
2628         INIT_LIST_HEAD(&mdev->sync_ee);
2629         INIT_LIST_HEAD(&mdev->done_ee);
2630         INIT_LIST_HEAD(&mdev->read_ee);
2631         INIT_LIST_HEAD(&mdev->net_ee);
2632         INIT_LIST_HEAD(&mdev->resync_reads);
2633         INIT_LIST_HEAD(&mdev->data.work.q);
2634         INIT_LIST_HEAD(&mdev->meta.work.q);
2635         INIT_LIST_HEAD(&mdev->resync_work.list);
2636         INIT_LIST_HEAD(&mdev->unplug_work.list);
2637         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2638         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2639         mdev->resync_work.cb  = w_resync_inactive;
2640         mdev->unplug_work.cb  = w_send_write_hint;
2641         mdev->md_sync_work.cb = w_md_sync;
2642         mdev->bm_io_work.w.cb = w_bitmap_io;
2643         init_timer(&mdev->resync_timer);
2644         init_timer(&mdev->md_sync_timer);
2645         mdev->resync_timer.function = resync_timer_fn;
2646         mdev->resync_timer.data = (unsigned long) mdev;
2647         mdev->md_sync_timer.function = md_sync_timer_fn;
2648         mdev->md_sync_timer.data = (unsigned long) mdev;
2649
2650         init_waitqueue_head(&mdev->misc_wait);
2651         init_waitqueue_head(&mdev->state_wait);
2652         init_waitqueue_head(&mdev->ee_wait);
2653         init_waitqueue_head(&mdev->al_wait);
2654         init_waitqueue_head(&mdev->seq_wait);
2655
2656         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2657         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2658         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2659
2660         mdev->agreed_pro_version = PRO_VERSION_MAX;
2661         mdev->write_ordering = WO_bio_barrier;
2662         mdev->resync_wenr = LC_FREE;
2663 }
2664
2665 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2666 {
2667         if (mdev->receiver.t_state != None)
2668                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2669                                 mdev->receiver.t_state);
2670
2671         /* no need to lock it, I'm the only thread alive */
2672         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2673                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2674         mdev->al_writ_cnt  =
2675         mdev->bm_writ_cnt  =
2676         mdev->read_cnt     =
2677         mdev->recv_cnt     =
2678         mdev->send_cnt     =
2679         mdev->writ_cnt     =
2680         mdev->p_size       =
2681         mdev->rs_start     =
2682         mdev->rs_total     =
2683         mdev->rs_failed    =
2684         mdev->rs_mark_left =
2685         mdev->rs_mark_time = 0;
2686         D_ASSERT(mdev->net_conf == NULL);
2687
2688         drbd_set_my_capacity(mdev, 0);
2689         if (mdev->bitmap) {
2690                 /* maybe never allocated. */
2691                 drbd_bm_resize(mdev, 0);
2692                 drbd_bm_cleanup(mdev);
2693         }
2694
2695         drbd_free_resources(mdev);
2696
2697         /*
2698          * currently we call drbd_init_ee only on module load, so
2699          * we may call drbd_release_ee only on module unload!
2700          */
2701         D_ASSERT(list_empty(&mdev->active_ee));
2702         D_ASSERT(list_empty(&mdev->sync_ee));
2703         D_ASSERT(list_empty(&mdev->done_ee));
2704         D_ASSERT(list_empty(&mdev->read_ee));
2705         D_ASSERT(list_empty(&mdev->net_ee));
2706         D_ASSERT(list_empty(&mdev->resync_reads));
2707         D_ASSERT(list_empty(&mdev->data.work.q));
2708         D_ASSERT(list_empty(&mdev->meta.work.q));
2709         D_ASSERT(list_empty(&mdev->resync_work.list));
2710         D_ASSERT(list_empty(&mdev->unplug_work.list));
2711
2712 }
2713
2714
2715 static void drbd_destroy_mempools(void)
2716 {
2717         struct page *page;
2718
2719         while (drbd_pp_pool) {
2720                 page = drbd_pp_pool;
2721                 drbd_pp_pool = (struct page *)page_private(page);
2722                 __free_page(page);
2723                 drbd_pp_vacant--;
2724         }
2725
2726         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2727
2728         if (drbd_ee_mempool)
2729                 mempool_destroy(drbd_ee_mempool);
2730         if (drbd_request_mempool)
2731                 mempool_destroy(drbd_request_mempool);
2732         if (drbd_ee_cache)
2733                 kmem_cache_destroy(drbd_ee_cache);
2734         if (drbd_request_cache)
2735                 kmem_cache_destroy(drbd_request_cache);
2736         if (drbd_bm_ext_cache)
2737                 kmem_cache_destroy(drbd_bm_ext_cache);
2738         if (drbd_al_ext_cache)
2739                 kmem_cache_destroy(drbd_al_ext_cache);
2740
2741         drbd_ee_mempool      = NULL;
2742         drbd_request_mempool = NULL;
2743         drbd_ee_cache        = NULL;
2744         drbd_request_cache   = NULL;
2745         drbd_bm_ext_cache    = NULL;
2746         drbd_al_ext_cache    = NULL;
2747
2748         return;
2749 }
2750
2751 static int drbd_create_mempools(void)
2752 {
2753         struct page *page;
2754         const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2755         int i;
2756
2757         /* prepare our caches and mempools */
2758         drbd_request_mempool = NULL;
2759         drbd_ee_cache        = NULL;
2760         drbd_request_cache   = NULL;
2761         drbd_bm_ext_cache    = NULL;
2762         drbd_al_ext_cache    = NULL;
2763         drbd_pp_pool         = NULL;
2764
2765         /* caches */
2766         drbd_request_cache = kmem_cache_create(
2767                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2768         if (drbd_request_cache == NULL)
2769                 goto Enomem;
2770
2771         drbd_ee_cache = kmem_cache_create(
2772                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2773         if (drbd_ee_cache == NULL)
2774                 goto Enomem;
2775
2776         drbd_bm_ext_cache = kmem_cache_create(
2777                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2778         if (drbd_bm_ext_cache == NULL)
2779                 goto Enomem;
2780
2781         drbd_al_ext_cache = kmem_cache_create(
2782                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2783         if (drbd_al_ext_cache == NULL)
2784                 goto Enomem;
2785
2786         /* mempools */
2787         drbd_request_mempool = mempool_create(number,
2788                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2789         if (drbd_request_mempool == NULL)
2790                 goto Enomem;
2791
2792         drbd_ee_mempool = mempool_create(number,
2793                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2794         if (drbd_ee_mempool == NULL)
2795                 goto Enomem;
2796
2797         /* drbd's page pool */
2798         spin_lock_init(&drbd_pp_lock);
2799
2800         for (i = 0; i < number; i++) {
2801                 page = alloc_page(GFP_HIGHUSER);
2802                 if (!page)
2803                         goto Enomem;
2804                 set_page_private(page, (unsigned long)drbd_pp_pool);
2805                 drbd_pp_pool = page;
2806         }
2807         drbd_pp_vacant = number;
2808
2809         return 0;
2810
2811 Enomem:
2812         drbd_destroy_mempools(); /* in case we allocated some */
2813         return -ENOMEM;
2814 }
2815
2816 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2817         void *unused)
2818 {
2819         /* just so we have it.  you never know what interesting things we
2820          * might want to do here some day...
2821          */
2822
2823         return NOTIFY_DONE;
2824 }
2825
2826 static struct notifier_block drbd_notifier = {
2827         .notifier_call = drbd_notify_sys,
2828 };
2829
2830 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2831 {
2832         int rr;
2833
2834         rr = drbd_release_ee(mdev, &mdev->active_ee);
2835         if (rr)
2836                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2837
2838         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2839         if (rr)
2840                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2841
2842         rr = drbd_release_ee(mdev, &mdev->read_ee);
2843         if (rr)
2844                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2845
2846         rr = drbd_release_ee(mdev, &mdev->done_ee);
2847         if (rr)
2848                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2849
2850         rr = drbd_release_ee(mdev, &mdev->net_ee);
2851         if (rr)
2852                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2853 }
2854
2855 /* caution. no locking.
2856  * currently only used from module cleanup code. */
2857 static void drbd_delete_device(unsigned int minor)
2858 {
2859         struct drbd_conf *mdev = minor_to_mdev(minor);
2860
2861         if (!mdev)
2862                 return;
2863
2864         /* paranoia asserts */
2865         if (mdev->open_cnt != 0)
2866                 dev_err(DEV, "open_cnt = %d in %s:%u\n", mdev->open_cnt,
2867                                 __FILE__, __LINE__);
2868
2869         ERR_IF (!list_empty(&mdev->data.work.q)) {
2870                 struct list_head *lp;
2871                 list_for_each(lp, &mdev->data.work.q) {
2872                         dev_err(DEV, "lp = %p\n", lp);
2873                 }
2874         };
2875         /* end paranoia asserts */
2876
2877         del_gendisk(mdev->vdisk);
2878
2879         /* cleanup stuff that may have been allocated during
2880          * device (re-)configuration or state changes */
2881
2882         if (mdev->this_bdev)
2883                 bdput(mdev->this_bdev);
2884
2885         drbd_free_resources(mdev);
2886
2887         drbd_release_ee_lists(mdev);
2888
2889         /* should be free'd on disconnect? */
2890         kfree(mdev->ee_hash);
2891         /*
2892         mdev->ee_hash_s = 0;
2893         mdev->ee_hash = NULL;
2894         */
2895
2896         lc_destroy(mdev->act_log);
2897         lc_destroy(mdev->resync);
2898
2899         kfree(mdev->p_uuid);
2900         /* mdev->p_uuid = NULL; */
2901
2902         kfree(mdev->int_dig_out);
2903         kfree(mdev->int_dig_in);
2904         kfree(mdev->int_dig_vv);
2905
2906         /* cleanup the rest that has been
2907          * allocated from drbd_new_device
2908          * and actually free the mdev itself */
2909         drbd_free_mdev(mdev);
2910 }
2911
2912 static void drbd_cleanup(void)
2913 {
2914         unsigned int i;
2915
2916         unregister_reboot_notifier(&drbd_notifier);
2917
2918         drbd_nl_cleanup();
2919
2920         if (minor_table) {
2921                 if (drbd_proc)
2922                         remove_proc_entry("drbd", NULL);
2923                 i = minor_count;
2924                 while (i--)
2925                         drbd_delete_device(i);
2926                 drbd_destroy_mempools();
2927         }
2928
2929         kfree(minor_table);
2930
2931         unregister_blkdev(DRBD_MAJOR, "drbd");
2932
2933         printk(KERN_INFO "drbd: module cleanup done.\n");
2934 }
2935
2936 /**
2937  * drbd_congested() - Callback for pdflush
2938  * @congested_data:     User data
2939  * @bdi_bits:           Bits pdflush is currently interested in
2940  *
2941  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2942  */
2943 static int drbd_congested(void *congested_data, int bdi_bits)
2944 {
2945         struct drbd_conf *mdev = congested_data;
2946         struct request_queue *q;
2947         char reason = '-';
2948         int r = 0;
2949
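             /* reason codes reported via mdev->congestion_reason:
              * 'd' = drbd itself has frozen IO, 'b' = backing device congested,
              * 'n' = network send path congested, 'a' = backing device and network */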
2950         if (!__inc_ap_bio_cond(mdev)) {
2951                 /* DRBD has frozen IO */
2952                 r = bdi_bits;
2953                 reason = 'd';
2954                 goto out;
2955         }
2956
2957         if (get_ldev(mdev)) {
2958                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2959                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2960                 put_ldev(mdev);
2961                 if (r)
2962                         reason = 'b';
2963         }
2964
2965         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
2966                 r |= (1 << BDI_async_congested);
2967                 reason = reason == 'b' ? 'a' : 'n';
2968         }
2969
2970 out:
2971         mdev->congestion_reason = reason;
2972         return r;
2973 }
2974
2975 struct drbd_conf *drbd_new_device(unsigned int minor)
2976 {
2977         struct drbd_conf *mdev;
2978         struct gendisk *disk;
2979         struct request_queue *q;
2980
2981         /* GFP_KERNEL, we are outside of all write-out paths */
2982         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2983         if (!mdev)
2984                 return NULL;
2985         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
2986                 goto out_no_cpumask;
2987
2988         mdev->minor = minor;
2989
2990         drbd_init_set_defaults(mdev);
2991
2992         q = blk_alloc_queue(GFP_KERNEL);
2993         if (!q)
2994                 goto out_no_q;
2995         mdev->rq_queue = q;
2996         q->queuedata   = mdev;
2997         blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
2998
2999         disk = alloc_disk(1);
3000         if (!disk)
3001                 goto out_no_disk;
3002         mdev->vdisk = disk;
3003
3004         set_disk_ro(disk, TRUE);
3005
3006         disk->queue = q;
3007         disk->major = DRBD_MAJOR;
3008         disk->first_minor = minor;
3009         disk->fops = &drbd_ops;
3010         sprintf(disk->disk_name, "drbd%d", minor);
3011         disk->private_data = mdev;
3012
3013         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3014         /* we have no partitions. we contain only ourselves. */
3015         mdev->this_bdev->bd_contains = mdev->this_bdev;
3016
3017         q->backing_dev_info.congested_fn = drbd_congested;
3018         q->backing_dev_info.congested_data = mdev;
3019
3020         blk_queue_make_request(q, drbd_make_request_26);
3021         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3022         blk_queue_merge_bvec(q, drbd_merge_bvec);
3023         q->queue_lock = &mdev->req_lock; /* needed since we use */
3024                 /* plugging on a queue that actually has no requests! */
3025         q->unplug_fn = drbd_unplug_fn;
3026
3027         mdev->md_io_page = alloc_page(GFP_KERNEL);
3028         if (!mdev->md_io_page)
3029                 goto out_no_io_page;
3030
3031         if (drbd_bm_init(mdev))
3032                 goto out_no_bitmap;
3033         /* no need to lock access, we are still initializing this minor device. */
3034         if (!tl_init(mdev))
3035                 goto out_no_tl;
3036
3037         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3038         if (!mdev->app_reads_hash)
3039                 goto out_no_app_reads;
3040
3041         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3042         if (!mdev->current_epoch)
3043                 goto out_no_epoch;
3044
3045         INIT_LIST_HEAD(&mdev->current_epoch->list);
3046         mdev->epochs = 1;
3047
3048         return mdev;
3049
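     /* error unwinding: each label below releases what was allocated before
      * the corresponding failure point, in reverse order of allocation. */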
3050 /* out_whatever_else:
3051         kfree(mdev->current_epoch); */
3052 out_no_epoch:
3053         kfree(mdev->app_reads_hash);
3054 out_no_app_reads:
3055         tl_cleanup(mdev);
3056 out_no_tl:
3057         drbd_bm_cleanup(mdev);
3058 out_no_bitmap:
3059         __free_page(mdev->md_io_page);
3060 out_no_io_page:
3061         put_disk(disk);
3062 out_no_disk:
3063         blk_cleanup_queue(q);
3064 out_no_q:
3065         free_cpumask_var(mdev->cpu_mask);
3066 out_no_cpumask:
3067         kfree(mdev);
3068         return NULL;
3069 }
3070
3071 /* counterpart of drbd_new_device.
3072  * last part of drbd_delete_device. */
3073 void drbd_free_mdev(struct drbd_conf *mdev)
3074 {
3075         kfree(mdev->current_epoch);
3076         kfree(mdev->app_reads_hash);
3077         tl_cleanup(mdev);
3078         if (mdev->bitmap) /* should no longer be there. */
3079                 drbd_bm_cleanup(mdev);
3080         __free_page(mdev->md_io_page);
3081         put_disk(mdev->vdisk);
3082         blk_cleanup_queue(mdev->rq_queue);
3083         free_cpumask_var(mdev->cpu_mask);
3084         kfree(mdev);
3085 }
3086
3087
3088 int __init drbd_init(void)
3089 {
3090         int err;
3091
3092         if (sizeof(struct p_handshake) != 80) {
3093                 printk(KERN_ERR
3094                        "drbd: never change the size or layout "
3095                        "of the HandShake packet.\n");
3096                 return -EINVAL;
3097         }
3098
3099         if (1 > minor_count || minor_count > 255) {
3100                 printk(KERN_ERR
3101                         "drbd: invalid minor_count (%d)\n", minor_count);
3102 #ifdef MODULE
3103                 return -EINVAL;
3104 #else
3105                 minor_count = 8;
3106 #endif
3107         }
3108
3109         err = drbd_nl_init();
3110         if (err)
3111                 return err;
3112
3113         err = register_blkdev(DRBD_MAJOR, "drbd");
3114         if (err) {
3115                 printk(KERN_ERR
3116                        "drbd: unable to register block device major %d\n",
3117                        DRBD_MAJOR);
3118                 return err;
3119         }
3120
3121         register_reboot_notifier(&drbd_notifier);
3122
3123         /*
3124          * allocate all necessary structs
3125          */
3126         err = -ENOMEM;
3127
3128         init_waitqueue_head(&drbd_pp_wait);
3129
3130         drbd_proc = NULL; /* play safe for drbd_cleanup */
3131         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3132                                 GFP_KERNEL);
3133         if (!minor_table)
3134                 goto Enomem;
3135
3136         err = drbd_create_mempools();
3137         if (err)
3138                 goto Enomem;
3139
3140         drbd_proc = proc_create("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops);
3141         if (!drbd_proc) {
3142                 printk(KERN_ERR "drbd: unable to register proc file\n");
3143                 goto Enomem;
3144         }
3145
3146         rwlock_init(&global_state_lock);
3147
3148         printk(KERN_INFO "drbd: initialized. "
3149                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3150                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3151         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3152         printk(KERN_INFO "drbd: registered as block device major %d\n",
3153                 DRBD_MAJOR);
3154         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3155
3156         return 0; /* Success! */
3157
3158 Enomem:
3159         drbd_cleanup();
3160         if (err == -ENOMEM)
3161                 /* currently always the case */
3162                 printk(KERN_ERR "drbd: ran out of memory\n");
3163         else
3164                 printk(KERN_ERR "drbd: initialization failure\n");
3165         return err;
3166 }
3167
3168 void drbd_free_bc(struct drbd_backing_dev *ldev)
3169 {
3170         if (ldev == NULL)
3171                 return;
3172
3173         bd_release(ldev->backing_bdev);
3174         bd_release(ldev->md_bdev);
3175
3176         fput(ldev->lo_file);
3177         fput(ldev->md_file);
3178
3179         kfree(ldev);
3180 }
3181
3182 void drbd_free_sock(struct drbd_conf *mdev)
3183 {
3184         if (mdev->data.socket) {
3185                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3186                 sock_release(mdev->data.socket);
3187                 mdev->data.socket = NULL;
3188         }
3189         if (mdev->meta.socket) {
3190                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3191                 sock_release(mdev->meta.socket);
3192                 mdev->meta.socket = NULL;
3193         }
3194 }
3195
3196
3197 void drbd_free_resources(struct drbd_conf *mdev)
3198 {
3199         crypto_free_hash(mdev->csums_tfm);
3200         mdev->csums_tfm = NULL;
3201         crypto_free_hash(mdev->verify_tfm);
3202         mdev->verify_tfm = NULL;
3203         crypto_free_hash(mdev->cram_hmac_tfm);
3204         mdev->cram_hmac_tfm = NULL;
3205         crypto_free_hash(mdev->integrity_w_tfm);
3206         mdev->integrity_w_tfm = NULL;
3207         crypto_free_hash(mdev->integrity_r_tfm);
3208         mdev->integrity_r_tfm = NULL;
3209
3210         drbd_free_sock(mdev);
3211
3212         __no_warn(local,
3213                   drbd_free_bc(mdev->ldev);
3214                   mdev->ldev = NULL;);
3215 }
3216
3217 /* meta data management */
3218
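     /* On-disk layout of the DRBD meta data super block.  All multi byte
      * fields are stored in big endian byte order (cpu_to_be*() on write,
      * be*_to_cpu() on read). */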
3219 struct meta_data_on_disk {
3220         u64 la_size;           /* last agreed size. */
3221         u64 uuid[UI_SIZE];   /* UUIDs. */
3222         u64 device_uuid;
3223         u64 reserved_u64_1;
3224         u32 flags;             /* MDF */
3225         u32 magic;
3226         u32 md_size_sect;
3227         u32 al_offset;         /* offset to this block */
3228         u32 al_nr_extents;     /* important for restoring the AL */
3229               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3230         u32 bm_offset;         /* offset to the bitmap, from here */
3231         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3232         u32 reserved_u32[4];
3233
3234 } __packed;
3235
3236 /**
3237  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3238  * @mdev:       DRBD device.
3239  */
3240 void drbd_md_sync(struct drbd_conf *mdev)
3241 {
3242         struct meta_data_on_disk *buffer;
3243         sector_t sector;
3244         int i;
3245
3246         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3247                 return;
3248         del_timer(&mdev->md_sync_timer);
3249
3250         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
3251          * metadata even if we detach due to a disk failure! */
3252         if (!get_ldev_if_state(mdev, D_FAILED))
3253                 return;
3254
3255         trace_drbd_md_io(mdev, WRITE, mdev->ldev);
3256
3257         mutex_lock(&mdev->md_io_mutex);
3258         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
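             /* the super block data fits into a single 512 byte sector of the
              * preallocated md_io_page; clear it, then fill in the current
              * values in big endian byte order */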
3259         memset(buffer, 0, 512);
3260
3261         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3262         for (i = UI_CURRENT; i < UI_SIZE; i++)
3263                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3264         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3265         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3266
3267         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3268         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3269         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3270         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3271         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3272
3273         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3274
3275         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3276         sector = mdev->ldev->md.md_offset;
3277
3278         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3279                 clear_bit(MD_DIRTY, &mdev->flags);
3280         } else {
3281                 /* this was a try anyway ... */
3282                 dev_err(DEV, "meta data update failed!\n");
3283
3284                 drbd_chk_io_error(mdev, 1, TRUE);
3285         }
3286
3287         /* Update mdev->ldev->md.la_size_sect,
3288          * since we updated it on metadata. */
3289         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3290
3291         mutex_unlock(&mdev->md_io_mutex);
3292         put_ldev(mdev);
3293 }
3294
3295 /**
3296  * drbd_md_read() - Reads in the meta data super block
3297  * @mdev:       DRBD device.
3298  * @bdev:       Device from which the meta data should be read in.
3299  *
3300  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3301  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3302  */
3303 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3304 {
3305         struct meta_data_on_disk *buffer;
3306         int i, rv = NO_ERROR;
3307
3308         if (!get_ldev_if_state(mdev, D_ATTACHING))
3309                 return ERR_IO_MD_DISK;
3310
3311         trace_drbd_md_io(mdev, READ, bdev);
3312
3313         mutex_lock(&mdev->md_io_mutex);
3314         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3315
3316         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3317                 /* NOTE: can't do normal error processing here as this is
3318                    called BEFORE the disk is attached */
3319                 dev_err(DEV, "Error while reading metadata.\n");
3320                 rv = ERR_IO_MD_DISK;
3321                 goto err;
3322         }
3323
3324         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3325                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3326                 rv = ERR_MD_INVALID;
3327                 goto err;
3328         }
3329         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3330                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3331                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3332                 rv = ERR_MD_INVALID;
3333                 goto err;
3334         }
3335         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3336                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3337                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3338                 rv = ERR_MD_INVALID;
3339                 goto err;
3340         }
3341         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3342                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3343                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3344                 rv = ERR_MD_INVALID;
3345                 goto err;
3346         }
3347
3348         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3349                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3350                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3351                 rv = ERR_MD_INVALID;
3352                 goto err;
3353         }
3354
3355         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3356         for (i = UI_CURRENT; i < UI_SIZE; i++)
3357                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3358         bdev->md.flags = be32_to_cpu(buffer->flags);
3359         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3360         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3361
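             /* clamp obviously bogus values: anything below the minimum of 7
              * activity log extents falls back to the default of 127
              * (presumably DRBD_AL_EXTENTS_DEF) */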
3362         if (mdev->sync_conf.al_extents < 7)
3363                 mdev->sync_conf.al_extents = 127;
3364
3365  err:
3366         mutex_unlock(&mdev->md_io_mutex);
3367         put_ldev(mdev);
3368
3369         return rv;
3370 }
3371
3372 /**
3373  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3374  * @mdev:       DRBD device.
3375  *
3376  * Call this function if you change anything that should be written to
3377  * the meta-data super block. This function sets MD_DIRTY and arms a
3378  * timer that ensures drbd_md_sync() gets called within five seconds.
3379  */
3380 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3381 {
3382         set_bit(MD_DIRTY, &mdev->flags);
3383         mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3384 }
3385
3386
3387 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3388 {
3389         int i;
3390
3391         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++) {
3392                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3393
3394                 trace_drbd_uuid(mdev, i+1);
3395         }
3396 }
3397
3398 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3399 {
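             /* the lowest bit of the current UUID encodes whether this node
              * is currently Primary */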
3400         if (idx == UI_CURRENT) {
3401                 if (mdev->state.role == R_PRIMARY)
3402                         val |= 1;
3403                 else
3404                         val &= ~((u64)1);
3405
3406                 drbd_set_ed_uuid(mdev, val);
3407         }
3408
3409         mdev->ldev->md.uuid[idx] = val;
3410         trace_drbd_uuid(mdev, idx);
3411         drbd_md_mark_dirty(mdev);
3412 }
3413
3414
3415 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3416 {
3417         if (mdev->ldev->md.uuid[idx]) {
3418                 drbd_uuid_move_history(mdev);
3419                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3420                 trace_drbd_uuid(mdev, UI_HISTORY_START);
3421         }
3422         _drbd_uuid_set(mdev, idx, val);
3423 }
3424
3425 /**
3426  * drbd_uuid_new_current() - Creates a new current UUID
3427  * @mdev:       DRBD device.
3428  *
3429  * Creates a new current UUID, and rotates the old current UUID into
3430  * the bitmap slot. Causes an incremental resync upon next connect.
3431  */
3432 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3433 {
3434         u64 val;
3435
3436         dev_info(DEV, "Creating new current UUID\n");
3437         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3438         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3439         trace_drbd_uuid(mdev, UI_BITMAP);
3440
3441         get_random_bytes(&val, sizeof(u64));
3442         _drbd_uuid_set(mdev, UI_CURRENT, val);
3443 }
3444
3445 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3446 {
3447         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3448                 return;
3449
3450         if (val == 0) {
3451                 drbd_uuid_move_history(mdev);
3452                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3453                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3454                 trace_drbd_uuid(mdev, UI_HISTORY_START);
3455                 trace_drbd_uuid(mdev, UI_BITMAP);
3456         } else {
3457                 if (mdev->ldev->md.uuid[UI_BITMAP])
3458                         dev_warn(DEV, "bm UUID already set\n");
3459
3460                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3461                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3462
3463                 trace_drbd_uuid(mdev, UI_BITMAP);
3464         }
3465         drbd_md_mark_dirty(mdev);
3466 }
3467
3468 /**
3469  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3470  * @mdev:       DRBD device.
3471  *
3472  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3473  */
3474 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3475 {
3476         int rv = -EIO;
3477
3478         if (get_ldev_if_state(mdev, D_ATTACHING)) {
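                     /* Set MDF_FULL_SYNC and sync the super block before touching the
                      * bitmap, so that an interrupted bitmap write is (presumably) still
                      * recognized as "full sync needed" on the next attach. */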
3479                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3480                 drbd_md_sync(mdev);
3481                 drbd_bm_set_all(mdev);
3482
3483                 rv = drbd_bm_write(mdev);
3484
3485                 if (!rv) {
3486                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3487                         drbd_md_sync(mdev);
3488                 }
3489
3490                 put_ldev(mdev);
3491         }
3492
3493         return rv;
3494 }
3495
3496 /**
3497  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3498  * @mdev:       DRBD device.
3499  *
3500  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3501  */
3502 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3503 {
3504         int rv = -EIO;
3505
3506         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3507                 drbd_bm_clear_all(mdev);
3508                 rv = drbd_bm_write(mdev);
3509                 put_ldev(mdev);
3510         }
3511
3512         return rv;
3513 }
3514
3515 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3516 {
3517         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3518         int rv;
3519
3520         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3521
3522         drbd_bm_lock(mdev, work->why);
3523         rv = work->io_fn(mdev);
3524         drbd_bm_unlock(mdev);
3525
3526         clear_bit(BITMAP_IO, &mdev->flags);
3527         wake_up(&mdev->misc_wait);
3528
3529         if (work->done)
3530                 work->done(mdev, rv);
3531
3532         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3533         work->why = NULL;
3534
3535         return 1;
3536 }
3537
3538 /**
3539  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3540  * @mdev:       DRBD device.
3541  * @io_fn:      IO callback to be called when bitmap IO is possible
3542  * @done:       callback to be called after the bitmap IO was performed
3543  * @why:        Descriptive text of the reason for doing the IO
3544  *
3545  * While IO on the bitmap happens we freeze application IO, thus ensuring
3546  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3547  * called from worker context. It MUST NOT be used while a previous such
3548  * work is still pending!
3549  */
3550 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3551                           int (*io_fn)(struct drbd_conf *),
3552                           void (*done)(struct drbd_conf *, int),
3553                           char *why)
3554 {
3555         D_ASSERT(current == mdev->worker.task);
3556
3557         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3558         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3559         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3560         if (mdev->bm_io_work.why)
3561                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3562                         why, mdev->bm_io_work.why);
3563
3564         mdev->bm_io_work.io_fn = io_fn;
3565         mdev->bm_io_work.done = done;
3566         mdev->bm_io_work.why = why;
3567
3568         set_bit(BITMAP_IO, &mdev->flags);
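             /* If application IO is still in flight (ap_bio_cnt != 0), the work is
              * not queued here; with BITMAP_IO set, it is queued once the in-flight
              * count drops to zero (presumably from dec_ap_bio()). */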
3569         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3570                 if (list_empty(&mdev->bm_io_work.w.list)) {
3571                         set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3572                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3573                 } else
3574                         dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3575         }
3576 }
3577
3578 /**
3579  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3580  * @mdev:       DRBD device.
3581  * @io_fn:      IO callback to be called when bitmap IO is possible
3582  * @why:        Descriptive text of the reason for doing the IO
3583  *
3584  * Freezes application IO while the actual IO operation runs. This
3585  * function MAY NOT be called from worker context.
3586  */
3587 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3588 {
3589         int rv;
3590
3591         D_ASSERT(current != mdev->worker.task);
3592
3593         drbd_suspend_io(mdev);
3594
3595         drbd_bm_lock(mdev, why);
3596         rv = io_fn(mdev);
3597         drbd_bm_unlock(mdev);
3598
3599         drbd_resume_io(mdev);
3600
3601         return rv;
3602 }
3603
3604 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3605 {
3606         if ((mdev->ldev->md.flags & flag) != flag) {
3607                 drbd_md_mark_dirty(mdev);
3608                 mdev->ldev->md.flags |= flag;
3609         }
3610 }
3611
3612 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3613 {
3614         if ((mdev->ldev->md.flags & flag) != 0) {
3615                 drbd_md_mark_dirty(mdev);
3616                 mdev->ldev->md.flags &= ~flag;
3617         }
3618 }
3619 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3620 {
3621         return (bdev->md.flags & flag) != 0;
3622 }
3623
3624 static void md_sync_timer_fn(unsigned long data)
3625 {
3626         struct drbd_conf *mdev = (struct drbd_conf *) data;
3627
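             /* timers fire in softirq context; defer the actual meta data
              * write to the worker thread */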
3628         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3629 }
3630
3631 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3632 {
3633         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3634         drbd_md_sync(mdev);
3635
3636         return 1;
3637 }
3638
3639 #ifdef CONFIG_DRBD_FAULT_INJECTION
3640 /* Fault insertion support including random number generator shamelessly
3641  * stolen from kernel/rcutorture.c */
3642 struct fault_random_state {
3643         unsigned long state;
3644         unsigned long count;
3645 };
3646
3647 #define FAULT_RANDOM_MULT 39916801  /* prime */
3648 #define FAULT_RANDOM_ADD        479001701 /* prime */
3649 #define FAULT_RANDOM_REFRESH 10000
3650
3651 /*
3652  * Crude but fast random-number generator.  Uses a linear congruential
3653  * generator, with occasional help from get_random_bytes().
3654  */
3655 static unsigned long
3656 _drbd_fault_random(struct fault_random_state *rsp)
3657 {
3658         long refresh;
3659
3660         if (--rsp->count < 0) {
3661                 get_random_bytes(&refresh, sizeof(refresh));
3662                 rsp->state += refresh;
3663                 rsp->count = FAULT_RANDOM_REFRESH;
3664         }
3665         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3666         return swahw32(rsp->state);
3667 }
3668
3669 static char *
3670 _drbd_fault_str(unsigned int type) {
3671         static char *_faults[] = {
3672                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3673                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3674                 [DRBD_FAULT_RS_WR] = "Resync write",
3675                 [DRBD_FAULT_RS_RD] = "Resync read",
3676                 [DRBD_FAULT_DT_WR] = "Data write",
3677                 [DRBD_FAULT_DT_RD] = "Data read",
3678                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3679                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3680                 [DRBD_FAULT_AL_EE] = "EE allocation"
3681         };
3682
3683         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3684 }
3685
3686 unsigned int
3687 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3688 {
3689         static struct fault_random_state rrs = {0, 0};
3690
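             /* Inject a fault if this minor is selected by the fault_devs bitmask
              * (0 means "all devices") and a pseudo-random roll in 1..100 falls
              * within fault_rate percent. */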
3691         unsigned int ret = (
3692                 (fault_devs == 0 ||
3693                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3694                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3695
3696         if (ret) {
3697                 fault_count++;
3698
3699                 if (printk_ratelimit())
3700                         dev_warn(DEV, "***Simulating %s failure\n",
3701                                 _drbd_fault_str(type));
3702         }
3703
3704         return ret;
3705 }
3706 #endif
3707
3708 const char *drbd_buildtag(void)
3709 {
3710         /* For DRBD built from external sources, this holds a reference to
3711            the git hash of the source code. */
3712
3713         static char buildtag[38] = "\0uilt-in";
3714
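             /* buildtag[0] == 0 means "not yet initialized"; in the built-in
              * (non-modular) case, writing 'b' below turns the initializer
              * "\0uilt-in" into the string "built-in". */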
3715         if (buildtag[0] == 0) {
3716 #ifdef CONFIG_MODULES
3717                 if (THIS_MODULE != NULL)
3718                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3719                 else
3720 #endif
3721                         buildtag[0] = 'b';
3722         }
3723
3724         return buildtag;
3725 }
3726
3727 module_init(drbd_init)
3728 module_exit(drbd_cleanup)
3729
3730 /* For drbd_tracing: */
3731 EXPORT_SYMBOL(drbd_conn_str);
3732 EXPORT_SYMBOL(drbd_role_str);
3733 EXPORT_SYMBOL(drbd_disk_str);
3734 EXPORT_SYMBOL(drbd_set_st_err_str);