1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
49#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
83/*
84 * some helper functions to deal with single linked page lists,
85 * page->private being our "next" pointer.
86 */
87
88/* If at least n pages are linked at head, get n pages off.
89 * Otherwise, don't modify head, and return NULL.
90 * Locking is the responsibility of the caller.
91 */
92static struct page *page_chain_del(struct page **head, int n)
93{
94 struct page *page;
95 struct page *tmp;
96
97 BUG_ON(!n);
98 BUG_ON(!head);
99
100 page = *head;
101 while (page) {
102 tmp = page_chain_next(page);
103 if (--n == 0)
104 break; /* found sufficient pages */
105 if (tmp == NULL)
106 /* insufficient pages, don't use any of them. */
107 return NULL;
108 page = tmp;
109 }
110
111 /* add end of list marker for the returned list */
112 set_page_private(page, 0);
113 /* actual return value, and adjustment of head */
114 page = *head;
115 *head = tmp;
116 return page;
117}
118
119/* may be used outside of locks to find the tail of a (usually short)
120 * "private" page chain, before adding it back to a global chain head
121 * with page_chain_add() under a spinlock. */
122static struct page *page_chain_tail(struct page *page, int *len)
123{
124 struct page *tmp;
125 int i = 1;
126 while ((tmp = page_chain_next(page)))
127 ++i, page = tmp;
128 if (len)
129 *len = i;
130 return page;
131}
132
133static int page_chain_free(struct page *page)
134{
135 struct page *tmp;
136 int i = 0;
137 page_chain_for_each_safe(page, tmp) {
138 put_page(page);
139 ++i;
140 }
141 return i;
142}
143
144static void page_chain_add(struct page **head,
145 struct page *chain_first, struct page *chain_last)
146{
147#if 1
148 struct page *tmp;
149 tmp = page_chain_tail(chain_first, NULL);
150 BUG_ON(tmp != chain_last);
151#endif
152
153 /* add chain to head */
154 set_page_private(chain_last, (unsigned long)*head);
155 *head = chain_first;
156}
157
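/*
 * A minimal usage sketch for the page chain helpers above (never called; it
 * assumes the drbd_pp_pool, drbd_pp_lock and drbd_pp_vacant globals declared
 * in drbd_int.h): pop two pages off the pool under the lock, then link them
 * back in, with page->private serving as the "next" pointer throughout.
 */
static void __maybe_unused page_chain_example(void)
{
	struct page *chain, *tail;

	spin_lock(&drbd_pp_lock);
	/* mirror the real callers: only pop if the pool is known to be big enough */
	chain = drbd_pp_vacant >= 2 ? page_chain_del(&drbd_pp_pool, 2) : NULL;
	if (chain)
		drbd_pp_vacant -= 2;
	spin_unlock(&drbd_pp_lock);
	if (!chain)
		return;

	/* find the tail outside the lock, as the comment above suggests */
	tail = page_chain_tail(chain, NULL);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, chain, tail);
	drbd_pp_vacant += 2;
	spin_unlock(&drbd_pp_lock);
}
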
158static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
159{
160 struct page *page = NULL;
161 struct page *tmp = NULL;
162 int i = 0;
163
164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
165 * So what. It saves a spin_lock. */
 166	if (drbd_pp_vacant >= number) {
 167		spin_lock(&drbd_pp_lock);
168 page = page_chain_del(&drbd_pp_pool, number);
169 if (page)
170 drbd_pp_vacant -= number;
 171		spin_unlock(&drbd_pp_lock);
172 if (page)
173 return page;
 174	}
 175
176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 * which in turn might block on the other node at this very place. */
179 for (i = 0; i < number; i++) {
180 tmp = alloc_page(GFP_TRY);
181 if (!tmp)
182 break;
183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
185 }
186
187 if (i == number)
188 return page;
189
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_pp_alloc will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
199 }
200 return NULL;
201}
202
203/* kick lower level device, if we have more than (arbitrary number)
204 * reference counts on it, which typically are locally submitted io
205 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
206static void maybe_kick_lo(struct drbd_conf *mdev)
207{
208 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
209 drbd_kick_lo(mdev);
210}
211
212static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
213{
214 struct drbd_epoch_entry *e;
215 struct list_head *le, *tle;
216
217 /* The EEs are always appended to the end of the list. Since
218 they are sent in order over the wire, they have to finish
 219	   in order. As soon as we see the first unfinished one we can
 220	   stop examining the list... */
221
222 list_for_each_safe(le, tle, &mdev->net_ee) {
223 e = list_entry(le, struct drbd_epoch_entry, w.list);
 224		if (drbd_ee_has_active_page(e))
225 break;
226 list_move(le, to_be_freed);
227 }
228}
229
230static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
231{
232 LIST_HEAD(reclaimed);
233 struct drbd_epoch_entry *e, *t;
234
235 maybe_kick_lo(mdev);
236 spin_lock_irq(&mdev->req_lock);
237 reclaim_net_ee(mdev, &reclaimed);
238 spin_unlock_irq(&mdev->req_lock);
239
240 list_for_each_entry_safe(e, t, &reclaimed, w.list)
241 drbd_free_ee(mdev, e);
242}
243
244/**
 245 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 246 * @mdev:	DRBD device.
247 * @number: number of pages requested
248 * @retry: whether to retry, if not enough pages are available right now
249 *
250 * Tries to allocate number pages, first from our own page pool, then from
251 * the kernel, unless this allocation would exceed the max_buffers setting.
252 * Possibly retry until DRBD frees sufficient pages somewhere else.
 253 *
 254 * Returns a page chain linked via page->private.
 255 */
 256static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
257{
258 struct page *page = NULL;
259 DEFINE_WAIT(wait);
260
261 /* Yes, we may run up to @number over max_buffers. If we
262 * follow it strictly, the admin will get it wrong anyways. */
263 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
264 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 265
 266	while (page == NULL) {
267 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
268
269 drbd_kick_lo_and_reclaim_net(mdev);
270
271 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
 272			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
273 if (page)
274 break;
275 }
276
277 if (!retry)
278 break;
279
280 if (signal_pending(current)) {
281 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
282 break;
283 }
284
285 schedule();
286 }
287 finish_wait(&drbd_pp_wait, &wait);
288
289 if (page)
290 atomic_add(number, &mdev->pp_in_use);
291 return page;
292}
293
294/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 295 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
296 * Either links the page chain back to the global pool,
297 * or returns all pages to the system. */
298static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
299{
 300	int i;
301 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
302 i = page_chain_free(page);
303 else {
304 struct page *tmp;
305 tmp = page_chain_tail(page, &i);
306 spin_lock(&drbd_pp_lock);
307 page_chain_add(&drbd_pp_pool, page, tmp);
308 drbd_pp_vacant += i;
309 spin_unlock(&drbd_pp_lock);
 310	}
311 atomic_sub(i, &mdev->pp_in_use);
312 i = atomic_read(&mdev->pp_in_use);
313 if (i < 0)
314 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
315 wake_up(&drbd_pp_wait);
316}
317
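/*
 * A minimal sketch of the alloc/free pairing above (never called; it assumes
 * a valid mdev whose net_conf is held by the caller): grab a chain of pages,
 * retrying until the pool or the kernel can satisfy the request, and hand the
 * whole chain back when done.
 */
static void __maybe_unused drbd_pp_roundtrip_example(struct drbd_conf *mdev, unsigned number)
{
	struct page *page, *chain;

	chain = drbd_pp_alloc(mdev, number, 1);	/* may sleep; NULL only if signalled */
	if (!chain)
		return;

	page = chain;
	page_chain_for_each(page) {
		/* touch each page of the chain here */
	}

	drbd_pp_free(mdev, chain);	/* back to the pool, or to the system */
}
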
318/*
319You need to hold the req_lock:
320 _drbd_wait_ee_list_empty()
321
322You must not have the req_lock:
323 drbd_free_ee()
324 drbd_alloc_ee()
325 drbd_init_ee()
326 drbd_release_ee()
327 drbd_ee_fix_bhs()
328 drbd_process_done_ee()
329 drbd_clear_done_ee()
330 drbd_wait_ee_list_empty()
331*/
332
333struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
334 u64 id,
335 sector_t sector,
336 unsigned int data_size,
337 gfp_t gfp_mask) __must_hold(local)
338{
339 struct drbd_epoch_entry *e;
340 struct page *page;
 341	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
342
343 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
344 return NULL;
345
346 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
347 if (!e) {
348 if (!(gfp_mask & __GFP_NOWARN))
349 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
350 return NULL;
351 }
352
353 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
354 if (!page)
355 goto fail;
 356
357 INIT_HLIST_NODE(&e->colision);
358 e->epoch = NULL;
359 e->mdev = mdev;
360 e->pages = page;
361 atomic_set(&e->pending_bios, 0);
362 e->size = data_size;
 363	e->flags = 0;
364 e->sector = sector;
366 e->block_id = id;
 367
368 return e;
369
 370 fail:
 371	mempool_free(e, drbd_ee_mempool);
372 return NULL;
373}
374
375void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
376{
377 drbd_pp_free(mdev, e->pages);
378 D_ASSERT(atomic_read(&e->pending_bios) == 0);
379 D_ASSERT(hlist_unhashed(&e->colision));
380 mempool_free(e, drbd_ee_mempool);
381}
382
383int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
384{
385 LIST_HEAD(work_list);
386 struct drbd_epoch_entry *e, *t;
387 int count = 0;
388
389 spin_lock_irq(&mdev->req_lock);
390 list_splice_init(list, &work_list);
391 spin_unlock_irq(&mdev->req_lock);
392
393 list_for_each_entry_safe(e, t, &work_list, w.list) {
394 drbd_free_ee(mdev, e);
395 count++;
396 }
397 return count;
398}
399
400
401/*
402 * This function is called from _asender only_
403 * but see also comments in _req_mod(,barrier_acked)
404 * and receive_Barrier.
405 *
406 * Move entries from net_ee to done_ee, if ready.
407 * Grab done_ee, call all callbacks, free the entries.
408 * The callbacks typically send out ACKs.
409 */
410static int drbd_process_done_ee(struct drbd_conf *mdev)
411{
412 LIST_HEAD(work_list);
413 LIST_HEAD(reclaimed);
414 struct drbd_epoch_entry *e, *t;
415 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
416
417 spin_lock_irq(&mdev->req_lock);
418 reclaim_net_ee(mdev, &reclaimed);
419 list_splice_init(&mdev->done_ee, &work_list);
420 spin_unlock_irq(&mdev->req_lock);
421
422 list_for_each_entry_safe(e, t, &reclaimed, w.list)
423 drbd_free_ee(mdev, e);
424
425 /* possible callbacks here:
426 * e_end_block, and e_end_resync_block, e_send_discard_ack.
427 * all ignore the last argument.
428 */
429 list_for_each_entry_safe(e, t, &work_list, w.list) {
430 /* list_del not necessary, next/prev members not touched */
431 ok = e->w.cb(mdev, &e->w, !ok) && ok;
432 drbd_free_ee(mdev, e);
433 }
434 wake_up(&mdev->ee_wait);
435
436 return ok;
437}
438
439void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440{
441 DEFINE_WAIT(wait);
442
443 /* avoids spin_lock/unlock
444 * and calling prepare_to_wait in the fast path */
445 while (!list_empty(head)) {
446 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
447 spin_unlock_irq(&mdev->req_lock);
448 drbd_kick_lo(mdev);
449 schedule();
450 finish_wait(&mdev->ee_wait, &wait);
451 spin_lock_irq(&mdev->req_lock);
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
457 spin_lock_irq(&mdev->req_lock);
458 _drbd_wait_ee_list_empty(mdev, head);
459 spin_unlock_irq(&mdev->req_lock);
460}
461
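/*
 * A minimal sketch of the locking rule spelled out before drbd_alloc_ee()
 * (never called; the lists are just the mdev members used in this file):
 * _drbd_wait_ee_list_empty() expects req_lock to be held, while the wrapper
 * and the other ee helpers must be called without it.
 */
static void __maybe_unused ee_locking_example(struct drbd_conf *mdev)
{
	/* no req_lock held here */
	drbd_release_ee(mdev, &mdev->net_ee);
	drbd_wait_ee_list_empty(mdev, &mdev->done_ee);

	/* the underscore variant is the "already locked" flavour */
	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
	spin_unlock_irq(&mdev->req_lock);
}
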
 462/* see also kernel_accept, which is only present since 2.6.18.
463 * also we want to log which part of it failed, exactly */
464static int drbd_accept(struct drbd_conf *mdev, const char **what,
465 struct socket *sock, struct socket **newsock)
466{
467 struct sock *sk = sock->sk;
468 int err = 0;
469
470 *what = "listen";
471 err = sock->ops->listen(sock, 5);
472 if (err < 0)
473 goto out;
474
475 *what = "sock_create_lite";
476 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
477 newsock);
478 if (err < 0)
479 goto out;
480
481 *what = "accept";
482 err = sock->ops->accept(sock, *newsock, 0);
483 if (err < 0) {
484 sock_release(*newsock);
485 *newsock = NULL;
486 goto out;
487 }
488 (*newsock)->ops = sock->ops;
489
490out:
491 return err;
492}
493
494static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
495 void *buf, size_t size, int flags)
496{
497 mm_segment_t oldfs;
498 struct kvec iov = {
499 .iov_base = buf,
500 .iov_len = size,
501 };
502 struct msghdr msg = {
503 .msg_iovlen = 1,
504 .msg_iov = (struct iovec *)&iov,
505 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506 };
507 int rv;
508
509 oldfs = get_fs();
510 set_fs(KERNEL_DS);
511 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512 set_fs(oldfs);
513
514 return rv;
515}
516
517static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
518{
519 mm_segment_t oldfs;
520 struct kvec iov = {
521 .iov_base = buf,
522 .iov_len = size,
523 };
524 struct msghdr msg = {
525 .msg_iovlen = 1,
526 .msg_iov = (struct iovec *)&iov,
527 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528 };
529 int rv;
530
531 oldfs = get_fs();
532 set_fs(KERNEL_DS);
533
534 for (;;) {
535 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
536 if (rv == size)
537 break;
538
539 /* Note:
540 * ECONNRESET other side closed the connection
541 * ERESTARTSYS (on sock) we got a signal
542 */
543
544 if (rv < 0) {
545 if (rv == -ECONNRESET)
546 dev_info(DEV, "sock was reset by peer\n");
547 else if (rv != -ERESTARTSYS)
548 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
549 break;
550 } else if (rv == 0) {
551 dev_info(DEV, "sock was shut down by peer\n");
552 break;
553 } else {
554 /* signal came in, or peer/link went down,
555 * after we read a partial message
556 */
557 /* D_ASSERT(signal_pending(current)); */
558 break;
559 }
560 };
561
562 set_fs(oldfs);
563
564 if (rv != size)
565 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
566
567 return rv;
568}
569
570static struct socket *drbd_try_connect(struct drbd_conf *mdev)
571{
572 const char *what;
573 struct socket *sock;
574 struct sockaddr_in6 src_in6;
575 int err;
576 int disconnect_on_error = 1;
577
578 if (!get_net_conf(mdev))
579 return NULL;
580
581 what = "sock_create_kern";
582 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
583 SOCK_STREAM, IPPROTO_TCP, &sock);
584 if (err < 0) {
585 sock = NULL;
586 goto out;
587 }
588
589 sock->sk->sk_rcvtimeo =
590 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
591
592 /* explicitly bind to the configured IP as source IP
593 * for the outgoing connections.
594 * This is needed for multihomed hosts and to be
595 * able to use lo: interfaces for drbd.
596 * Make sure to use 0 as port number, so linux selects
597 * a free one dynamically.
598 */
599 memcpy(&src_in6, mdev->net_conf->my_addr,
600 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
601 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
602 src_in6.sin6_port = 0;
603 else
604 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
605
606 what = "bind before connect";
607 err = sock->ops->bind(sock,
608 (struct sockaddr *) &src_in6,
609 mdev->net_conf->my_addr_len);
610 if (err < 0)
611 goto out;
612
613 /* connect may fail, peer not yet available.
614 * stay C_WF_CONNECTION, don't go Disconnecting! */
615 disconnect_on_error = 0;
616 what = "connect";
617 err = sock->ops->connect(sock,
618 (struct sockaddr *)mdev->net_conf->peer_addr,
619 mdev->net_conf->peer_addr_len, 0);
620
621out:
622 if (err < 0) {
623 if (sock) {
624 sock_release(sock);
625 sock = NULL;
626 }
627 switch (-err) {
628 /* timeout, busy, signal pending */
629 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
630 case EINTR: case ERESTARTSYS:
631 /* peer not (yet) available, network problem */
632 case ECONNREFUSED: case ENETUNREACH:
633 case EHOSTDOWN: case EHOSTUNREACH:
634 disconnect_on_error = 0;
635 break;
636 default:
637 dev_err(DEV, "%s failed, err = %d\n", what, err);
638 }
639 if (disconnect_on_error)
640 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
641 }
642 put_net_conf(mdev);
643 return sock;
644}
645
646static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
647{
648 int timeo, err;
649 struct socket *s_estab = NULL, *s_listen;
650 const char *what;
651
652 if (!get_net_conf(mdev))
653 return NULL;
654
655 what = "sock_create_kern";
656 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
657 SOCK_STREAM, IPPROTO_TCP, &s_listen);
658 if (err) {
659 s_listen = NULL;
660 goto out;
661 }
662
663 timeo = mdev->net_conf->try_connect_int * HZ;
664 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
665
666 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
667 s_listen->sk->sk_rcvtimeo = timeo;
668 s_listen->sk->sk_sndtimeo = timeo;
669
670 what = "bind before listen";
671 err = s_listen->ops->bind(s_listen,
672 (struct sockaddr *) mdev->net_conf->my_addr,
673 mdev->net_conf->my_addr_len);
674 if (err < 0)
675 goto out;
676
677 err = drbd_accept(mdev, &what, s_listen, &s_estab);
678
679out:
680 if (s_listen)
681 sock_release(s_listen);
682 if (err < 0) {
683 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
684 dev_err(DEV, "%s failed, err = %d\n", what, err);
685 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
686 }
687 }
688 put_net_conf(mdev);
689
690 return s_estab;
691}
692
693static int drbd_send_fp(struct drbd_conf *mdev,
694 struct socket *sock, enum drbd_packets cmd)
695{
696 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
697
698 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
699}
700
701static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
702{
703 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
704 int rr;
705
706 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
707
708 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
709 return be16_to_cpu(h->command);
710
711 return 0xffff;
712}
713
714/**
715 * drbd_socket_okay() - Free the socket if its connection is not okay
716 * @mdev: DRBD device.
717 * @sock: pointer to the pointer to the socket.
718 */
719static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
720{
721 int rr;
722 char tb[4];
723
724 if (!*sock)
725 return FALSE;
726
727 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
728
729 if (rr > 0 || rr == -EAGAIN) {
730 return TRUE;
731 } else {
732 sock_release(*sock);
733 *sock = NULL;
734 return FALSE;
735 }
736}
737
738/*
739 * return values:
740 * 1 yes, we have a valid connection
741 * 0 oops, did not work out, please try again
742 * -1 peer talks different language,
743 * no point in trying again, please go standalone.
744 * -2 We do not have a network config...
745 */
746static int drbd_connect(struct drbd_conf *mdev)
747{
748 struct socket *s, *sock, *msock;
749 int try, h, ok;
750
751 D_ASSERT(!mdev->data.socket);
752
753 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
754 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
755
756 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757 return -2;
758
759 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761 sock = NULL;
762 msock = NULL;
763
764 do {
765 for (try = 0;;) {
766 /* 3 tries, this should take less than a second! */
767 s = drbd_try_connect(mdev);
768 if (s || ++try >= 3)
769 break;
770 /* give the other side time to call bind() & listen() */
771 __set_current_state(TASK_INTERRUPTIBLE);
772 schedule_timeout(HZ / 10);
773 }
774
775 if (s) {
776 if (!sock) {
777 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
778 sock = s;
779 s = NULL;
780 } else if (!msock) {
781 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
782 msock = s;
783 s = NULL;
784 } else {
785 dev_err(DEV, "Logic error in drbd_connect()\n");
786 goto out_release_sockets;
787 }
788 }
789
790 if (sock && msock) {
791 __set_current_state(TASK_INTERRUPTIBLE);
792 schedule_timeout(HZ / 10);
793 ok = drbd_socket_okay(mdev, &sock);
794 ok = drbd_socket_okay(mdev, &msock) && ok;
795 if (ok)
796 break;
797 }
798
799retry:
800 s = drbd_wait_for_connect(mdev);
801 if (s) {
802 try = drbd_recv_fp(mdev, s);
803 drbd_socket_okay(mdev, &sock);
804 drbd_socket_okay(mdev, &msock);
805 switch (try) {
806 case P_HAND_SHAKE_S:
807 if (sock) {
808 dev_warn(DEV, "initial packet S crossed\n");
809 sock_release(sock);
810 }
811 sock = s;
812 break;
813 case P_HAND_SHAKE_M:
814 if (msock) {
815 dev_warn(DEV, "initial packet M crossed\n");
816 sock_release(msock);
817 }
818 msock = s;
819 set_bit(DISCARD_CONCURRENT, &mdev->flags);
820 break;
821 default:
822 dev_warn(DEV, "Error receiving initial packet\n");
823 sock_release(s);
824 if (random32() & 1)
825 goto retry;
826 }
827 }
828
829 if (mdev->state.conn <= C_DISCONNECTING)
830 goto out_release_sockets;
831 if (signal_pending(current)) {
832 flush_signals(current);
833 smp_rmb();
834 if (get_t_state(&mdev->receiver) == Exiting)
835 goto out_release_sockets;
836 }
837
838 if (sock && msock) {
839 ok = drbd_socket_okay(mdev, &sock);
840 ok = drbd_socket_okay(mdev, &msock) && ok;
841 if (ok)
842 break;
843 }
844 } while (1);
845
846 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
847 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
848
849 sock->sk->sk_allocation = GFP_NOIO;
850 msock->sk->sk_allocation = GFP_NOIO;
851
852 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
853 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
854
855 if (mdev->net_conf->sndbuf_size) {
856 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
857 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
858 }
859
860 if (mdev->net_conf->rcvbuf_size) {
861 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
862 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
863 }
864
865 /* NOT YET ...
866 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
867 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
868 * first set it to the P_HAND_SHAKE timeout,
869 * which we set to 4x the configured ping_timeout. */
870 sock->sk->sk_sndtimeo =
871 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
872
873 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
874 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
875
876 /* we don't want delays.
 877	 * we use TCP_CORK where appropriate, though */
878 drbd_tcp_nodelay(sock);
879 drbd_tcp_nodelay(msock);
880
881 mdev->data.socket = sock;
882 mdev->meta.socket = msock;
883 mdev->last_received = jiffies;
884
885 D_ASSERT(mdev->asender.task == NULL);
886
887 h = drbd_do_handshake(mdev);
888 if (h <= 0)
889 return h;
890
891 if (mdev->cram_hmac_tfm) {
892 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
893 switch (drbd_do_auth(mdev)) {
894 case -1:
895 dev_err(DEV, "Authentication of peer failed\n");
896 return -1;
897 case 0:
898 dev_err(DEV, "Authentication of peer failed, trying again.\n");
899 return 0;
900 }
901 }
902
903 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
904 return 0;
905
906 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
907 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
908
909 atomic_set(&mdev->packet_seq, 0);
910 mdev->peer_seq = 0;
911
912 drbd_thread_start(&mdev->asender);
913
914 if (!drbd_send_protocol(mdev))
915 return -1;
 916	drbd_send_sync_param(mdev, &mdev->sync_conf);
 917	drbd_send_sizes(mdev, 0, 0);
918 drbd_send_uuids(mdev);
919 drbd_send_state(mdev);
920 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
921 clear_bit(RESIZE_PENDING, &mdev->flags);
922
923 return 1;
924
925out_release_sockets:
926 if (sock)
927 sock_release(sock);
928 if (msock)
929 sock_release(msock);
930 return -1;
931}
932
933static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
934{
935 int r;
936
937 r = drbd_recv(mdev, h, sizeof(*h));
938
939 if (unlikely(r != sizeof(*h))) {
940 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
941 return FALSE;
942 };
943 h->command = be16_to_cpu(h->command);
944 h->length = be16_to_cpu(h->length);
945 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
946 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
947 (long)be32_to_cpu(h->magic),
948 h->command, h->length);
949 return FALSE;
950 }
951 mdev->last_received = jiffies;
952
953 return TRUE;
954}
955
956static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
957{
958 int rv;
959
960 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
961 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
962 NULL, BLKDEV_IFL_WAIT);
963 if (rv) {
964 dev_err(DEV, "local disk flush failed with status %d\n", rv);
965 /* would rather check on EOPNOTSUPP, but that is not reliable.
966 * don't try again for ANY return value != 0
967 * if (rv == -EOPNOTSUPP) */
968 drbd_bump_write_ordering(mdev, WO_drain_io);
969 }
970 put_ldev(mdev);
971 }
972
973 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
974}
975
976static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
977{
978 struct flush_work *fw = (struct flush_work *)w;
979 struct drbd_epoch *epoch = fw->epoch;
980
981 kfree(w);
982
983 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
984 drbd_flush_after_epoch(mdev, epoch);
985
986 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
987 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
988
989 return 1;
990}
991
992/**
993 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
994 * @mdev: DRBD device.
995 * @epoch: Epoch object.
996 * @ev: Epoch event.
997 */
998static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
999 struct drbd_epoch *epoch,
1000 enum epoch_event ev)
1001{
1002 int finish, epoch_size;
1003 struct drbd_epoch *next_epoch;
1004 int schedule_flush = 0;
1005 enum finish_epoch rv = FE_STILL_LIVE;
1006
1007 spin_lock(&mdev->epoch_lock);
1008 do {
1009 next_epoch = NULL;
1010 finish = 0;
1011
1012 epoch_size = atomic_read(&epoch->epoch_size);
1013
1014 switch (ev & ~EV_CLEANUP) {
1015 case EV_PUT:
1016 atomic_dec(&epoch->active);
1017 break;
1018 case EV_GOT_BARRIER_NR:
1019 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1020
1021 /* Special case: If we just switched from WO_bio_barrier to
1022 WO_bdev_flush we should not finish the current epoch */
1023 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1024 mdev->write_ordering != WO_bio_barrier &&
1025 epoch == mdev->current_epoch)
1026 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1027 break;
1028 case EV_BARRIER_DONE:
1029 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1030 break;
1031 case EV_BECAME_LAST:
1032 /* nothing to do*/
1033 break;
1034 }
1035
1036 if (epoch_size != 0 &&
1037 atomic_read(&epoch->active) == 0 &&
1038 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1039 epoch->list.prev == &mdev->current_epoch->list &&
1040 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1041 /* Nearly all conditions are met to finish that epoch... */
1042 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1043 mdev->write_ordering == WO_none ||
1044 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1045 ev & EV_CLEANUP) {
1046 finish = 1;
1047 set_bit(DE_IS_FINISHING, &epoch->flags);
1048 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1049 mdev->write_ordering == WO_bio_barrier) {
1050 atomic_inc(&epoch->active);
1051 schedule_flush = 1;
1052 }
1053 }
1054 if (finish) {
1055 if (!(ev & EV_CLEANUP)) {
1056 spin_unlock(&mdev->epoch_lock);
1057 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1058 spin_lock(&mdev->epoch_lock);
1059 }
1060 dec_unacked(mdev);
1061
1062 if (mdev->current_epoch != epoch) {
1063 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1064 list_del(&epoch->list);
1065 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1066 mdev->epochs--;
1067 kfree(epoch);
1068
1069 if (rv == FE_STILL_LIVE)
1070 rv = FE_DESTROYED;
1071 } else {
1072 epoch->flags = 0;
1073 atomic_set(&epoch->epoch_size, 0);
 1074			/* atomic_set(&epoch->active, 0); is already zero */
1075 if (rv == FE_STILL_LIVE)
1076 rv = FE_RECYCLED;
1077 }
1078 }
1079
1080 if (!next_epoch)
1081 break;
1082
1083 epoch = next_epoch;
1084 } while (1);
1085
1086 spin_unlock(&mdev->epoch_lock);
1087
1088 if (schedule_flush) {
1089 struct flush_work *fw;
1090 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1091 if (fw) {
1092 fw->w.cb = w_flush;
1093 fw->epoch = epoch;
1094 drbd_queue_work(&mdev->data.work, &fw->w);
1095 } else {
1096 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1097 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1098 /* That is not a recursion, only one level */
1099 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1100 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1101 }
1102 }
1103
1104 return rv;
1105}
1106
1107/**
 1108 * drbd_bump_write_ordering() - Fall back to another write ordering method
1109 * @mdev: DRBD device.
1110 * @wo: Write ordering method to try.
1111 */
1112void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1113{
1114 enum write_ordering_e pwo;
1115 static char *write_ordering_str[] = {
1116 [WO_none] = "none",
1117 [WO_drain_io] = "drain",
1118 [WO_bdev_flush] = "flush",
1119 [WO_bio_barrier] = "barrier",
1120 };
1121
1122 pwo = mdev->write_ordering;
1123 wo = min(pwo, wo);
1124 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1125 wo = WO_bdev_flush;
1126 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1127 wo = WO_drain_io;
1128 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1129 wo = WO_none;
1130 mdev->write_ordering = wo;
1131 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1132 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1133}
1134
1135/**
1136 * drbd_submit_ee()
1137 * @mdev: DRBD device.
1138 * @e: epoch entry
1139 * @rw: flag field, see bio->bi_rw
1140 */
1141/* TODO allocate from our own bio_set. */
1142int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1143 const unsigned rw, const int fault_type)
1144{
1145 struct bio *bios = NULL;
1146 struct bio *bio;
1147 struct page *page = e->pages;
1148 sector_t sector = e->sector;
1149 unsigned ds = e->size;
1150 unsigned n_bios = 0;
1151 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1152
1153 if (atomic_read(&mdev->new_c_uuid)) {
1154 if (atomic_add_unless(&mdev->new_c_uuid, -1, 1)) {
1155 drbd_uuid_new_current(mdev);
1156 drbd_md_sync(mdev);
1157
1158 atomic_dec(&mdev->new_c_uuid);
1159 wake_up(&mdev->misc_wait);
1160 }
1161 wait_event(mdev->misc_wait, !atomic_read(&mdev->new_c_uuid));
1162 }
1163
1164 /* In most cases, we will only need one bio. But in case the lower
1165 * level restrictions happen to be different at this offset on this
1166 * side than those of the sending peer, we may need to submit the
1167 * request in more than one bio. */
1168next_bio:
1169 bio = bio_alloc(GFP_NOIO, nr_pages);
1170 if (!bio) {
1171 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1172 goto fail;
1173 }
1174 /* > e->sector, unless this is the first bio */
1175 bio->bi_sector = sector;
1176 bio->bi_bdev = mdev->ldev->backing_bdev;
1177 /* we special case some flags in the multi-bio case, see below
1178 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1179 bio->bi_rw = rw;
1180 bio->bi_private = e;
1181 bio->bi_end_io = drbd_endio_sec;
1182
1183 bio->bi_next = bios;
1184 bios = bio;
1185 ++n_bios;
1186
1187 page_chain_for_each(page) {
1188 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1189 if (!bio_add_page(bio, page, len, 0)) {
1190 /* a single page must always be possible! */
1191 BUG_ON(bio->bi_vcnt == 0);
1192 goto next_bio;
1193 }
1194 ds -= len;
1195 sector += len >> 9;
1196 --nr_pages;
1197 }
1198 D_ASSERT(page == NULL);
1199 D_ASSERT(ds == 0);
1200
1201 atomic_set(&e->pending_bios, n_bios);
1202 do {
1203 bio = bios;
1204 bios = bios->bi_next;
1205 bio->bi_next = NULL;
1206
1207 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1208 if (bios)
1209 bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1210
1211 drbd_generic_make_request(mdev, fault_type, bio);
1212
1213 /* strip off BIO_RW_BARRIER,
1214 * unless it is the first or last bio */
1215 if (bios && bios->bi_next)
1216 bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1217 } while (bios);
1218 maybe_kick_lo(mdev);
1219 return 0;
1220
1221fail:
1222 while (bios) {
1223 bio = bios;
1224 bios = bios->bi_next;
1225 bio_put(bio);
1226 }
1227 return -ENOMEM;
1228}
1229
1230/**
1231 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1232 * @mdev: DRBD device.
1233 * @w: work object.
1234 * @cancel: The connection will be closed anyways (unused in this callback)
1235 */
1236int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1237{
1238 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1239 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1240 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1241 so that we can finish that epoch in drbd_may_finish_epoch().
1242 That is necessary if we already have a long chain of Epochs, before
1243 we realize that BIO_RW_BARRIER is actually not supported */
1244
1245 /* As long as the -ENOTSUPP on the barrier is reported immediately
1246 that will never trigger. If it is reported late, we will just
1247 print that warning and continue correctly for all future requests
1248 with WO_bdev_flush */
1249 if (previous_epoch(mdev, e->epoch))
1250 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1251
1252 /* we still have a local reference,
1253 * get_ldev was done in receive_Data. */
1254
1255 e->w.cb = e_end_block;
1256 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1257 /* drbd_submit_ee fails for one reason only:
 1258		 * if it was not able to allocate sufficient bios.
1259 * requeue, try again later. */
1260 e->w.cb = w_e_reissue;
1261 drbd_queue_work(&mdev->data.work, &e->w);
1262 }
1263 return 1;
1264}
1265
1266static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1267{
1268 int rv, issue_flush;
1269 struct p_barrier *p = (struct p_barrier *)h;
1270 struct drbd_epoch *epoch;
1271
1272 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1273
1274 rv = drbd_recv(mdev, h->payload, h->length);
1275 ERR_IF(rv != h->length) return FALSE;
1276
1277 inc_unacked(mdev);
1278
1279 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1280 drbd_kick_lo(mdev);
1281
1282 mdev->current_epoch->barrier_nr = p->barrier;
1283 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1284
1285 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1286 * the activity log, which means it would not be resynced in case the
1287 * R_PRIMARY crashes now.
1288 * Therefore we must send the barrier_ack after the barrier request was
1289 * completed. */
1290 switch (mdev->write_ordering) {
1291 case WO_bio_barrier:
1292 case WO_none:
1293 if (rv == FE_RECYCLED)
1294 return TRUE;
1295 break;
1296
1297 case WO_bdev_flush:
1298 case WO_drain_io:
1299 if (rv == FE_STILL_LIVE) {
1300 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1301 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1302 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1303 }
1304 if (rv == FE_RECYCLED)
1305 return TRUE;
1306
1307 /* The asender will send all the ACKs and barrier ACKs out, since
1308 all EEs moved from the active_ee to the done_ee. We need to
1309 provide a new epoch object for the EEs that come in soon */
1310 break;
1311 }
1312
1313 /* receiver context, in the writeout path of the other node.
1314 * avoid potential distributed deadlock */
1315 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1316 if (!epoch) {
1317 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
 1318		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1319 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1320 if (issue_flush) {
1321 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1322 if (rv == FE_RECYCLED)
1323 return TRUE;
1324 }
1325
1326 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1327
1328 return TRUE;
1329 }
1330
1331 epoch->flags = 0;
1332 atomic_set(&epoch->epoch_size, 0);
1333 atomic_set(&epoch->active, 0);
1334
1335 spin_lock(&mdev->epoch_lock);
1336 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1337 list_add(&epoch->list, &mdev->current_epoch->list);
1338 mdev->current_epoch = epoch;
1339 mdev->epochs++;
1340 } else {
1341 /* The current_epoch got recycled while we allocated this one... */
1342 kfree(epoch);
1343 }
1344 spin_unlock(&mdev->epoch_lock);
1345
1346 return TRUE;
1347}
1348
1349/* used from receive_RSDataReply (recv_resync_read)
1350 * and from receive_Data */
1351static struct drbd_epoch_entry *
1352read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1353{
 1354	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 1355	struct drbd_epoch_entry *e;
 1356	struct page *page;
 1357	int dgs, ds, rr;
1358 void *dig_in = mdev->int_dig_in;
1359 void *dig_vv = mdev->int_dig_vv;
 1360	unsigned long *data;
1361
1362 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1363 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1364
1365 if (dgs) {
1366 rr = drbd_recv(mdev, dig_in, dgs);
1367 if (rr != dgs) {
1368 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1369 rr, dgs);
1370 return NULL;
1371 }
1372 }
1373
1374 data_size -= dgs;
1375
1376 ERR_IF(data_size & 0x1ff) return NULL;
1377 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1378
 1379	/* even though we trust our peer,
1380 * we sometimes have to double check. */
1381 if (sector + (data_size>>9) > capacity) {
1382 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1383 (unsigned long long)capacity,
1384 (unsigned long long)sector, data_size);
1385 return NULL;
1386 }
1387
1388 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1389 * "criss-cross" setup, that might cause write-out on some other DRBD,
1390 * which in turn might block on the other node at this very place. */
1391 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1392 if (!e)
1393 return NULL;
 1394
 1395	ds = data_size;
1396 page = e->pages;
1397 page_chain_for_each(page) {
1398 unsigned len = min_t(int, ds, PAGE_SIZE);
 1399		data = kmap(page);
 1400		rr = drbd_recv(mdev, data, len);
1401 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1402 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1403 data[0] = data[0] ^ (unsigned long)-1;
1404 }
 1405		kunmap(page);
 1406		if (rr != len) {
1407 drbd_free_ee(mdev, e);
1408 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
 1409				rr, len);
1410 return NULL;
1411 }
1412 ds -= rr;
1413 }
1414
1415 if (dgs) {
 1416		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1417 if (memcmp(dig_in, dig_vv, dgs)) {
1418 dev_err(DEV, "Digest integrity check FAILED.\n");
1419 drbd_bcast_ee(mdev, "digest failed",
1420 dgs, dig_in, dig_vv, e);
1421 drbd_free_ee(mdev, e);
1422 return NULL;
1423 }
1424 }
1425 mdev->recv_cnt += data_size>>9;
1426 return e;
1427}
1428
1429/* drbd_drain_block() just takes a data block
1430 * out of the socket input buffer, and discards it.
1431 */
1432static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1433{
1434 struct page *page;
1435 int rr, rv = 1;
1436 void *data;
1437
1438 if (!data_size)
1439 return TRUE;
1440
 1441	page = drbd_pp_alloc(mdev, 1, 1);
1442
1443 data = kmap(page);
1444 while (data_size) {
1445 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1446 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1447 rv = 0;
1448 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1449 rr, min_t(int, data_size, PAGE_SIZE));
1450 break;
1451 }
1452 data_size -= rr;
1453 }
1454 kunmap(page);
1455 drbd_pp_free(mdev, page);
1456 return rv;
1457}
1458
1459static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1460 sector_t sector, int data_size)
1461{
1462 struct bio_vec *bvec;
1463 struct bio *bio;
1464 int dgs, rr, i, expect;
1465 void *dig_in = mdev->int_dig_in;
1466 void *dig_vv = mdev->int_dig_vv;
1467
1468 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1469 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1470
1471 if (dgs) {
1472 rr = drbd_recv(mdev, dig_in, dgs);
1473 if (rr != dgs) {
1474 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1475 rr, dgs);
1476 return 0;
1477 }
1478 }
1479
1480 data_size -= dgs;
1481
1482 /* optimistically update recv_cnt. if receiving fails below,
1483 * we disconnect anyways, and counters will be reset. */
1484 mdev->recv_cnt += data_size>>9;
1485
1486 bio = req->master_bio;
1487 D_ASSERT(sector == bio->bi_sector);
1488
1489 bio_for_each_segment(bvec, bio, i) {
1490 expect = min_t(int, data_size, bvec->bv_len);
1491 rr = drbd_recv(mdev,
1492 kmap(bvec->bv_page)+bvec->bv_offset,
1493 expect);
1494 kunmap(bvec->bv_page);
1495 if (rr != expect) {
1496 dev_warn(DEV, "short read receiving data reply: "
1497 "read %d expected %d\n",
1498 rr, expect);
1499 return 0;
1500 }
1501 data_size -= rr;
1502 }
1503
1504 if (dgs) {
 1505		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1506 if (memcmp(dig_in, dig_vv, dgs)) {
1507 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1508 return 0;
1509 }
1510 }
1511
1512 D_ASSERT(data_size == 0);
1513 return 1;
1514}
1515
1516/* e_end_resync_block() is called via
1517 * drbd_process_done_ee() by asender only */
1518static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1519{
1520 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1521 sector_t sector = e->sector;
1522 int ok;
1523
1524 D_ASSERT(hlist_unhashed(&e->colision));
1525
 1526	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1527 drbd_set_in_sync(mdev, sector, e->size);
1528 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1529 } else {
1530 /* Record failure to sync */
1531 drbd_rs_failed_io(mdev, sector, e->size);
1532
1533 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1534 }
1535 dec_unacked(mdev);
1536
1537 return ok;
1538}
1539
1540static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1541{
1542 struct drbd_epoch_entry *e;
1543
1544 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1545 if (!e)
1546 goto fail;
1547
1548 dec_rs_pending(mdev);
1549
1550 inc_unacked(mdev);
1551 /* corresponding dec_unacked() in e_end_resync_block()
1552 * respective _drbd_clear_done_ee */
1553
1554 e->w.cb = e_end_resync_block;
1555
1556 spin_lock_irq(&mdev->req_lock);
1557 list_add(&e->w.list, &mdev->sync_ee);
1558 spin_unlock_irq(&mdev->req_lock);
1559
1560 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1561 return TRUE;
 1562
1563 drbd_free_ee(mdev, e);
1564fail:
1565 put_ldev(mdev);
1566 return FALSE;
1567}
1568
1569static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1570{
1571 struct drbd_request *req;
1572 sector_t sector;
1573 unsigned int header_size, data_size;
1574 int ok;
1575 struct p_data *p = (struct p_data *)h;
1576
1577 header_size = sizeof(*p) - sizeof(*h);
1578 data_size = h->length - header_size;
1579
1580 ERR_IF(data_size == 0) return FALSE;
1581
1582 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1583 return FALSE;
1584
1585 sector = be64_to_cpu(p->sector);
1586
1587 spin_lock_irq(&mdev->req_lock);
1588 req = _ar_id_to_req(mdev, p->block_id, sector);
1589 spin_unlock_irq(&mdev->req_lock);
1590 if (unlikely(!req)) {
1591 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1592 return FALSE;
1593 }
1594
1595 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1596 * special casing it there for the various failure cases.
1597 * still no race with drbd_fail_pending_reads */
1598 ok = recv_dless_read(mdev, req, sector, data_size);
1599
1600 if (ok)
1601 req_mod(req, data_received);
1602 /* else: nothing. handled from drbd_disconnect...
1603 * I don't think we may complete this just yet
1604 * in case we are "on-disconnect: freeze" */
1605
1606 return ok;
1607}
1608
1609static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1610{
1611 sector_t sector;
1612 unsigned int header_size, data_size;
1613 int ok;
1614 struct p_data *p = (struct p_data *)h;
1615
1616 header_size = sizeof(*p) - sizeof(*h);
1617 data_size = h->length - header_size;
1618
1619 ERR_IF(data_size == 0) return FALSE;
1620
1621 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1622 return FALSE;
1623
1624 sector = be64_to_cpu(p->sector);
1625 D_ASSERT(p->block_id == ID_SYNCER);
1626
1627 if (get_ldev(mdev)) {
1628 /* data is submitted to disk within recv_resync_read.
1629 * corresponding put_ldev done below on error,
1630 * or in drbd_endio_write_sec. */
1631 ok = recv_resync_read(mdev, sector, data_size);
1632 } else {
1633 if (__ratelimit(&drbd_ratelimit_state))
1634 dev_err(DEV, "Can not write resync data to local disk.\n");
1635
1636 ok = drbd_drain_block(mdev, data_size);
1637
1638 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1639 }
1640
1641 return ok;
1642}
1643
1644/* e_end_block() is called via drbd_process_done_ee().
1645 * this means this function only runs in the asender thread
1646 */
1647static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1648{
1649 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1650 sector_t sector = e->sector;
1651 struct drbd_epoch *epoch;
1652 int ok = 1, pcmd;
1653
1654 if (e->flags & EE_IS_BARRIER) {
1655 epoch = previous_epoch(mdev, e->epoch);
1656 if (epoch)
1657 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1658 }
1659
1660 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
 1661		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1662 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1663 mdev->state.conn <= C_PAUSED_SYNC_T &&
1664 e->flags & EE_MAY_SET_IN_SYNC) ?
1665 P_RS_WRITE_ACK : P_WRITE_ACK;
1666 ok &= drbd_send_ack(mdev, pcmd, e);
1667 if (pcmd == P_RS_WRITE_ACK)
1668 drbd_set_in_sync(mdev, sector, e->size);
1669 } else {
1670 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1671 /* we expect it to be marked out of sync anyways...
1672 * maybe assert this? */
1673 }
1674 dec_unacked(mdev);
1675 }
1676 /* we delete from the conflict detection hash _after_ we sent out the
1677 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1678 if (mdev->net_conf->two_primaries) {
1679 spin_lock_irq(&mdev->req_lock);
1680 D_ASSERT(!hlist_unhashed(&e->colision));
1681 hlist_del_init(&e->colision);
1682 spin_unlock_irq(&mdev->req_lock);
1683 } else {
1684 D_ASSERT(hlist_unhashed(&e->colision));
1685 }
1686
1687 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1688
1689 return ok;
1690}
1691
1692static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1693{
1694 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1695 int ok = 1;
1696
1697 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1698 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1699
1700 spin_lock_irq(&mdev->req_lock);
1701 D_ASSERT(!hlist_unhashed(&e->colision));
1702 hlist_del_init(&e->colision);
1703 spin_unlock_irq(&mdev->req_lock);
1704
1705 dec_unacked(mdev);
1706
1707 return ok;
1708}
1709
1710/* Called from receive_Data.
1711 * Synchronize packets on sock with packets on msock.
1712 *
1713 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1714 * packet traveling on msock, they are still processed in the order they have
1715 * been sent.
1716 *
1717 * Note: we don't care for Ack packets overtaking P_DATA packets.
1718 *
1719 * In case packet_seq is larger than mdev->peer_seq number, there are
1720 * outstanding packets on the msock. We wait for them to arrive.
1721 * In case we are the logically next packet, we update mdev->peer_seq
1722 * ourselves. Correctly handles 32bit wrap around.
1723 *
1724 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1725 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1726 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 1727 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1728 *
1729 * returns 0 if we may process the packet,
1730 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1731static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1732{
1733 DEFINE_WAIT(wait);
1734 unsigned int p_seq;
1735 long timeout;
1736 int ret = 0;
1737 spin_lock(&mdev->peer_seq_lock);
1738 for (;;) {
1739 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1740 if (seq_le(packet_seq, mdev->peer_seq+1))
1741 break;
1742 if (signal_pending(current)) {
1743 ret = -ERESTARTSYS;
1744 break;
1745 }
1746 p_seq = mdev->peer_seq;
1747 spin_unlock(&mdev->peer_seq_lock);
1748 timeout = schedule_timeout(30*HZ);
1749 spin_lock(&mdev->peer_seq_lock);
1750 if (timeout == 0 && p_seq == mdev->peer_seq) {
1751 ret = -ETIMEDOUT;
1752 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1753 break;
1754 }
1755 }
1756 finish_wait(&mdev->seq_wait, &wait);
1757 if (mdev->peer_seq+1 == packet_seq)
1758 mdev->peer_seq++;
1759 spin_unlock(&mdev->peer_seq_lock);
1760 return ret;
1761}
1762
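/*
 * The wrap-around safe comparison assumed by drbd_wait_peer_seq() above, as
 * an illustrative sketch (the real seq_le() helper is expected to follow the
 * same signed-distance idiom): e.g. 0xffffffff is "not after" 1, because
 * (u32)(1 - 0xffffffff) == 2 and its signed value is positive.
 */
static inline int seq_le_sketch(u32 a, u32 b)
{
	/* true iff a equals b or logically precedes it, modulo 2^32 */
	return (s32)(b - a) >= 0;
}
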
1763/* mirrored write */
1764static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1765{
1766 sector_t sector;
1767 struct drbd_epoch_entry *e;
1768 struct p_data *p = (struct p_data *)h;
1769 int header_size, data_size;
1770 int rw = WRITE;
1771 u32 dp_flags;
1772
1773 header_size = sizeof(*p) - sizeof(*h);
1774 data_size = h->length - header_size;
1775
1776 ERR_IF(data_size == 0) return FALSE;
1777
1778 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1779 return FALSE;
1780
1781 if (!get_ldev(mdev)) {
1782 if (__ratelimit(&drbd_ratelimit_state))
1783 dev_err(DEV, "Can not write mirrored data block "
1784 "to local disk.\n");
1785 spin_lock(&mdev->peer_seq_lock);
1786 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1787 mdev->peer_seq++;
1788 spin_unlock(&mdev->peer_seq_lock);
1789
1790 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1791 atomic_inc(&mdev->current_epoch->epoch_size);
1792 return drbd_drain_block(mdev, data_size);
1793 }
1794
1795 /* get_ldev(mdev) successful.
1796 * Corresponding put_ldev done either below (on various errors),
1797 * or in drbd_endio_write_sec, if we successfully submit the data at
1798 * the end of this function. */
1799
1800 sector = be64_to_cpu(p->sector);
1801 e = read_in_block(mdev, p->block_id, sector, data_size);
1802 if (!e) {
1803 put_ldev(mdev);
1804 return FALSE;
1805 }
1806
1807 e->w.cb = e_end_block;
1808
1809 spin_lock(&mdev->epoch_lock);
1810 e->epoch = mdev->current_epoch;
1811 atomic_inc(&e->epoch->epoch_size);
1812 atomic_inc(&e->epoch->active);
1813
1814 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1815 struct drbd_epoch *epoch;
1816 /* Issue a barrier if we start a new epoch, and the previous epoch
 1817		   was not an epoch containing a single request which already was
1818 a Barrier. */
1819 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1820 if (epoch == e->epoch) {
1821 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1822 rw |= (1<<BIO_RW_BARRIER);
1823 e->flags |= EE_IS_BARRIER;
1824 } else {
1825 if (atomic_read(&epoch->epoch_size) > 1 ||
1826 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1827 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
 1828				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1829 rw |= (1<<BIO_RW_BARRIER);
1830 e->flags |= EE_IS_BARRIER;
1831 }
1832 }
1833 }
1834 spin_unlock(&mdev->epoch_lock);
1835
1836 dp_flags = be32_to_cpu(p->dp_flags);
1837 if (dp_flags & DP_HARDBARRIER) {
1838 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1839 /* rw |= (1<<BIO_RW_BARRIER); */
1840 }
1841 if (dp_flags & DP_RW_SYNC)
1842 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1843 if (dp_flags & DP_MAY_SET_IN_SYNC)
1844 e->flags |= EE_MAY_SET_IN_SYNC;
1845
1846 /* I'm the receiver, I do hold a net_cnt reference. */
1847 if (!mdev->net_conf->two_primaries) {
1848 spin_lock_irq(&mdev->req_lock);
1849 } else {
1850 /* don't get the req_lock yet,
1851 * we may sleep in drbd_wait_peer_seq */
1852 const int size = e->size;
1853 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1854 DEFINE_WAIT(wait);
1855 struct drbd_request *i;
1856 struct hlist_node *n;
1857 struct hlist_head *slot;
1858 int first;
1859
1860 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1861 BUG_ON(mdev->ee_hash == NULL);
1862 BUG_ON(mdev->tl_hash == NULL);
1863
1864 /* conflict detection and handling:
1865 * 1. wait on the sequence number,
1866 * in case this data packet overtook ACK packets.
1867 * 2. check our hash tables for conflicting requests.
 1868		 *    we only need to walk the tl_hash, since an ee cannot
 1869		 *    have a conflict with another ee: on the submitting
1870 * node, the corresponding req had already been conflicting,
1871 * and a conflicting req is never sent.
1872 *
1873 * Note: for two_primaries, we are protocol C,
1874 * so there cannot be any request that is DONE
1875 * but still on the transfer log.
1876 *
1877 * unconditionally add to the ee_hash.
1878 *
1879 * if no conflicting request is found:
1880 * submit.
1881 *
1882 * if any conflicting request is found
1883 * that has not yet been acked,
1884 * AND I have the "discard concurrent writes" flag:
1885 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1886 *
1887 * if any conflicting request is found:
1888 * block the receiver, waiting on misc_wait
1889 * until no more conflicting requests are there,
1890 * or we get interrupted (disconnect).
1891 *
1892 * we do not just write after local io completion of those
1893 * requests, but only after req is done completely, i.e.
1894 * we wait for the P_DISCARD_ACK to arrive!
1895 *
1896 * then proceed normally, i.e. submit.
1897 */
1898 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1899 goto out_interrupted;
1900
1901 spin_lock_irq(&mdev->req_lock);
1902
1903 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1904
1905#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1906 slot = tl_hash_slot(mdev, sector);
1907 first = 1;
1908 for (;;) {
1909 int have_unacked = 0;
1910 int have_conflict = 0;
1911 prepare_to_wait(&mdev->misc_wait, &wait,
1912 TASK_INTERRUPTIBLE);
1913 hlist_for_each_entry(i, n, slot, colision) {
1914 if (OVERLAPS) {
1915 /* only ALERT on first iteration,
1916 * we may be woken up early... */
1917 if (first)
1918 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1919 " new: %llus +%u; pending: %llus +%u\n",
1920 current->comm, current->pid,
1921 (unsigned long long)sector, size,
1922 (unsigned long long)i->sector, i->size);
1923 if (i->rq_state & RQ_NET_PENDING)
1924 ++have_unacked;
1925 ++have_conflict;
1926 }
1927 }
1928#undef OVERLAPS
1929 if (!have_conflict)
1930 break;
1931
1932 /* Discard Ack only for the _first_ iteration */
1933 if (first && discard && have_unacked) {
1934 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1935 (unsigned long long)sector);
1936 inc_unacked(mdev);
1937 e->w.cb = e_send_discard_ack;
1938 list_add_tail(&e->w.list, &mdev->done_ee);
1939
1940 spin_unlock_irq(&mdev->req_lock);
1941
1942 /* we could probably send that P_DISCARD_ACK ourselves,
1943 * but I don't like the receiver using the msock */
1944
1945 put_ldev(mdev);
1946 wake_asender(mdev);
1947 finish_wait(&mdev->misc_wait, &wait);
1948 return TRUE;
1949 }
1950
1951 if (signal_pending(current)) {
1952 hlist_del_init(&e->colision);
1953
1954 spin_unlock_irq(&mdev->req_lock);
1955
1956 finish_wait(&mdev->misc_wait, &wait);
1957 goto out_interrupted;
1958 }
1959
1960 spin_unlock_irq(&mdev->req_lock);
1961 if (first) {
1962 first = 0;
1963 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1964 "sec=%llus\n", (unsigned long long)sector);
1965 } else if (discard) {
1966 /* we had none on the first iteration.
1967 * there must be none now. */
1968 D_ASSERT(have_unacked == 0);
1969 }
1970 schedule();
1971 spin_lock_irq(&mdev->req_lock);
1972 }
1973 finish_wait(&mdev->misc_wait, &wait);
1974 }
1975
1976 list_add(&e->w.list, &mdev->active_ee);
1977 spin_unlock_irq(&mdev->req_lock);
1978
1979 switch (mdev->net_conf->wire_protocol) {
1980 case DRBD_PROT_C:
1981 inc_unacked(mdev);
1982 /* corresponding dec_unacked() in e_end_block()
1983 * respective _drbd_clear_done_ee */
1984 break;
1985 case DRBD_PROT_B:
1986 /* I really don't like it that the receiver thread
1987 * sends on the msock, but anyway */
1988 drbd_send_ack(mdev, P_RECV_ACK, e);
1989 break;
1990 case DRBD_PROT_A:
1991 /* nothing to do */
1992 break;
1993 }
1994
1995 if (mdev->state.pdsk == D_DISKLESS) {
1996 /* In case we have the only disk of the cluster, */
1997 drbd_set_out_of_sync(mdev, e->sector, e->size);
1998 e->flags |= EE_CALL_AL_COMPLETE_IO;
1999 drbd_al_begin_io(mdev, e->sector);
2000 }
2001
2002 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2003 return TRUE;
2004
2005out_interrupted:
2006 /* yes, the epoch_size now is imbalanced.
2007 * but we drop the connection anyway, so we don't have a chance to
2008 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2009 put_ldev(mdev);
2010 drbd_free_ee(mdev, e);
2011 return FALSE;
2012}
2013
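/* Added annotation: handles P_DATA_REQUEST, P_RS_DATA_REQUEST, P_OV_REQUEST,
 * P_OV_REPLY and P_CSUM_RS_REQUEST.  Allocates an epoch entry for the
 * requested range, picks the worker callback that will later send the reply,
 * queues the entry on read_ee and submits the local READ via drbd_submit_ee(). */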
2014static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2015{
2016 sector_t sector;
2017 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2018 struct drbd_epoch_entry *e;
2019 struct digest_info *di = NULL;
2020 int size, digest_size;
2021 unsigned int fault_type;
2022 struct p_block_req *p =
2023 (struct p_block_req *)h;
2024 const int brps = sizeof(*p)-sizeof(*h);
2025
2026 if (drbd_recv(mdev, h->payload, brps) != brps)
2027 return FALSE;
2028
2029 sector = be64_to_cpu(p->sector);
2030 size = be32_to_cpu(p->blksize);
2031
2032 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2033 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2034 (unsigned long long)sector, size);
2035 return FALSE;
2036 }
2037 if (sector + (size>>9) > capacity) {
2038 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2039 (unsigned long long)sector, size);
2040 return FALSE;
2041 }
2042
2043 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2044 if (__ratelimit(&drbd_ratelimit_state))
2045 dev_err(DEV, "Can not satisfy peer's read request, "
2046 "no local data.\n");
2047 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2048 P_NEG_RS_DREPLY , p);
c3470cde 2049 return drbd_drain_block(mdev, h->length - brps);
2050 }
2051
2052 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2053 * "criss-cross" setup, that might cause write-out on some other DRBD,
2054 * which in turn might block on the other node at this very place. */
2055 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2056 if (!e) {
2057 put_ldev(mdev);
2058 return FALSE;
2059 }
2060
2061 switch (h->command) {
2062 case P_DATA_REQUEST:
2063 e->w.cb = w_e_end_data_req;
2064 fault_type = DRBD_FAULT_DT_RD;
2065 break;
2066 case P_RS_DATA_REQUEST:
2067 e->w.cb = w_e_end_rsdata_req;
2068 fault_type = DRBD_FAULT_RS_RD;
2069 /* Eventually this should become asynchronous. Currently it
2070 * blocks the whole receiver just to delay the reading of a
2071 * resync data block.
2072 * the drbd_work_queue mechanism is made for this...
2073 */
2074 if (!drbd_rs_begin_io(mdev, sector)) {
2075 /* we have been interrupted,
2076 * probably connection lost! */
2077 D_ASSERT(signal_pending(current));
2078 goto out_free_e;
2079 }
2080 break;
2081
2082 case P_OV_REPLY:
2083 case P_CSUM_RS_REQUEST:
2084 fault_type = DRBD_FAULT_RS_RD;
2085 digest_size = h->length - brps;
2086 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2087 if (!di)
2088 goto out_free_e;
2089
2090 di->digest_size = digest_size;
2091 di->digest = (((char *)di)+sizeof(struct digest_info));
2092
2093 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2094 goto out_free_e;
2095
2096 e->block_id = (u64)(unsigned long)di;
2097 if (h->command == P_CSUM_RS_REQUEST) {
2098 D_ASSERT(mdev->agreed_pro_version >= 89);
2099 e->w.cb = w_e_end_csum_rs_req;
2100 } else if (h->command == P_OV_REPLY) {
2101 e->w.cb = w_e_end_ov_reply;
2102 dec_rs_pending(mdev);
2103 break;
2104 }
2105
2106 if (!drbd_rs_begin_io(mdev, sector)) {
2107 /* we have been interrupted, probably connection lost! */
2108 D_ASSERT(signal_pending(current));
2109 goto out_free_e;
2110 }
2111 break;
2112
2113 case P_OV_REQUEST:
2114 if (mdev->state.conn >= C_CONNECTED &&
2115 mdev->state.conn != C_VERIFY_T)
2116 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2117 drbd_conn_str(mdev->state.conn));
2118 if (mdev->ov_start_sector == ~(sector_t)0 &&
2119 mdev->agreed_pro_version >= 90) {
2120 mdev->ov_start_sector = sector;
2121 mdev->ov_position = sector;
2122 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2123 dev_info(DEV, "Online Verify start sector: %llu\n",
2124 (unsigned long long)sector);
2125 }
2126 e->w.cb = w_e_end_ov_req;
2127 fault_type = DRBD_FAULT_RS_RD;
2128 /* Eventually this should become asynchronous. Currently it
2129 * blocks the whole receiver just to delay the reading of a
2130 * resync data block.
2131 * the drbd_work_queue mechanism is made for this...
2132 */
2133 if (!drbd_rs_begin_io(mdev, sector)) {
2134 /* we have been interrupted,
2135 * probably connection lost! */
2136 D_ASSERT(signal_pending(current));
2137 goto out_free_e;
2138 }
2139 break;
2140
2141
2142 default:
2143 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2144 cmdname(h->command));
2145 fault_type = DRBD_FAULT_MAX;
2146 }
2147
2148 spin_lock_irq(&mdev->req_lock);
2149 list_add(&e->w.list, &mdev->read_ee);
2150 spin_unlock_irq(&mdev->req_lock);
2151
2152 inc_unacked(mdev);
2153
2154 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2155 return TRUE;
2156
2157out_free_e:
2158 kfree(di);
2159 put_ldev(mdev);
2160 drbd_free_ee(mdev, e);
2161 return FALSE;
2162}
2163
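/* Added annotation: applies the after-sb-0pri policy.
 * Return convention (same sign convention as drbd_uuid_compare()):
 *    1  discard the peer's data (sync from this node)
 *   -1  discard the local data (sync from the peer)
 * -100  no automatic decision could be reached */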
2164static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2165{
2166 int self, peer, rv = -100;
2167 unsigned long ch_self, ch_peer;
2168
2169 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2170 peer = mdev->p_uuid[UI_BITMAP] & 1;
2171
2172 ch_peer = mdev->p_uuid[UI_SIZE];
2173 ch_self = mdev->comm_bm_set;
2174
2175 switch (mdev->net_conf->after_sb_0p) {
2176 case ASB_CONSENSUS:
2177 case ASB_DISCARD_SECONDARY:
2178 case ASB_CALL_HELPER:
2179 dev_err(DEV, "Configuration error.\n");
2180 break;
2181 case ASB_DISCONNECT:
2182 break;
2183 case ASB_DISCARD_YOUNGER_PRI:
2184 if (self == 0 && peer == 1) {
2185 rv = -1;
2186 break;
2187 }
2188 if (self == 1 && peer == 0) {
2189 rv = 1;
2190 break;
2191 }
2192 /* Else fall through to one of the other strategies... */
2193 case ASB_DISCARD_OLDER_PRI:
2194 if (self == 0 && peer == 1) {
2195 rv = 1;
2196 break;
2197 }
2198 if (self == 1 && peer == 0) {
2199 rv = -1;
2200 break;
2201 }
2202 /* Else fall through to one of the other strategies... */
ad19bf6e 2203 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2204 "Using discard-least-changes instead\n");
2205 case ASB_DISCARD_ZERO_CHG:
2206 if (ch_peer == 0 && ch_self == 0) {
2207 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2208 ? -1 : 1;
2209 break;
2210 } else {
2211 if (ch_peer == 0) { rv = 1; break; }
2212 if (ch_self == 0) { rv = -1; break; }
2213 }
2214 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2215 break;
2216 case ASB_DISCARD_LEAST_CHG:
2217 if (ch_self < ch_peer)
2218 rv = -1;
2219 else if (ch_self > ch_peer)
2220 rv = 1;
2221 else /* ( ch_self == ch_peer ) */
2222 /* Well, then use something else. */
2223 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2224 ? -1 : 1;
2225 break;
2226 case ASB_DISCARD_LOCAL:
2227 rv = -1;
2228 break;
2229 case ASB_DISCARD_REMOTE:
2230 rv = 1;
2231 }
2232
2233 return rv;
2234}
2235
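/* Added annotation: applies the after-sb-1pri policy.  Several strategies
 * delegate to drbd_asb_recover_0p(); ASB_CONSENSUS only accepts that result
 * when it matches the current roles, and ASB_CALL_HELPER may demote this
 * node or invoke the "pri-lost-after-sb" helper. */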
2236static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2237{
2238 int self, peer, hg, rv = -100;
2239
2240 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2241 peer = mdev->p_uuid[UI_BITMAP] & 1;
2242
2243 switch (mdev->net_conf->after_sb_1p) {
2244 case ASB_DISCARD_YOUNGER_PRI:
2245 case ASB_DISCARD_OLDER_PRI:
2246 case ASB_DISCARD_LEAST_CHG:
2247 case ASB_DISCARD_LOCAL:
2248 case ASB_DISCARD_REMOTE:
2249 dev_err(DEV, "Configuration error.\n");
2250 break;
2251 case ASB_DISCONNECT:
2252 break;
2253 case ASB_CONSENSUS:
2254 hg = drbd_asb_recover_0p(mdev);
2255 if (hg == -1 && mdev->state.role == R_SECONDARY)
2256 rv = hg;
2257 if (hg == 1 && mdev->state.role == R_PRIMARY)
2258 rv = hg;
2259 break;
2260 case ASB_VIOLENTLY:
2261 rv = drbd_asb_recover_0p(mdev);
2262 break;
2263 case ASB_DISCARD_SECONDARY:
2264 return mdev->state.role == R_PRIMARY ? 1 : -1;
2265 case ASB_CALL_HELPER:
2266 hg = drbd_asb_recover_0p(mdev);
2267 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2268 self = drbd_set_role(mdev, R_SECONDARY, 0);
2269 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2270 * we might be here in C_WF_REPORT_PARAMS which is transient.
2271 * we do not need to wait for the after state change work either. */
2272 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2273 if (self != SS_SUCCESS) {
2274 drbd_khelper(mdev, "pri-lost-after-sb");
2275 } else {
2276 dev_warn(DEV, "Successfully gave up primary role.\n");
2277 rv = hg;
2278 }
2279 } else
2280 rv = hg;
2281 }
2282
2283 return rv;
2284}
2285
2286static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2287{
2288 int self, peer, hg, rv = -100;
2289
2290 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2291 peer = mdev->p_uuid[UI_BITMAP] & 1;
2292
2293 switch (mdev->net_conf->after_sb_2p) {
2294 case ASB_DISCARD_YOUNGER_PRI:
2295 case ASB_DISCARD_OLDER_PRI:
2296 case ASB_DISCARD_LEAST_CHG:
2297 case ASB_DISCARD_LOCAL:
2298 case ASB_DISCARD_REMOTE:
2299 case ASB_CONSENSUS:
2300 case ASB_DISCARD_SECONDARY:
2301 dev_err(DEV, "Configuration error.\n");
2302 break;
2303 case ASB_VIOLENTLY:
2304 rv = drbd_asb_recover_0p(mdev);
2305 break;
2306 case ASB_DISCONNECT:
2307 break;
2308 case ASB_CALL_HELPER:
2309 hg = drbd_asb_recover_0p(mdev);
2310 if (hg == -1) {
2311 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2312 * we might be here in C_WF_REPORT_PARAMS which is transient.
2313 * we do not need to wait for the after state change work either. */
2314 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2315 if (self != SS_SUCCESS) {
2316 drbd_khelper(mdev, "pri-lost-after-sb");
2317 } else {
2318 dev_warn(DEV, "Successfully gave up primary role.\n");
2319 rv = hg;
2320 }
2321 } else
2322 rv = hg;
2323 }
2324
2325 return rv;
2326}
2327
2328static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2329 u64 bits, u64 flags)
2330{
2331 if (!uuid) {
2332 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2333 return;
2334 }
2335 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2336 text,
2337 (unsigned long long)uuid[UI_CURRENT],
2338 (unsigned long long)uuid[UI_BITMAP],
2339 (unsigned long long)uuid[UI_HISTORY_START],
2340 (unsigned long long)uuid[UI_HISTORY_END],
2341 (unsigned long long)bits,
2342 (unsigned long long)flags);
2343}
2344
2345/*
2346 100 after split brain try auto recover
2347 2 C_SYNC_SOURCE set BitMap
2348 1 C_SYNC_SOURCE use BitMap
2349 0 no Sync
2350 -1 C_SYNC_TARGET use BitMap
2351 -2 C_SYNC_TARGET set BitMap
2352 -100 after split brain, disconnect
2353 -1000 unrelated data
 -1001 both sides need to support at least protocol 91
2354 */
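/* Added annotation: the lowest bit of each UUID is not part of its identity
 * (it is read as a flag elsewhere, e.g. the "& 1" in drbd_asb_recover_0p()),
 * so every comparison below masks it off with ~((u64)1). */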
2355static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2356{
2357 u64 self, peer;
2358 int i, j;
2359
2360 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2361 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2362
2363 *rule_nr = 10;
2364 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2365 return 0;
2366
2367 *rule_nr = 20;
2368 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2369 peer != UUID_JUST_CREATED)
2370 return -2;
2371
2372 *rule_nr = 30;
2373 if (self != UUID_JUST_CREATED &&
2374 (peer == UUID_JUST_CREATED || peer == (u64)0))
2375 return 2;
2376
2377 if (self == peer) {
2378 int rct, dc; /* roles at crash time */
2379
2380 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2381
2382 if (mdev->agreed_pro_version < 91)
2383 return -1001;
2384
2385 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2386 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2387 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2388 drbd_uuid_set_bm(mdev, 0UL);
2389
2390 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2391 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2392 *rule_nr = 34;
2393 } else {
2394 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2395 *rule_nr = 36;
2396 }
2397
2398 return 1;
2399 }
2400
2401 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2402
2403 if (mdev->agreed_pro_version < 91)
2404 return -1001;
2405
2406 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2407 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2408 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2409
2410 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2411 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2412 mdev->p_uuid[UI_BITMAP] = 0UL;
2413
2414 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2415 *rule_nr = 35;
2416 } else {
2417 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2418 *rule_nr = 37;
2419 }
2420
2421 return -1;
2422 }
2423
2424 /* Common power [off|failure] */
2425 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2426 (mdev->p_uuid[UI_FLAGS] & 2);
2427 /* lowest bit is set when we were primary,
2428 * next bit (weight 2) is set when peer was primary */
2429 *rule_nr = 40;
2430
2431 switch (rct) {
2432 case 0: /* !self_pri && !peer_pri */ return 0;
2433 case 1: /* self_pri && !peer_pri */ return 1;
2434 case 2: /* !self_pri && peer_pri */ return -1;
2435 case 3: /* self_pri && peer_pri */
2436 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2437 return dc ? -1 : 1;
2438 }
2439 }
2440
2441 *rule_nr = 50;
2442 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2443 if (self == peer)
2444 return -1;
2445
2446 *rule_nr = 51;
2447 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2448 if (self == peer) {
2449 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2450 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2451 if (self == peer) {
2452 /* The last P_SYNC_UUID did not get through. Undo the modifications
2453 of the peer's UUIDs made by the last start of resync as sync source. */
2454
2455 if (mdev->agreed_pro_version < 91)
2456 return -1001;
2457
2458 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2459 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2460 return -1;
2461 }
2462 }
2463
2464 *rule_nr = 60;
2465 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2466 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2467 peer = mdev->p_uuid[i] & ~((u64)1);
2468 if (self == peer)
2469 return -2;
2470 }
2471
2472 *rule_nr = 70;
2473 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2474 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2475 if (self == peer)
2476 return 1;
2477
2478 *rule_nr = 71;
2479 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2480 if (self == peer) {
2481 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2482 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2483 if (self == peer) {
2484 /* The last P_SYNC_UUID did not get through. Undo the modifications
2485 of our UUIDs made by the last start of resync as sync source. */
2486
2487 if (mdev->agreed_pro_version < 91)
2488 return -1001;
2489
2490 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2491 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2492
2493 dev_info(DEV, "Undid last start of resync:\n");
2494
2495 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2496 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2497
2498 return 1;
2499 }
2500 }
2501
2502
2503 *rule_nr = 80;
d8c2a36b 2504 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2505 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2506 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2507 if (self == peer)
2508 return 2;
2509 }
2510
2511 *rule_nr = 90;
2512 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2513 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2514 if (self == peer && self != ((u64)0))
2515 return 100;
2516
2517 *rule_nr = 100;
2518 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2519 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2520 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2521 peer = mdev->p_uuid[j] & ~((u64)1);
2522 if (self == peer)
2523 return -100;
2524 }
2525 }
2526
2527 return -1000;
2528}
2529
2530/* drbd_sync_handshake() returns the new conn state on success, or
2531 C_MASK (-1) on failure.
2532 */
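/* Added annotation, rough flow: compare the UUID sets, translate the result
 * into a sync direction, try the configured after-split-brain policies (and
 * helpers) if needed, and set up a full-sync bitmap when |hg| >= 2.
 * A dry-run connect only reports what would happen and returns C_MASK. */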
2533static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2534 enum drbd_disk_state peer_disk) __must_hold(local)
2535{
2536 int hg, rule_nr;
2537 enum drbd_conns rv = C_MASK;
2538 enum drbd_disk_state mydisk;
2539
2540 mydisk = mdev->state.disk;
2541 if (mydisk == D_NEGOTIATING)
2542 mydisk = mdev->new_state_tmp.disk;
2543
2544 dev_info(DEV, "drbd_sync_handshake:\n");
2545 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2546 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2547 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2548
2549 hg = drbd_uuid_compare(mdev, &rule_nr);
2550
2551 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2552
2553 if (hg == -1000) {
2554 dev_alert(DEV, "Unrelated data, aborting!\n");
2555 return C_MASK;
2556 }
2557 if (hg == -1001) {
2558 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2559 return C_MASK;
2560 }
2561
2562 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2563 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2564 int f = (hg == -100) || abs(hg) == 2;
2565 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2566 if (f)
2567 hg = hg*2;
2568 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2569 hg > 0 ? "source" : "target");
2570 }
2571
2572 if (abs(hg) == 100)
2573 drbd_khelper(mdev, "initial-split-brain");
2574
2575 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2576 int pcount = (mdev->state.role == R_PRIMARY)
2577 + (peer_role == R_PRIMARY);
2578 int forced = (hg == -100);
2579
2580 switch (pcount) {
2581 case 0:
2582 hg = drbd_asb_recover_0p(mdev);
2583 break;
2584 case 1:
2585 hg = drbd_asb_recover_1p(mdev);
2586 break;
2587 case 2:
2588 hg = drbd_asb_recover_2p(mdev);
2589 break;
2590 }
2591 if (abs(hg) < 100) {
2592 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2593 "automatically solved. Sync from %s node\n",
2594 pcount, (hg < 0) ? "peer" : "this");
2595 if (forced) {
2596 dev_warn(DEV, "Doing a full sync, since"
2597 " UUIDs where ambiguous.\n");
2598 hg = hg*2;
2599 }
2600 }
2601 }
2602
2603 if (hg == -100) {
2604 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2605 hg = -1;
2606 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2607 hg = 1;
2608
2609 if (abs(hg) < 100)
2610 dev_warn(DEV, "Split-Brain detected, manually solved. "
2611 "Sync from %s node\n",
2612 (hg < 0) ? "peer" : "this");
2613 }
2614
2615 if (hg == -100) {
2616 /* FIXME this log message is not correct if we end up here
2617 * after an attempted attach on a diskless node.
2618 * We just refuse to attach -- well, we drop the "connection"
2619 * to that disk, in a way... */
3a11a487 2620 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2621 drbd_khelper(mdev, "split-brain");
2622 return C_MASK;
2623 }
2624
2625 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2626 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2627 return C_MASK;
2628 }
2629
2630 if (hg < 0 && /* by intention we do not use mydisk here. */
2631 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2632 switch (mdev->net_conf->rr_conflict) {
2633 case ASB_CALL_HELPER:
2634 drbd_khelper(mdev, "pri-lost");
2635 /* fall through */
2636 case ASB_DISCONNECT:
2637 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2638 return C_MASK;
2639 case ASB_VIOLENTLY:
2640 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2641 "assumption\n");
2642 }
2643 }
2644
2645 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2646 if (hg == 0)
2647 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2648 else
2649 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2650 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2651 abs(hg) >= 2 ? "full" : "bit-map based");
2652 return C_MASK;
2653 }
2654
2655 if (abs(hg) >= 2) {
2656 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2657 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2658 return C_MASK;
2659 }
2660
2661 if (hg > 0) { /* become sync source. */
2662 rv = C_WF_BITMAP_S;
2663 } else if (hg < 0) { /* become sync target */
2664 rv = C_WF_BITMAP_T;
2665 } else {
2666 rv = C_CONNECTED;
2667 if (drbd_bm_total_weight(mdev)) {
2668 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2669 drbd_bm_total_weight(mdev));
2670 }
2671 }
2672
2673 return rv;
2674}
2675
2676/* returns 1 if invalid */
2677static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2678{
2679 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2680 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2681 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2682 return 0;
2683
2684 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2685 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2686 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2687 return 1;
2688
2689 /* everything else is valid if they are equal on both sides. */
2690 if (peer == self)
2691 return 0;
2692
2693 /* everything else is invalid. */
2694 return 1;
2695}
2696
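/* Added annotation: compares the peer's connection configuration (wire
 * protocol, after-sb policies, two-primaries, want-lose, data-integrity-alg)
 * with our own; any mismatch forces C_DISCONNECTING. */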
2697static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2698{
2699 struct p_protocol *p = (struct p_protocol *)h;
2700 int header_size, data_size;
2701 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2702 int p_want_lose, p_two_primaries, cf;
2703 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2704
2705 header_size = sizeof(*p) - sizeof(*h);
2706 data_size = h->length - header_size;
2707
2708 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2709 return FALSE;
2710
2711 p_proto = be32_to_cpu(p->protocol);
2712 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2713 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2714 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2715 p_two_primaries = be32_to_cpu(p->two_primaries);
2716 cf = be32_to_cpu(p->conn_flags);
2717 p_want_lose = cf & CF_WANT_LOSE;
2718
2719 clear_bit(CONN_DRY_RUN, &mdev->flags);
2720
2721 if (cf & CF_DRY_RUN)
2722 set_bit(CONN_DRY_RUN, &mdev->flags);
2723
2724 if (p_proto != mdev->net_conf->wire_protocol) {
2725 dev_err(DEV, "incompatible communication protocols\n");
2726 goto disconnect;
2727 }
2728
2729 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2730 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2731 goto disconnect;
2732 }
2733
2734 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2735 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2736 goto disconnect;
2737 }
2738
2739 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2740 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2741 goto disconnect;
2742 }
2743
2744 if (p_want_lose && mdev->net_conf->want_lose) {
2745 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2746 goto disconnect;
2747 }
2748
2749 if (p_two_primaries != mdev->net_conf->two_primaries) {
2750 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2751 goto disconnect;
2752 }
2753
2754 if (mdev->agreed_pro_version >= 87) {
2755 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2756
2757 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2758 return FALSE;
2759
2760 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2761 if (strcmp(p_integrity_alg, my_alg)) {
2762 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2763 goto disconnect;
2764 }
2765 dev_info(DEV, "data-integrity-alg: %s\n",
2766 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2767 }
2768
2769 return TRUE;
2770
2771disconnect:
2772 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2773 return FALSE;
2774}
2775
2776/* helper function
2777 * input: alg name, feature name
2778 * return: NULL (alg name was "")
2779 * ERR_PTR(error) if something goes wrong
2780 * or the crypto hash ptr, if it worked out ok. */
2781struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2782 const char *alg, const char *name)
2783{
2784 struct crypto_hash *tfm;
2785
2786 if (!alg[0])
2787 return NULL;
2788
2789 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2790 if (IS_ERR(tfm)) {
2791 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2792 alg, name, PTR_ERR(tfm));
2793 return tfm;
2794 }
2795 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2796 crypto_free_hash(tfm);
2797 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2798 return ERR_PTR(-EINVAL);
2799 }
2800 return tfm;
2801}
2802
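/* Added annotation: receives the peer's syncer configuration: the resync
 * rate and, depending on the protocol version, the verify-alg and csums-alg
 * names.  The corresponding crypto hashes are allocated up front and
 * installed under peer_seq_lock to avoid racing with drbd_nl_syncer_conf(). */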
2803static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2804{
2805 int ok = TRUE;
2806 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2807 unsigned int header_size, data_size, exp_max_sz;
2808 struct crypto_hash *verify_tfm = NULL;
2809 struct crypto_hash *csums_tfm = NULL;
2810 const int apv = mdev->agreed_pro_version;
2811
2812 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2813 : apv == 88 ? sizeof(struct p_rs_param)
2814 + SHARED_SECRET_MAX
2815 : /* 89 */ sizeof(struct p_rs_param_89);
2816
2817 if (h->length > exp_max_sz) {
2818 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2819 h->length, exp_max_sz);
2820 return FALSE;
2821 }
2822
2823 if (apv <= 88) {
2824 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2825 data_size = h->length - header_size;
2826 } else /* apv >= 89 */ {
2827 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2828 data_size = h->length - header_size;
2829 D_ASSERT(data_size == 0);
2830 }
2831
2832 /* initialize verify_alg and csums_alg */
2833 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2834
2835 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2836 return FALSE;
2837
2838 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2839
2840 if (apv >= 88) {
2841 if (apv == 88) {
2842 if (data_size > SHARED_SECRET_MAX) {
2843 dev_err(DEV, "verify-alg too long, "
2844 "peer wants %u, accepting only %u byte\n",
2845 data_size, SHARED_SECRET_MAX);
2846 return FALSE;
2847 }
2848
2849 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2850 return FALSE;
2851
2852 /* we expect NUL terminated string */
2853 /* but just in case someone tries to be evil */
2854 D_ASSERT(p->verify_alg[data_size-1] == 0);
2855 p->verify_alg[data_size-1] = 0;
2856
2857 } else /* apv >= 89 */ {
2858 /* we still expect NUL terminated strings */
2859 /* but just in case someone tries to be evil */
2860 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2861 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2862 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2863 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2864 }
2865
2866 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2867 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2868 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2869 mdev->sync_conf.verify_alg, p->verify_alg);
2870 goto disconnect;
2871 }
2872 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2873 p->verify_alg, "verify-alg");
2874 if (IS_ERR(verify_tfm)) {
2875 verify_tfm = NULL;
2876 goto disconnect;
2877 }
2878 }
2879
2880 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2881 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2882 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2883 mdev->sync_conf.csums_alg, p->csums_alg);
2884 goto disconnect;
2885 }
2886 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2887 p->csums_alg, "csums-alg");
2888 if (IS_ERR(csums_tfm)) {
2889 csums_tfm = NULL;
2890 goto disconnect;
2891 }
2892 }
2893
2894
2895 spin_lock(&mdev->peer_seq_lock);
2896 /* lock against drbd_nl_syncer_conf() */
2897 if (verify_tfm) {
2898 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2899 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2900 crypto_free_hash(mdev->verify_tfm);
2901 mdev->verify_tfm = verify_tfm;
2902 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2903 }
2904 if (csums_tfm) {
2905 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2906 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2907 crypto_free_hash(mdev->csums_tfm);
2908 mdev->csums_tfm = csums_tfm;
2909 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2910 }
2911 spin_unlock(&mdev->peer_seq_lock);
2912 }
2913
2914 return ok;
2915disconnect:
2916 /* just for completeness: actually not needed,
2917 * as this is not reached if csums_tfm was ok. */
2918 crypto_free_hash(csums_tfm);
2919 /* but free the verify_tfm again, if csums_tfm did not work out */
2920 crypto_free_hash(verify_tfm);
2921 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2922 return FALSE;
2923}
2924
2925static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2926{
2927 /* sorry, we currently have no working implementation
2928 * of distributed TCQ */
2929}
2930
2931/* warn if the arguments differ by more than 12.5% */
2932static void warn_if_differ_considerably(struct drbd_conf *mdev,
2933 const char *s, sector_t a, sector_t b)
2934{
2935 sector_t d;
2936 if (a == 0 || b == 0)
2937 return;
2938 d = (a > b) ? (a - b) : (b - a);
2939 if (d > (a>>3) || d > (b>>3))
2940 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2941 (unsigned long long)a, (unsigned long long)b);
2942}
2943
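/* Added annotation: the peer announces its disk and requested sizes.  We
 * remember p_size, negotiate the user-requested size, refuse to shrink a
 * device that still holds usable data while connecting, re-determine our
 * own size, adapt the maximum segment size, and possibly trigger a resync
 * after an online grow. */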
2944static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2945{
2946 struct p_sizes *p = (struct p_sizes *)h;
2947 enum determine_dev_size dd = unchanged;
2948 unsigned int max_seg_s;
2949 sector_t p_size, p_usize, my_usize;
2950 int ldsc = 0; /* local disk size changed */
e89b591c 2951 enum dds_flags ddsf;
2952
2953 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2954 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2955 return FALSE;
2956
2957 p_size = be64_to_cpu(p->d_size);
2958 p_usize = be64_to_cpu(p->u_size);
2959
2960 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2961 dev_err(DEV, "some backing storage is needed\n");
2962 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2963 return FALSE;
2964 }
2965
2966 /* just store the peer's disk size for now.
2967 * we still need to figure out whether we accept that. */
2968 mdev->p_size = p_size;
2969
2970#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2971 if (get_ldev(mdev)) {
2972 warn_if_differ_considerably(mdev, "lower level device sizes",
2973 p_size, drbd_get_max_capacity(mdev->ldev));
2974 warn_if_differ_considerably(mdev, "user requested size",
2975 p_usize, mdev->ldev->dc.disk_size);
2976
2977 /* if this is the first connect, or an otherwise expected
2978 * param exchange, choose the minimum */
2979 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2980 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2981 p_usize);
2982
2983 my_usize = mdev->ldev->dc.disk_size;
2984
2985 if (mdev->ldev->dc.disk_size != p_usize) {
2986 mdev->ldev->dc.disk_size = p_usize;
2987 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2988 (unsigned long)mdev->ldev->dc.disk_size);
2989 }
2990
2991 /* Never shrink a device with usable data during connect.
2992 But allow online shrinking if we are connected. */
a393db6f 2993 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2994 drbd_get_capacity(mdev->this_bdev) &&
2995 mdev->state.disk >= D_OUTDATED &&
2996 mdev->state.conn < C_CONNECTED) {
2997 dev_err(DEV, "The peer's disk size is too small!\n");
2998 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2999 mdev->ldev->dc.disk_size = my_usize;
3000 put_ldev(mdev);
3001 return FALSE;
3002 }
3003 put_ldev(mdev);
3004 }
3005#undef min_not_zero
3006
e89b591c 3007 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3008 if (get_ldev(mdev)) {
e89b591c 3009 dd = drbd_determin_dev_size(mdev, ddsf);
3010 put_ldev(mdev);
3011 if (dd == dev_size_error)
3012 return FALSE;
3013 drbd_md_sync(mdev);
3014 } else {
3015 /* I am diskless, need to accept the peer's size. */
3016 drbd_set_my_capacity(mdev, p_size);
3017 }
3018
3019 if (get_ldev(mdev)) {
3020 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3021 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3022 ldsc = 1;
3023 }
3024
3025 if (mdev->agreed_pro_version < 94)
3026 max_seg_s = be32_to_cpu(p->max_segment_size);
3027 else /* drbd 8.3.8 onwards */
3028 max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3029
3030 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3031 drbd_setup_queue_param(mdev, max_seg_s);
3032
e89b591c 3033 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3034 put_ldev(mdev);
3035 }
3036
3037 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3038 if (be64_to_cpu(p->c_size) !=
3039 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3040 /* we have different sizes, probably peer
3041 * needs to know my new size... */
e89b591c 3042 drbd_send_sizes(mdev, 0, ddsf);
3043 }
3044 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3045 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3046 if (mdev->state.pdsk >= D_INCONSISTENT &&
3047 mdev->state.disk >= D_INCONSISTENT) {
3048 if (ddsf & DDSF_NO_RESYNC)
3049 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3050 else
3051 resync_after_online_grow(mdev);
3052 } else
3053 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3054 }
3055 }
3056
3057 return TRUE;
3058}
3059
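/* Added annotation: stores the peer's UUIDs in mdev->p_uuid.  While primary
 * without a usable local disk we only accept a peer whose current UUID
 * matches our exposed data UUID; freshly created devices on both sides may
 * skip the initial sync. */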
3060static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3061{
3062 struct p_uuids *p = (struct p_uuids *)h;
3063 u64 *p_uuid;
3064 int i;
3065
3066 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3067 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3068 return FALSE;
3069
3070 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3071
3072 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3073 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3074
3075 kfree(mdev->p_uuid);
3076 mdev->p_uuid = p_uuid;
3077
3078 if (mdev->state.conn < C_CONNECTED &&
3079 mdev->state.disk < D_INCONSISTENT &&
3080 mdev->state.role == R_PRIMARY &&
3081 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3082 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3083 (unsigned long long)mdev->ed_uuid);
3084 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3085 return FALSE;
3086 }
3087
3088 if (get_ldev(mdev)) {
3089 int skip_initial_sync =
3090 mdev->state.conn == C_CONNECTED &&
3091 mdev->agreed_pro_version >= 90 &&
3092 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3093 (p_uuid[UI_FLAGS] & 8);
3094 if (skip_initial_sync) {
3095 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3096 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3097 "clear_n_write from receive_uuids");
3098 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3099 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3100 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3101 CS_VERBOSE, NULL);
3102 drbd_md_sync(mdev);
3103 }
3104 put_ldev(mdev);
3105 }
3106
3107 /* Before we test for the disk state, we should wait until any potentially
3108 ongoing cluster wide state change is finished. That is important if
3109 we are primary and are detaching from our disk. We need to see the
3110 new disk state... */
3111 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3112 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3113 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3114
3115 return TRUE;
3116}
3117
3118/**
3119 * convert_state() - Converts the peer's view of the cluster state to our point of view
3120 * @ps: The state as seen by the peer.
3121 */
3122static union drbd_state convert_state(union drbd_state ps)
3123{
3124 union drbd_state ms;
3125
3126 static enum drbd_conns c_tab[] = {
3127 [C_CONNECTED] = C_CONNECTED,
3128
3129 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3130 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3131 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3132 [C_VERIFY_S] = C_VERIFY_T,
3133 [C_MASK] = C_MASK,
3134 };
3135
3136 ms.i = ps.i;
3137
3138 ms.conn = c_tab[ps.conn];
3139 ms.peer = ps.role;
3140 ms.role = ps.peer;
3141 ms.pdsk = ps.disk;
3142 ms.disk = ps.pdsk;
3143 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3144
3145 return ms;
3146}
3147
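/* Added annotation: the peer asks for a cluster wide state change.  Convert
 * the request from the peer's point of view to ours, apply it, and send the
 * result back via drbd_send_sr_reply(). */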
3148static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3149{
3150 struct p_req_state *p = (struct p_req_state *)h;
3151 union drbd_state mask, val;
3152 int rv;
3153
3154 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3155 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3156 return FALSE;
3157
3158 mask.i = be32_to_cpu(p->mask);
3159 val.i = be32_to_cpu(p->val);
3160
3161 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3162 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3163 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3164 return TRUE;
3165 }
3166
3167 mask = convert_state(mask);
3168 val = convert_state(val);
3169
3170 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3171
3172 drbd_send_sr_reply(mdev, rv);
3173 drbd_md_sync(mdev);
3174
3175 return TRUE;
3176}
3177
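/* Added annotation: processes the peer's current state.  If a new connection
 * was established, a disk was freshly attached, or the admin requested a
 * sync, run drbd_sync_handshake() to decide the resync direction, then merge
 * the peer's role and disk state into our own state. */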
3178static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3179{
3180 struct p_state *p = (struct p_state *)h;
3181 enum drbd_conns nconn, oconn;
3182 union drbd_state ns, peer_state;
3183 enum drbd_disk_state real_peer_disk;
3184 int rv;
3185
3186 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3187 return FALSE;
3188
3189 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3190 return FALSE;
3191
3192 peer_state.i = be32_to_cpu(p->state);
3193
3194 real_peer_disk = peer_state.disk;
3195 if (peer_state.disk == D_NEGOTIATING) {
3196 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3197 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3198 }
3199
3200 spin_lock_irq(&mdev->req_lock);
3201 retry:
3202 oconn = nconn = mdev->state.conn;
3203 spin_unlock_irq(&mdev->req_lock);
3204
3205 if (nconn == C_WF_REPORT_PARAMS)
3206 nconn = C_CONNECTED;
3207
3208 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3209 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3210 int cr; /* consider resync */
3211
3212 /* if we established a new connection */
3213 cr = (oconn < C_CONNECTED);
3214 /* if we had an established connection
3215 * and one of the nodes newly attaches a disk */
3216 cr |= (oconn == C_CONNECTED &&
3217 (peer_state.disk == D_NEGOTIATING ||
3218 mdev->state.disk == D_NEGOTIATING));
3219 /* if we have both been inconsistent, and the peer has been
3220 * forced to be UpToDate with --overwrite-data */
3221 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3222 /* if we had been plain connected, and the admin requested to
3223 * start a sync by "invalidate" or "invalidate-remote" */
3224 cr |= (oconn == C_CONNECTED &&
3225 (peer_state.conn >= C_STARTING_SYNC_S &&
3226 peer_state.conn <= C_WF_BITMAP_T));
3227
3228 if (cr)
3229 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3230
3231 put_ldev(mdev);
3232 if (nconn == C_MASK) {
580b9767 3233 nconn = C_CONNECTED;
3234 if (mdev->state.disk == D_NEGOTIATING) {
3235 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3236 } else if (peer_state.disk == D_NEGOTIATING) {
3237 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3238 peer_state.disk = D_DISKLESS;
580b9767 3239 real_peer_disk = D_DISKLESS;
b411b363 3240 } else {
3241 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3242 return FALSE;
3243 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3244 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3245 return FALSE;
3246 }
3247 }
3248 }
3249
3250 spin_lock_irq(&mdev->req_lock);
3251 if (mdev->state.conn != oconn)
3252 goto retry;
3253 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3254 ns.i = mdev->state.i;
3255 ns.conn = nconn;
3256 ns.peer = peer_state.role;
3257 ns.pdsk = real_peer_disk;
3258 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3259 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3260 ns.disk = mdev->new_state_tmp.disk;
3261
3262 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3263 ns = mdev->state;
3264 spin_unlock_irq(&mdev->req_lock);
3265
3266 if (rv < SS_SUCCESS) {
3267 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3268 return FALSE;
3269 }
3270
3271 if (oconn > C_WF_REPORT_PARAMS) {
3272 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3273 peer_state.disk != D_NEGOTIATING ) {
3274 /* we want resync, peer has not yet decided to sync... */
3275 /* Nowadays only used when forcing a node into primary role and
3276 setting its disk to UpToDate with that */
3277 drbd_send_uuids(mdev);
3278 drbd_send_state(mdev);
3279 }
3280 }
3281
3282 mdev->net_conf->want_lose = 0;
3283
3284 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3285
3286 return TRUE;
3287}
3288
3289static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3290{
3291 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3292
3293 wait_event(mdev->misc_wait,
3294 mdev->state.conn == C_WF_SYNC_UUID ||
3295 mdev->state.conn < C_CONNECTED ||
3296 mdev->state.disk < D_NEGOTIATING);
3297
3298 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3299
3300 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3301 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3302 return FALSE;
3303
3304 /* Here the _drbd_uuid_ functions are right, current should
3305 _not_ be rotated into the history */
3306 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3307 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3308 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3309
3310 drbd_start_resync(mdev, C_SYNC_TARGET);
3311
3312 put_ldev(mdev);
3313 } else
3314 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3315
3316 return TRUE;
3317}
3318
3319enum receive_bitmap_ret { OK, DONE, FAILED };
3320
3321static enum receive_bitmap_ret
3322receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3323 unsigned long *buffer, struct bm_xfer_ctx *c)
3324{
3325 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3326 unsigned want = num_words * sizeof(long);
3327
3328 if (want != h->length) {
3329 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3330 return FAILED;
3331 }
3332 if (want == 0)
3333 return DONE;
3334 if (drbd_recv(mdev, buffer, want) != want)
3335 return FAILED;
3336
3337 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3338
3339 c->word_offset += num_words;
3340 c->bit_offset = c->word_offset * BITS_PER_LONG;
3341 if (c->bit_offset > c->bm_bits)
3342 c->bit_offset = c->bm_bits;
3343
3344 return OK;
3345}
3346
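/* Added annotation: decodes one compressed bitmap packet.  The payload is a
 * VLI encoded sequence of run lengths of alternating "clear" and "set" runs,
 * the first run's polarity given by DCBP_get_start().
 * Example (conceptually): start = clear, runs 5,3,2 describe bits 0..4 clear,
 * 5..7 set, 8..9 clear.  Only the "set" runs are applied to the bitmap via
 * _drbd_bm_set_bits(). */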
3347static enum receive_bitmap_ret
3348recv_bm_rle_bits(struct drbd_conf *mdev,
3349 struct p_compressed_bm *p,
3350 struct bm_xfer_ctx *c)
3351{
3352 struct bitstream bs;
3353 u64 look_ahead;
3354 u64 rl;
3355 u64 tmp;
3356 unsigned long s = c->bit_offset;
3357 unsigned long e;
3358 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3359 int toggle = DCBP_get_start(p);
3360 int have;
3361 int bits;
3362
3363 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3364
3365 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3366 if (bits < 0)
3367 return FAILED;
3368
3369 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3370 bits = vli_decode_bits(&rl, look_ahead);
3371 if (bits <= 0)
3372 return FAILED;
3373
3374 if (toggle) {
3375 e = s + rl -1;
3376 if (e >= c->bm_bits) {
3377 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3378 return FAILED;
3379 }
3380 _drbd_bm_set_bits(mdev, s, e);
3381 }
3382
3383 if (have < bits) {
3384 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3385 have, bits, look_ahead,
3386 (unsigned int)(bs.cur.b - p->code),
3387 (unsigned int)bs.buf_len);
3388 return FAILED;
3389 }
3390 look_ahead >>= bits;
3391 have -= bits;
3392
3393 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3394 if (bits < 0)
3395 return FAILED;
3396 look_ahead |= tmp << have;
3397 have += bits;
3398 }
3399
3400 c->bit_offset = s;
3401 bm_xfer_ctx_bit_to_word_offset(c);
3402
3403 return (s == c->bm_bits) ? DONE : OK;
3404}
3405
3406static enum receive_bitmap_ret
3407decode_bitmap_c(struct drbd_conf *mdev,
3408 struct p_compressed_bm *p,
3409 struct bm_xfer_ctx *c)
3410{
3411 if (DCBP_get_code(p) == RLE_VLI_Bits)
3412 return recv_bm_rle_bits(mdev, p, c);
3413
3414 /* other variants had been implemented for evaluation,
3415 * but have been dropped as this one turned out to be "best"
3416 * during all our tests. */
3417
3418 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3419 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3420 return FAILED;
3421}
3422
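/* Added annotation: logs how well the bitmap transfer compressed.  "plain"
 * is what the uncompressed transfer would have cost (payload plus packet
 * headers); the saving is computed in tenths of a percent, e.g. if total is
 * a quarter of plain, r becomes 750 and we log "compression: 75.0%". */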
3423void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3424 const char *direction, struct bm_xfer_ctx *c)
3425{
3426 /* what would it take to transfer it "plaintext" */
3427 unsigned plain = sizeof(struct p_header) *
3428 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3429 + c->bm_words * sizeof(long);
3430 unsigned total = c->bytes[0] + c->bytes[1];
3431 unsigned r;
3432
3433 /* total can not be zero. but just in case: */
3434 if (total == 0)
3435 return;
3436
3437 /* don't report if not compressed */
3438 if (total >= plain)
3439 return;
3440
3441 /* total < plain. check for overflow, still */
3442 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3443 : (1000 * total / plain);
3444
3445 if (r > 1000)
3446 r = 1000;
3447
3448 r = 1000 - r;
3449 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3450 "total %u; compression: %u.%u%%\n",
3451 direction,
3452 c->bytes[1], c->packets[1],
3453 c->bytes[0], c->packets[0],
3454 total, r/10, r % 10);
3455}
3456
3457/* Since we are processing the bitfield from lower addresses to higher,
3458 it does not matter if we process it in 32 bit chunks or 64 bit
3459 chunks as long as it is little endian. (Understand it as a byte stream,
3460 beginning with the lowest byte...) If we used big endian
3461 we would need to process it from the highest address to the lowest,
3462 in order to be agnostic to the 32 vs 64 bits issue.
3463
3464 returns 0 on failure, 1 if we successfully received it. */
3465static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3466{
3467 struct bm_xfer_ctx c;
3468 void *buffer;
3469 enum receive_bitmap_ret ret;
3470 int ok = FALSE;
3471
3472 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3473
3474 drbd_bm_lock(mdev, "receive bitmap");
3475
3476 /* maybe we should use some per thread scratch page,
3477 * and allocate that during initial device creation? */
3478 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3479 if (!buffer) {
3480 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3481 goto out;
3482 }
3483
3484 c = (struct bm_xfer_ctx) {
3485 .bm_bits = drbd_bm_bits(mdev),
3486 .bm_words = drbd_bm_words(mdev),
3487 };
3488
3489 do {
3490 if (h->command == P_BITMAP) {
3491 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3492 } else if (h->command == P_COMPRESSED_BITMAP) {
3493 /* MAYBE: sanity check that we speak proto >= 90,
3494 * and the feature is enabled! */
3495 struct p_compressed_bm *p;
3496
3497 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3498 dev_err(DEV, "ReportCBitmap packet too large\n");
3499 goto out;
3500 }
3501 /* use the page buff */
3502 p = buffer;
3503 memcpy(p, h, sizeof(*h));
3504 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3505 goto out;
3506 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3507 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3508 goto out;
3509 }
3510 ret = decode_bitmap_c(mdev, p, &c);
3511 } else {
3512 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3513 goto out;
3514 }
3515
3516 c.packets[h->command == P_BITMAP]++;
3517 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3518
3519 if (ret != OK)
3520 break;
3521
3522 if (!drbd_recv_header(mdev, h))
3523 goto out;
3524 } while (ret == OK);
3525 if (ret == FAILED)
3526 goto out;
3527
3528 INFO_bm_xfer_stats(mdev, "receive", &c);
3529
3530 if (mdev->state.conn == C_WF_BITMAP_T) {
3531 ok = !drbd_send_bitmap(mdev);
3532 if (!ok)
3533 goto out;
3534 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3535 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3536 D_ASSERT(ok == SS_SUCCESS);
3537 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3538 /* admin may have requested C_DISCONNECTING,
3539 * other threads may have noticed network errors */
3540 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3541 drbd_conn_str(mdev->state.conn));
3542 }
3543
3544 ok = TRUE;
3545 out:
3546 drbd_bm_unlock(mdev);
3547 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3548 drbd_start_resync(mdev, C_SYNC_SOURCE);
3549 free_page((unsigned long) buffer);
3550 return ok;
3551}
3552
3553static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3554{
3555 /* TODO zero copy sink :) */
3556 static char sink[128];
3557 int size, want, r;
3558
3559 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3560 h->command, h->length);
3561
3562 size = h->length;
3563 while (size > 0) {
3564 want = min_t(int, size, sizeof(sink));
3565 r = drbd_recv(mdev, sink, want);
3566 ERR_IF(r <= 0) break;
3567 size -= r;
3568 }
3569 return size == 0;
3570}
3571
3572static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3573{
3574 if (mdev->state.disk >= D_INCONSISTENT)
3575 drbd_kick_lo(mdev);
3576
3577 /* Make sure we've acked all the TCP data associated
3578 * with the data requests being unplugged */
3579 drbd_tcp_quickack(mdev->data.socket);
3580
3581 return TRUE;
3582}
3583
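/* Added annotation: subtracts "us" microseconds from *tv, borrowing one
 * second from tv_sec when tv_usec would underflow. */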
3584static void timeval_sub_us(struct timeval* tv, unsigned int us)
3585{
3586 tv->tv_sec -= us / 1000000;
3587 us = us % 1000000;
3588 if (tv->tv_usec < us) { /* borrow from tv_sec */
3589 tv->tv_usec += 1000000;
3590 tv->tv_sec--;
3591 }
3592 tv->tv_usec -= us;
3593}
3594
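/* Added annotation: matches a delay probe against the probe with the same
 * sequence number that arrived on the other socket (kept in
 * mdev->delay_probes).  On a match, compute the offset-corrected arrival
 * time difference and store a positive result as mdev->data_delay;
 * otherwise queue this probe so its counterpart can complete the
 * measurement later. */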
3595static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3596{
3597 struct delay_probe *dp;
3598 struct list_head *le;
3599 struct timeval now;
3600 int seq_num;
3601 int offset;
3602 int data_delay;
3603
3604 seq_num = be32_to_cpu(p->seq_num);
3605 offset = be32_to_cpu(p->offset);
3606
3607 spin_lock(&mdev->peer_seq_lock);
3608 if (!list_empty(&mdev->delay_probes)) {
3609 if (from == USE_DATA_SOCKET)
3610 le = mdev->delay_probes.next;
3611 else
3612 le = mdev->delay_probes.prev;
3613
3614 dp = list_entry(le, struct delay_probe, list);
3615
3616 if (dp->seq_num == seq_num) {
3617 list_del(le);
3618 spin_unlock(&mdev->peer_seq_lock);
3619 do_gettimeofday(&now);
3620 timeval_sub_us(&now, offset);
3621 data_delay =
3622 now.tv_usec - dp->time.tv_usec +
3623 (now.tv_sec - dp->time.tv_sec) * 1000000;
3624
3625 if (data_delay > 0)
3626 mdev->data_delay = data_delay;
3627
3628 kfree(dp);
3629 return;
3630 }
3631
3632 if (dp->seq_num > seq_num) {
3633 spin_unlock(&mdev->peer_seq_lock);
3634 dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3635 return; /* Do not allocate a struct delay_probe.... */
3636 }
3637 }
3638 spin_unlock(&mdev->peer_seq_lock);
3639
3640 dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3641 if (!dp) {
3642 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3643 return;
3644 }
3645
3646 dp->seq_num = seq_num;
3647 do_gettimeofday(&dp->time);
3648 timeval_sub_us(&dp->time, offset);
3649
3650 spin_lock(&mdev->peer_seq_lock);
3651 if (from == USE_DATA_SOCKET)
3652 list_add(&dp->list, &mdev->delay_probes);
3653 else
3654 list_add_tail(&dp->list, &mdev->delay_probes);
3655 spin_unlock(&mdev->peer_seq_lock);
3656}
3657
3658static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3659{
3660 struct p_delay_probe *p = (struct p_delay_probe *)h;
3661
3662 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3663 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3664 return FALSE;
3665
3666 got_delay_probe(mdev, USE_DATA_SOCKET, p);
3667 return TRUE;
3668}
3669
3670typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3671
3672static drbd_cmd_handler_f drbd_default_handler[] = {
3673 [P_DATA] = receive_Data,
3674 [P_DATA_REPLY] = receive_DataReply,
3675 [P_RS_DATA_REPLY] = receive_RSDataReply,
3676 [P_BARRIER] = receive_Barrier,
3677 [P_BITMAP] = receive_bitmap,
3678 [P_COMPRESSED_BITMAP] = receive_bitmap,
3679 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3680 [P_DATA_REQUEST] = receive_DataRequest,
3681 [P_RS_DATA_REQUEST] = receive_DataRequest,
3682 [P_SYNC_PARAM] = receive_SyncParam,
3683 [P_SYNC_PARAM89] = receive_SyncParam,
3684 [P_PROTOCOL] = receive_protocol,
3685 [P_UUIDS] = receive_uuids,
3686 [P_SIZES] = receive_sizes,
3687 [P_STATE] = receive_state,
3688 [P_STATE_CHG_REQ] = receive_req_state,
3689 [P_SYNC_UUID] = receive_sync_uuid,
3690 [P_OV_REQUEST] = receive_DataRequest,
3691 [P_OV_REPLY] = receive_DataRequest,
3692 [P_CSUM_RS_REQUEST] = receive_DataRequest,
0ced55a3 3693 [P_DELAY_PROBE] = receive_delay_probe,
3694 /* anything missing from this table is in
3695 * the asender_tbl, see get_asender_cmd */
3696 [P_MAX_CMD] = NULL,
3697};
3698
3699static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3700static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3701
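/* Receiver dispatch: commands below P_MAX_CMD are handled via
 * drbd_cmd_handler, optional commands between P_MAY_IGNORE and P_MAX_OPT_CMD
 * via drbd_opt_cmd_handler, and anything above P_MAX_OPT_CMD is drained by
 * receive_skip.  A missing handler or a failing handler forces the
 * connection into C_PROTOCOL_ERROR. */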
3702static void drbdd(struct drbd_conf *mdev)
3703{
3704 drbd_cmd_handler_f handler;
3705 struct p_header *header = &mdev->data.rbuf.header;
3706
3707 while (get_t_state(&mdev->receiver) == Running) {
3708 drbd_thread_current_set_cpu(mdev);
3709 if (!drbd_recv_header(mdev, header)) {
3710 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
b411b363 3711 break;
0b33a916 3712 }
3713
3714 if (header->command < P_MAX_CMD)
3715 handler = drbd_cmd_handler[header->command];
3716 else if (P_MAY_IGNORE < header->command
3717 && header->command < P_MAX_OPT_CMD)
3718 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3719 else if (header->command > P_MAX_OPT_CMD)
3720 handler = receive_skip;
3721 else
3722 handler = NULL;
3723
3724 if (unlikely(!handler)) {
3725 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3726 header->command, header->length);
3727 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3728 break;
3729 }
3730 if (unlikely(!handler(mdev, header))) {
3731 dev_err(DEV, "error receiving %s, l: %d!\n",
3732 cmdname(header->command), header->length);
3733 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3734 break;
3735 }
3736 }
3737}
3738
3739static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3740{
3741 struct hlist_head *slot;
3742 struct hlist_node *pos;
3743 struct hlist_node *tmp;
3744 struct drbd_request *req;
3745 int i;
3746
3747 /*
3748 * Application READ requests
3749 */
3750 spin_lock_irq(&mdev->req_lock);
3751 for (i = 0; i < APP_R_HSIZE; i++) {
3752 slot = mdev->app_reads_hash+i;
3753 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3754 /* it may (but should not any longer!)
3755 * be on the work queue; if that assert triggers,
3756 * we need to also grab the
3757 * spin_lock_irq(&mdev->data.work.q_lock);
3758 * and list_del_init here. */
3759 D_ASSERT(list_empty(&req->w.list));
3760 /* It would be nice to complete outside of spinlock.
3761 * But this is easier for now. */
3762 _req_mod(req, connection_lost_while_pending);
3763 }
3764 }
3765 for (i = 0; i < APP_R_HSIZE; i++)
3766 if (!hlist_empty(mdev->app_reads_hash+i))
3767 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3768 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3769
3770 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3771 spin_unlock_irq(&mdev->req_lock);
3772}
3773
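/* Queue a no-op barrier work item and wait for it to complete; once it has
 * run, every work item queued before it has been processed by the worker. */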
3774void drbd_flush_workqueue(struct drbd_conf *mdev)
3775{
3776 struct drbd_wq_barrier barr;
3777
3778 barr.w.cb = w_prev_work_done;
3779 init_completion(&barr.done);
3780 drbd_queue_work(&mdev->data.work, &barr.w);
3781 wait_for_completion(&barr.done);
3782}
3783
3784static void drbd_disconnect(struct drbd_conf *mdev)
3785{
3786 enum drbd_fencing_p fp;
3787 union drbd_state os, ns;
3788 int rv = SS_UNKNOWN_ERROR;
3789 unsigned int i;
3790
3791 if (mdev->state.conn == C_STANDALONE)
3792 return;
3793 if (mdev->state.conn >= C_WF_CONNECTION)
3794 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3795 drbd_conn_str(mdev->state.conn));
3796
3797 /* asender does not clean up anything. it must not interfere, either */
3798 drbd_thread_stop(&mdev->asender);
b411b363 3799 drbd_free_sock(mdev);
3800
3801 spin_lock_irq(&mdev->req_lock);
3802 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3803 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3804 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3805 spin_unlock_irq(&mdev->req_lock);
3806
3807 /* We do not have data structures that would allow us to
3808 * get the rs_pending_cnt down to 0 again.
3809 * * On C_SYNC_TARGET we do not have any data structures describing
3810 * the pending RSDataRequest's we have sent.
3811 * * On C_SYNC_SOURCE there is no data structure that tracks
3812 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3813 * And no, it is not the sum of the reference counts in the
3814 * resync_LRU. The resync_LRU tracks the whole operation including
3815 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3816 * on the fly. */
3817 drbd_rs_cancel_all(mdev);
3818 mdev->rs_total = 0;
3819 mdev->rs_failed = 0;
3820 atomic_set(&mdev->rs_pending_cnt, 0);
3821 wake_up(&mdev->misc_wait);
3822
3823 /* make sure syncer is stopped and w_resume_next_sg queued */
3824 del_timer_sync(&mdev->resync_timer);
3825 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3826 resync_timer_fn((unsigned long)mdev);
3827
3828 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3829 * w_make_resync_request etc. which may still be on the worker queue
3830 * to be "canceled" */
3831 drbd_flush_workqueue(mdev);
3832
3833 /* This also does reclaim_net_ee(). If we do this too early, we might
3834 * miss some resync ee and pages.*/
3835 drbd_process_done_ee(mdev);
3836
3837 kfree(mdev->p_uuid);
3838 mdev->p_uuid = NULL;
3839
3840 if (!mdev->state.susp)
3841 tl_clear(mdev);
3842
3843 drbd_fail_pending_reads(mdev);
3844
3845 dev_info(DEV, "Connection closed\n");
3846
3847 drbd_md_sync(mdev);
3848
3849 fp = FP_DONT_CARE;
3850 if (get_ldev(mdev)) {
3851 fp = mdev->ldev->dc.fencing;
3852 put_ldev(mdev);
3853 }
3854
3855 if (mdev->state.role == R_PRIMARY) {
3856 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3857 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3858 drbd_request_state(mdev, NS(pdsk, nps));
3859 }
3860 }
3861
3862 spin_lock_irq(&mdev->req_lock);
3863 os = mdev->state;
3864 if (os.conn >= C_UNCONNECTED) {
3865 /* Do not restart in case we are C_DISCONNECTING */
3866 ns = os;
3867 ns.conn = C_UNCONNECTED;
3868 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3869 }
3870 spin_unlock_irq(&mdev->req_lock);
3871
3872 if (os.conn == C_DISCONNECTING) {
3873 struct hlist_head *h;
3874 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3875
3876 /* we must not free the tl_hash
3877 * while application io is still on the fly */
3878 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3879
3880 spin_lock_irq(&mdev->req_lock);
3881 /* paranoia code */
3882 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3883 if (h->first)
3884 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3885 (int)(h - mdev->ee_hash), h->first);
3886 kfree(mdev->ee_hash);
3887 mdev->ee_hash = NULL;
3888 mdev->ee_hash_s = 0;
3889
3890 /* paranoia code */
3891 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3892 if (h->first)
3893 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3894 (int)(h - mdev->tl_hash), h->first);
3895 kfree(mdev->tl_hash);
3896 mdev->tl_hash = NULL;
3897 mdev->tl_hash_s = 0;
3898 spin_unlock_irq(&mdev->req_lock);
3899
3900 crypto_free_hash(mdev->cram_hmac_tfm);
3901 mdev->cram_hmac_tfm = NULL;
3902
3903 kfree(mdev->net_conf);
3904 mdev->net_conf = NULL;
3905 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3906 }
3907
3908 /* tcp_close and release of sendpage pages can be deferred. I don't
3909 * want to use SO_LINGER, because apparently it can be deferred for
3910 * more than 20 seconds (longest time I checked).
3911 *
3912 * Actually we don't care for exactly when the network stack does its
3913 * put_page(), but release our reference on these pages right here.
3914 */
3915 i = drbd_release_ee(mdev, &mdev->net_ee);
3916 if (i)
3917 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3918 i = atomic_read(&mdev->pp_in_use);
3919 if (i)
45bb912b 3920 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3921
3922 D_ASSERT(list_empty(&mdev->read_ee));
3923 D_ASSERT(list_empty(&mdev->active_ee));
3924 D_ASSERT(list_empty(&mdev->sync_ee));
3925 D_ASSERT(list_empty(&mdev->done_ee));
3926
3927 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3928 atomic_set(&mdev->current_epoch->epoch_size, 0);
3929 D_ASSERT(list_empty(&mdev->current_epoch->list));
3930}
3931
3932/*
3933 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3934 * we can agree on is stored in agreed_pro_version.
3935 *
3936 * feature flags and the reserved array should be enough room for future
3937 * enhancements of the handshake protocol, and possible plugins...
3938 *
3939 * for now, they are expected to be zero, but ignored.
3940 */
3941static int drbd_send_handshake(struct drbd_conf *mdev)
3942{
3943 /* ASSERT current == mdev->receiver ... */
3944 struct p_handshake *p = &mdev->data.sbuf.handshake;
3945 int ok;
3946
3947 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3948 dev_err(DEV, "interrupted during initial handshake\n");
3949 return 0; /* interrupted. not ok. */
3950 }
3951
3952 if (mdev->data.socket == NULL) {
3953 mutex_unlock(&mdev->data.mutex);
3954 return 0;
3955 }
3956
3957 memset(p, 0, sizeof(*p));
3958 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3959 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3960 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3961 (struct p_header *)p, sizeof(*p), 0 );
3962 mutex_unlock(&mdev->data.mutex);
3963 return ok;
3964}
3965
3966/*
3967 * return values:
3968 * 1 yes, we have a valid connection
3969 * 0 oops, did not work out, please try again
3970 * -1 peer talks different language,
3971 * no point in trying again, please go standalone.
3972 */
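/*
 * Version agreement, illustrated: the local range
 * [PRO_VERSION_MIN, PRO_VERSION_MAX] and the peer's advertised
 * [protocol_min, protocol_max] must overlap, and we settle on the highest
 * version both sides support:
 *
 *	agreed_pro_version = min(PRO_VERSION_MAX, peer's protocol_max)
 *
 * With hypothetical numbers: a local range of 86-94 against a peer range of
 * 86-91 agrees on 91; against a peer range of 95-97 there is no overlap and
 * we go standalone (return -1).
 */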
3973static int drbd_do_handshake(struct drbd_conf *mdev)
3974{
3975 /* ASSERT current == mdev->receiver ... */
3976 struct p_handshake *p = &mdev->data.rbuf.handshake;
3977 const int expect = sizeof(struct p_handshake)
3978 -sizeof(struct p_header);
3979 int rv;
3980
3981 rv = drbd_send_handshake(mdev);
3982 if (!rv)
3983 return 0;
3984
3985 rv = drbd_recv_header(mdev, &p->head);
3986 if (!rv)
3987 return 0;
3988
3989 if (p->head.command != P_HAND_SHAKE) {
3990 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3991 cmdname(p->head.command), p->head.command);
3992 return -1;
3993 }
3994
3995 if (p->head.length != expect) {
3996 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3997 expect, p->head.length);
3998 return -1;
3999 }
4000
4001 rv = drbd_recv(mdev, &p->head.payload, expect);
4002
4003 if (rv != expect) {
4004 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
4005 return 0;
4006 }
4007
4008 p->protocol_min = be32_to_cpu(p->protocol_min);
4009 p->protocol_max = be32_to_cpu(p->protocol_max);
4010 if (p->protocol_max == 0)
4011 p->protocol_max = p->protocol_min;
4012
4013 if (PRO_VERSION_MAX < p->protocol_min ||
4014 PRO_VERSION_MIN > p->protocol_max)
4015 goto incompat;
4016
4017 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4018
4019 dev_info(DEV, "Handshake successful: "
4020 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4021
4022 return 1;
4023
4024 incompat:
4025 dev_err(DEV, "incompatible DRBD dialects: "
4026 "I support %d-%d, peer supports %d-%d\n",
4027 PRO_VERSION_MIN, PRO_VERSION_MAX,
4028 p->protocol_min, p->protocol_max);
4029 return -1;
4030}
4031
4032#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4033static int drbd_do_auth(struct drbd_conf *mdev)
4034{
 4035	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4036 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
b10d96cb 4037 return -1;
4038}
4039#else
4040#define CHALLENGE_LEN 64
4041
4042/* Return value:
4043 1 - auth succeeded,
4044 0 - failed, try again (network error),
4045 -1 - auth failed, don't try again.
4046*/
4047
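/*
 * Sketch of the challenge/response exchange implemented below (both peers
 * run the same sequence):
 *
 *	send P_AUTH_CHALLENGE	carrying a random my_challenge
 *	recv P_AUTH_CHALLENGE	carrying the peer's challenge (peers_ch)
 *	send P_AUTH_RESPONSE	carrying HMAC(shared_secret, peers_ch)
 *	recv P_AUTH_RESPONSE	and compare it with the locally computed
 *				HMAC(shared_secret, my_challenge)
 *
 * Only a peer that knows the shared secret can produce the expected HMAC
 * over the challenge we generated.
 */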
4048static int drbd_do_auth(struct drbd_conf *mdev)
4049{
4050 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4051 struct scatterlist sg;
4052 char *response = NULL;
4053 char *right_response = NULL;
4054 char *peers_ch = NULL;
4055 struct p_header p;
4056 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4057 unsigned int resp_size;
4058 struct hash_desc desc;
4059 int rv;
4060
4061 desc.tfm = mdev->cram_hmac_tfm;
4062 desc.flags = 0;
4063
4064 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4065 (u8 *)mdev->net_conf->shared_secret, key_len);
4066 if (rv) {
4067 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
b10d96cb 4068 rv = -1;
4069 goto fail;
4070 }
4071
4072 get_random_bytes(my_challenge, CHALLENGE_LEN);
4073
4074 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4075 if (!rv)
4076 goto fail;
4077
4078 rv = drbd_recv_header(mdev, &p);
4079 if (!rv)
4080 goto fail;
4081
4082 if (p.command != P_AUTH_CHALLENGE) {
4083 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4084 cmdname(p.command), p.command);
4085 rv = 0;
4086 goto fail;
4087 }
4088
4089 if (p.length > CHALLENGE_LEN*2) {
 4090		dev_err(DEV, "AuthChallenge payload too big.\n");
b10d96cb 4091 rv = -1;
4092 goto fail;
4093 }
4094
4095 peers_ch = kmalloc(p.length, GFP_NOIO);
4096 if (peers_ch == NULL) {
4097 dev_err(DEV, "kmalloc of peers_ch failed\n");
b10d96cb 4098 rv = -1;
4099 goto fail;
4100 }
4101
4102 rv = drbd_recv(mdev, peers_ch, p.length);
4103
4104 if (rv != p.length) {
4105 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4106 rv = 0;
4107 goto fail;
4108 }
4109
4110 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4111 response = kmalloc(resp_size, GFP_NOIO);
4112 if (response == NULL) {
4113 dev_err(DEV, "kmalloc of response failed\n");
b10d96cb 4114 rv = -1;
4115 goto fail;
4116 }
4117
4118 sg_init_table(&sg, 1);
4119 sg_set_buf(&sg, peers_ch, p.length);
4120
4121 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4122 if (rv) {
4123 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4124 rv = -1;
4125 goto fail;
4126 }
4127
4128 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4129 if (!rv)
4130 goto fail;
4131
4132 rv = drbd_recv_header(mdev, &p);
4133 if (!rv)
4134 goto fail;
4135
4136 if (p.command != P_AUTH_RESPONSE) {
4137 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4138 cmdname(p.command), p.command);
4139 rv = 0;
4140 goto fail;
4141 }
4142
4143 if (p.length != resp_size) {
 4144		dev_err(DEV, "AuthResponse payload has unexpected size\n");
4145 rv = 0;
4146 goto fail;
4147 }
4148
4149 rv = drbd_recv(mdev, response , resp_size);
4150
4151 if (rv != resp_size) {
4152 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4153 rv = 0;
4154 goto fail;
4155 }
4156
4157 right_response = kmalloc(resp_size, GFP_NOIO);
2d1ee87d 4158 if (right_response == NULL) {
b411b363 4159 dev_err(DEV, "kmalloc of right_response failed\n");
b10d96cb 4160 rv = -1;
4161 goto fail;
4162 }
4163
4164 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4165
4166 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4167 if (rv) {
4168 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
b10d96cb 4169 rv = -1;
4170 goto fail;
4171 }
4172
4173 rv = !memcmp(response, right_response, resp_size);
4174
4175 if (rv)
4176 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4177 resp_size, mdev->net_conf->cram_hmac_alg);
4178 else
4179 rv = -1;
4180
4181 fail:
4182 kfree(peers_ch);
4183 kfree(response);
4184 kfree(right_response);
4185
4186 return rv;
4187}
4188#endif
4189
4190int drbdd_init(struct drbd_thread *thi)
4191{
4192 struct drbd_conf *mdev = thi->mdev;
4193 unsigned int minor = mdev_to_minor(mdev);
4194 int h;
4195
4196 sprintf(current->comm, "drbd%d_receiver", minor);
4197
4198 dev_info(DEV, "receiver (re)started\n");
4199
4200 do {
4201 h = drbd_connect(mdev);
4202 if (h == 0) {
4203 drbd_disconnect(mdev);
4204 __set_current_state(TASK_INTERRUPTIBLE);
4205 schedule_timeout(HZ);
4206 }
4207 if (h == -1) {
4208 dev_warn(DEV, "Discarding network configuration.\n");
4209 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4210 }
4211 } while (h == 0);
4212
4213 if (h > 0) {
4214 if (get_net_conf(mdev)) {
4215 drbdd(mdev);
4216 put_net_conf(mdev);
4217 }
4218 }
4219
4220 drbd_disconnect(mdev);
4221
4222 dev_info(DEV, "receiver terminated\n");
4223 return 0;
4224}
4225
4226/* ********* acknowledge sender ******** */
4227
4228static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4229{
4230 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4231
4232 int retcode = be32_to_cpu(p->retcode);
4233
4234 if (retcode >= SS_SUCCESS) {
4235 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4236 } else {
4237 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4238 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4239 drbd_set_st_err_str(retcode), retcode);
4240 }
4241 wake_up(&mdev->state_wait);
4242
4243 return TRUE;
4244}
4245
4246static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4247{
4248 return drbd_send_ping_ack(mdev);
4249
4250}
4251
4252static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4253{
4254 /* restore idle timeout */
4255 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4256 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4257 wake_up(&mdev->misc_wait);
4258
4259 return TRUE;
4260}
4261
4262static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4263{
4264 struct p_block_ack *p = (struct p_block_ack *)h;
4265 sector_t sector = be64_to_cpu(p->sector);
4266 int blksize = be32_to_cpu(p->blksize);
4267
4268 D_ASSERT(mdev->agreed_pro_version >= 89);
4269
4270 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4271
4272 drbd_rs_complete_io(mdev, sector);
4273 drbd_set_in_sync(mdev, sector, blksize);
4274 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4275 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4276 dec_rs_pending(mdev);
4277
4278 return TRUE;
4279}
4280
4281/* when we receive the ACK for a write request,
4282 * verify that we actually know about it */
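/* The block_id in the ACK is the pointer value of the original drbd_request,
 * used as an opaque cookie.  Look it up in the tl_hash slot for that sector
 * and cross-check the sector to catch corrupted or stale ACKs. */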
4283static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4284 u64 id, sector_t sector)
4285{
4286 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4287 struct hlist_node *n;
4288 struct drbd_request *req;
4289
4290 hlist_for_each_entry(req, n, slot, colision) {
4291 if ((unsigned long)req == (unsigned long)id) {
4292 if (req->sector != sector) {
4293 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4294 "wrong sector (%llus versus %llus)\n", req,
4295 (unsigned long long)req->sector,
4296 (unsigned long long)sector);
4297 break;
4298 }
4299 return req;
4300 }
4301 }
4302 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4303 (void *)(unsigned long)id, (unsigned long long)sector);
4304 return NULL;
4305}
4306
4307typedef struct drbd_request *(req_validator_fn)
4308 (struct drbd_conf *mdev, u64 id, sector_t sector);
4309
4310static int validate_req_change_req_state(struct drbd_conf *mdev,
4311 u64 id, sector_t sector, req_validator_fn validator,
4312 const char *func, enum drbd_req_event what)
4313{
4314 struct drbd_request *req;
4315 struct bio_and_error m;
4316
4317 spin_lock_irq(&mdev->req_lock);
4318 req = validator(mdev, id, sector);
4319 if (unlikely(!req)) {
4320 spin_unlock_irq(&mdev->req_lock);
4321 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4322 return FALSE;
4323 }
4324 __req_mod(req, what, &m);
4325 spin_unlock_irq(&mdev->req_lock);
4326
4327 if (m.bio)
4328 complete_master_bio(mdev, &m);
4329 return TRUE;
4330}
4331
4332static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4333{
4334 struct p_block_ack *p = (struct p_block_ack *)h;
4335 sector_t sector = be64_to_cpu(p->sector);
4336 int blksize = be32_to_cpu(p->blksize);
4337 enum drbd_req_event what;
4338
4339 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4340
4341 if (is_syncer_block_id(p->block_id)) {
4342 drbd_set_in_sync(mdev, sector, blksize);
4343 dec_rs_pending(mdev);
4344 return TRUE;
4345 }
4346 switch (be16_to_cpu(h->command)) {
4347 case P_RS_WRITE_ACK:
4348 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4349 what = write_acked_by_peer_and_sis;
4350 break;
4351 case P_WRITE_ACK:
4352 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4353 what = write_acked_by_peer;
4354 break;
4355 case P_RECV_ACK:
4356 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4357 what = recv_acked_by_peer;
4358 break;
4359 case P_DISCARD_ACK:
4360 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4361 what = conflict_discarded_by_peer;
4362 break;
4363 default:
4364 D_ASSERT(0);
4365 return FALSE;
4366 }
4367
4368 return validate_req_change_req_state(mdev, p->block_id, sector,
4369 _ack_id_to_req, __func__ , what);
4370}
4371
4372static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4373{
4374 struct p_block_ack *p = (struct p_block_ack *)h;
4375 sector_t sector = be64_to_cpu(p->sector);
4376
4377 if (__ratelimit(&drbd_ratelimit_state))
 4378		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4379
4380 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4381
4382 if (is_syncer_block_id(p->block_id)) {
4383 int size = be32_to_cpu(p->blksize);
4384 dec_rs_pending(mdev);
4385 drbd_rs_failed_io(mdev, sector, size);
4386 return TRUE;
4387 }
4388 return validate_req_change_req_state(mdev, p->block_id, sector,
4389 _ack_id_to_req, __func__ , neg_acked);
4390}
4391
4392static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4393{
4394 struct p_block_ack *p = (struct p_block_ack *)h;
4395 sector_t sector = be64_to_cpu(p->sector);
4396
4397 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4398 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4399 (unsigned long long)sector, be32_to_cpu(p->blksize));
4400
4401 return validate_req_change_req_state(mdev, p->block_id, sector,
4402 _ar_id_to_req, __func__ , neg_acked);
4403}
4404
4405static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4406{
4407 sector_t sector;
4408 int size;
4409 struct p_block_ack *p = (struct p_block_ack *)h;
4410
4411 sector = be64_to_cpu(p->sector);
4412 size = be32_to_cpu(p->blksize);
4413
4414 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4415
4416 dec_rs_pending(mdev);
4417
4418 if (get_ldev_if_state(mdev, D_FAILED)) {
4419 drbd_rs_complete_io(mdev, sector);
4420 drbd_rs_failed_io(mdev, sector, size);
4421 put_ldev(mdev);
4422 }
4423
4424 return TRUE;
4425}
4426
4427static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4428{
4429 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4430
4431 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4432
4433 return TRUE;
4434}
4435
4436static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4437{
4438 struct p_block_ack *p = (struct p_block_ack *)h;
4439 struct drbd_work *w;
4440 sector_t sector;
4441 int size;
4442
4443 sector = be64_to_cpu(p->sector);
4444 size = be32_to_cpu(p->blksize);
4445
4446 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4447
4448 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4449 drbd_ov_oos_found(mdev, sector, size);
4450 else
4451 ov_oos_print(mdev);
4452
4453 drbd_rs_complete_io(mdev, sector);
4454 dec_rs_pending(mdev);
4455
4456 if (--mdev->ov_left == 0) {
4457 w = kmalloc(sizeof(*w), GFP_NOIO);
4458 if (w) {
4459 w->cb = w_ov_finished;
4460 drbd_queue_work_front(&mdev->data.work, w);
4461 } else {
 4462			dev_err(DEV, "kmalloc(w) failed.\n");
4463 ov_oos_print(mdev);
4464 drbd_resync_finished(mdev);
4465 }
4466 }
4467 return TRUE;
4468}
4469
4470static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4471{
4472 struct p_delay_probe *p = (struct p_delay_probe *)h;
4473
4474 got_delay_probe(mdev, USE_META_SOCKET, p);
4475 return TRUE;
4476}
4477
4478struct asender_cmd {
4479 size_t pkt_size;
4480 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4481};
4482
4483static struct asender_cmd *get_asender_cmd(int cmd)
4484{
4485 static struct asender_cmd asender_tbl[] = {
4486 /* anything missing from this table is in
4487 * the drbd_cmd_handler (drbd_default_handler) table,
4488 * see the beginning of drbdd() */
4489 [P_PING] = { sizeof(struct p_header), got_Ping },
4490 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4491 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4492 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4493 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4494 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4495 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4496 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4497 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4498 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4499 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4500 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4501 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
0ced55a3 4502 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_delay_probe_m },
4503 [P_MAX_CMD] = { 0, NULL },
4504 };
4505 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4506 return NULL;
4507 return &asender_tbl[cmd];
4508}
4509
4510int drbd_asender(struct drbd_thread *thi)
4511{
4512 struct drbd_conf *mdev = thi->mdev;
4513 struct p_header *h = &mdev->meta.rbuf.header;
4514 struct asender_cmd *cmd = NULL;
4515
4516 int rv, len;
4517 void *buf = h;
4518 int received = 0;
4519 int expect = sizeof(struct p_header);
4520 int empty;
4521
4522 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4523
4524 current->policy = SCHED_RR; /* Make this a realtime task! */
4525 current->rt_priority = 2; /* more important than all other tasks */
4526
4527 while (get_t_state(thi) == Running) {
4528 drbd_thread_current_set_cpu(mdev);
4529 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4530 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4531 mdev->meta.socket->sk->sk_rcvtimeo =
4532 mdev->net_conf->ping_timeo*HZ/10;
4533 }
4534
4535 /* conditionally cork;
4536 * it may hurt latency if we cork without much to send */
4537 if (!mdev->net_conf->no_cork &&
4538 3 < atomic_read(&mdev->unacked_cnt))
4539 drbd_tcp_cork(mdev->meta.socket);
4540 while (1) {
4541 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4542 flush_signals(current);
4543 if (!drbd_process_done_ee(mdev)) {
4544 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4545 goto reconnect;
4546 }
4547 /* to avoid race with newly queued ACKs */
4548 set_bit(SIGNAL_ASENDER, &mdev->flags);
4549 spin_lock_irq(&mdev->req_lock);
4550 empty = list_empty(&mdev->done_ee);
4551 spin_unlock_irq(&mdev->req_lock);
4552 /* new ack may have been queued right here,
4553 * but then there is also a signal pending,
4554 * and we start over... */
4555 if (empty)
4556 break;
4557 }
4558 /* but unconditionally uncork unless disabled */
4559 if (!mdev->net_conf->no_cork)
4560 drbd_tcp_uncork(mdev->meta.socket);
4561
4562 /* short circuit, recv_msg would return EINTR anyways. */
4563 if (signal_pending(current))
4564 continue;
4565
4566 rv = drbd_recv_short(mdev, mdev->meta.socket,
4567 buf, expect-received, 0);
4568 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4569
4570 flush_signals(current);
4571
4572 /* Note:
4573 * -EINTR (on meta) we got a signal
4574 * -EAGAIN (on meta) rcvtimeo expired
4575 * -ECONNRESET other side closed the connection
4576 * -ERESTARTSYS (on data) we got a signal
4577 * rv < 0 other than above: unexpected error!
4578 * rv == expected: full header or command
4579 * rv < expected: "woken" by signal during receive
4580 * rv == 0 : "connection shut down by peer"
4581 */
4582 if (likely(rv > 0)) {
4583 received += rv;
4584 buf += rv;
4585 } else if (rv == 0) {
4586 dev_err(DEV, "meta connection shut down by peer.\n");
4587 goto reconnect;
4588 } else if (rv == -EAGAIN) {
4589 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4590 mdev->net_conf->ping_timeo*HZ/10) {
4591 dev_err(DEV, "PingAck did not arrive in time.\n");
4592 goto reconnect;
4593 }
4594 set_bit(SEND_PING, &mdev->flags);
4595 continue;
4596 } else if (rv == -EINTR) {
4597 continue;
4598 } else {
4599 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4600 goto reconnect;
4601 }
4602
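		/* Reassembly: first collect a full p_header, use it to look up
		 * the command and its expected packet size, then keep receiving
		 * until the complete packet is in before calling the handler. */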
4603 if (received == expect && cmd == NULL) {
4604 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4605 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4606 (long)be32_to_cpu(h->magic),
4607 h->command, h->length);
4608 goto reconnect;
4609 }
4610 cmd = get_asender_cmd(be16_to_cpu(h->command));
4611 len = be16_to_cpu(h->length);
4612 if (unlikely(cmd == NULL)) {
4613 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4614 (long)be32_to_cpu(h->magic),
4615 h->command, h->length);
4616 goto disconnect;
4617 }
4618 expect = cmd->pkt_size;
6a0afdf5 4619 ERR_IF(len != expect-sizeof(struct p_header))
b411b363 4620 goto reconnect;
4621 }
4622 if (received == expect) {
4623 D_ASSERT(cmd != NULL);
4624 if (!cmd->process(mdev, h))
4625 goto reconnect;
4626
4627 buf = h;
4628 received = 0;
4629 expect = sizeof(struct p_header);
4630 cmd = NULL;
4631 }
4632 }
4633
4634 if (0) {
4635reconnect:
4636 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4637 }
4638 if (0) {
4639disconnect:
4640 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4641 }
4642 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4643
4644 D_ASSERT(mdev->state.conn < C_CONNECTED);
4645 dev_info(DEV, "asender terminated\n");
4646
4647 return 0;
4648}