1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/mm.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
49#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
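/* Note: GFP_TRY is __GFP_HIGHMEM | __GFP_NOWARN without __GFP_WAIT, so these
 * opportunistic page allocations can neither sleep nor enter reclaim, and thus
 * cannot trigger write-out; see the "criss-cross" comment further down in
 * drbd_pp_first_pages_or_try_alloc(). */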
83/*
 84 * some helper functions to deal with singly linked page lists,
85 * page->private being our "next" pointer.
86 */
87
88/* If at least n pages are linked at head, get n pages off.
89 * Otherwise, don't modify head, and return NULL.
90 * Locking is the responsibility of the caller.
91 */
92static struct page *page_chain_del(struct page **head, int n)
93{
94 struct page *page;
95 struct page *tmp;
96
97 BUG_ON(!n);
98 BUG_ON(!head);
99
100 page = *head;
101
102 if (!page)
103 return NULL;
104
105 while (page) {
106 tmp = page_chain_next(page);
107 if (--n == 0)
108 break; /* found sufficient pages */
109 if (tmp == NULL)
110 /* insufficient pages, don't use any of them. */
111 return NULL;
112 page = tmp;
113 }
114
115 /* add end of list marker for the returned list */
116 set_page_private(page, 0);
117 /* actual return value, and adjustment of head */
118 page = *head;
119 *head = tmp;
120 return page;
121}
122
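/* Example (illustrative): with a chain a -> b -> c and *head == a,
 * page_chain_del(&head, 2) returns a (the returned list is terminated after b,
 * since b's page_private is cleared) and leaves *head pointing at c. */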
123/* may be used outside of locks to find the tail of a (usually short)
124 * "private" page chain, before adding it back to a global chain head
125 * with page_chain_add() under a spinlock. */
126static struct page *page_chain_tail(struct page *page, int *len)
127{
128 struct page *tmp;
129 int i = 1;
130 while ((tmp = page_chain_next(page)))
131 ++i, page = tmp;
132 if (len)
133 *len = i;
134 return page;
135}
136
137static int page_chain_free(struct page *page)
138{
139 struct page *tmp;
140 int i = 0;
141 page_chain_for_each_safe(page, tmp) {
142 put_page(page);
143 ++i;
144 }
145 return i;
146}
147
148static void page_chain_add(struct page **head,
149 struct page *chain_first, struct page *chain_last)
150{
151#if 1
152 struct page *tmp;
153 tmp = page_chain_tail(chain_first, NULL);
154 BUG_ON(tmp != chain_last);
155#endif
156
157 /* add chain to head */
158 set_page_private(chain_last, (unsigned long)*head);
159 *head = chain_first;
160}
161
162static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
163{
164 struct page *page = NULL;
165 struct page *tmp = NULL;
166 int i = 0;
167
168 /* Yes, testing drbd_pp_vacant outside the lock is racy.
169 * So what. It saves a spin_lock. */
 170 if (drbd_pp_vacant >= number) {
 171 spin_lock(&drbd_pp_lock);
172 page = page_chain_del(&drbd_pp_pool, number);
173 if (page)
174 drbd_pp_vacant -= number;
 175 spin_unlock(&drbd_pp_lock);
 176 if (page)
 177 return page;
 178 }
 179
180 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
181 * "criss-cross" setup, that might cause write-out on some other DRBD,
182 * which in turn might block on the other node at this very place. */
183 for (i = 0; i < number; i++) {
184 tmp = alloc_page(GFP_TRY);
185 if (!tmp)
186 break;
187 set_page_private(tmp, (unsigned long)page);
188 page = tmp;
189 }
190
191 if (i == number)
192 return page;
193
194 /* Not enough pages immediately available this time.
195 * No need to jump around here, drbd_pp_alloc will retry this
196 * function "soon". */
197 if (page) {
198 tmp = page_chain_tail(page, NULL);
199 spin_lock(&drbd_pp_lock);
200 page_chain_add(&drbd_pp_pool, page, tmp);
201 drbd_pp_vacant += i;
202 spin_unlock(&drbd_pp_lock);
203 }
204 return NULL;
205}
206
207/* kick lower level device, if we have more than (arbitrary number)
208 * reference counts on it, which typically are locally submitted io
209 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
210static void maybe_kick_lo(struct drbd_conf *mdev)
211{
212 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
213 drbd_kick_lo(mdev);
214}
215
216static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
217{
218 struct drbd_epoch_entry *e;
219 struct list_head *le, *tle;
220
 221 /* The EEs are always appended to the end of the list. Since
 222 they are sent in order over the wire, they have to finish
 223 in order. As soon as we see the first one that has not finished,
 224 we can stop examining the list... */
225
226 list_for_each_safe(le, tle, &mdev->net_ee) {
227 e = list_entry(le, struct drbd_epoch_entry, w.list);
 228 if (drbd_ee_has_active_page(e))
229 break;
230 list_move(le, to_be_freed);
231 }
232}
233
234static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
235{
236 LIST_HEAD(reclaimed);
237 struct drbd_epoch_entry *e, *t;
238
239 maybe_kick_lo(mdev);
240 spin_lock_irq(&mdev->req_lock);
241 reclaim_net_ee(mdev, &reclaimed);
242 spin_unlock_irq(&mdev->req_lock);
243
244 list_for_each_entry_safe(e, t, &reclaimed, w.list)
245 drbd_free_ee(mdev, e);
246}
247
248/**
 249 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 250 * @mdev: DRBD device.
251 * @number: number of pages requested
252 * @retry: whether to retry, if not enough pages are available right now
253 *
254 * Tries to allocate number pages, first from our own page pool, then from
255 * the kernel, unless this allocation would exceed the max_buffers setting.
256 * Possibly retry until DRBD frees sufficient pages somewhere else.
 257 *
 258 * Returns a page chain linked via page->private.
 259 */
 260static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
261{
262 struct page *page = NULL;
263 DEFINE_WAIT(wait);
264
265 /* Yes, we may run up to @number over max_buffers. If we
266 * follow it strictly, the admin will get it wrong anyways. */
267 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
268 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 269
 270 while (page == NULL) {
271 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
272
273 drbd_kick_lo_and_reclaim_net(mdev);
274
275 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
 276 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
277 if (page)
278 break;
279 }
280
281 if (!retry)
282 break;
283
284 if (signal_pending(current)) {
285 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
286 break;
287 }
288
289 schedule();
290 }
291 finish_wait(&drbd_pp_wait, &wait);
292
293 if (page)
294 atomic_add(number, &mdev->pp_in_use);
295 return page;
296}
297
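/* Typical use (sketch): drbd_alloc_ee() obtains a chain of nr_pages here and
 * attaches it to an epoch entry as e->pages; the same chain is eventually
 * handed back through drbd_pp_free() when the epoch entry is freed. */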
298/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 299 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
300 * Either links the page chain back to the global pool,
301 * or returns all pages to the system. */
302static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
303{
 304 int i;
305 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
306 i = page_chain_free(page);
307 else {
308 struct page *tmp;
309 tmp = page_chain_tail(page, &i);
310 spin_lock(&drbd_pp_lock);
311 page_chain_add(&drbd_pp_pool, page, tmp);
312 drbd_pp_vacant += i;
313 spin_unlock(&drbd_pp_lock);
 314 }
315 atomic_sub(i, &mdev->pp_in_use);
316 i = atomic_read(&mdev->pp_in_use);
317 if (i < 0)
318 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
319 wake_up(&drbd_pp_wait);
320}
321
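/* Note: once drbd_pp_vacant exceeds (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count,
 * returned chains are given straight back to the system via page_chain_free();
 * otherwise they are linked back into the global drbd_pp_pool. */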
322/*
323You need to hold the req_lock:
324 _drbd_wait_ee_list_empty()
325
326You must not have the req_lock:
327 drbd_free_ee()
328 drbd_alloc_ee()
329 drbd_init_ee()
330 drbd_release_ee()
331 drbd_ee_fix_bhs()
332 drbd_process_done_ee()
333 drbd_clear_done_ee()
334 drbd_wait_ee_list_empty()
335*/
336
337struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
338 u64 id,
339 sector_t sector,
340 unsigned int data_size,
341 gfp_t gfp_mask) __must_hold(local)
342{
343 struct drbd_epoch_entry *e;
344 struct page *page;
 345 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
346
347 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
348 return NULL;
349
350 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
351 if (!e) {
352 if (!(gfp_mask & __GFP_NOWARN))
353 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
354 return NULL;
355 }
356
357 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
358 if (!page)
359 goto fail;
 360
361 INIT_HLIST_NODE(&e->colision);
362 e->epoch = NULL;
363 e->mdev = mdev;
364 e->pages = page;
365 atomic_set(&e->pending_bios, 0);
366 e->size = data_size;
 367 e->flags = 0;
 368 e->sector = sector;
370 e->block_id = id;
 371
372 return e;
373
 374 fail:
 375 mempool_free(e, drbd_ee_mempool);
376 return NULL;
377}
378
379void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
380{
381 drbd_pp_free(mdev, e->pages);
382 D_ASSERT(atomic_read(&e->pending_bios) == 0);
383 D_ASSERT(hlist_unhashed(&e->colision));
384 mempool_free(e, drbd_ee_mempool);
385}
386
387int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
388{
389 LIST_HEAD(work_list);
390 struct drbd_epoch_entry *e, *t;
391 int count = 0;
392
393 spin_lock_irq(&mdev->req_lock);
394 list_splice_init(list, &work_list);
395 spin_unlock_irq(&mdev->req_lock);
396
397 list_for_each_entry_safe(e, t, &work_list, w.list) {
398 drbd_free_ee(mdev, e);
399 count++;
400 }
401 return count;
402}
403
404
405/*
406 * This function is called from _asender only_
407 * but see also comments in _req_mod(,barrier_acked)
408 * and receive_Barrier.
409 *
410 * Move entries from net_ee to done_ee, if ready.
411 * Grab done_ee, call all callbacks, free the entries.
412 * The callbacks typically send out ACKs.
413 */
414static int drbd_process_done_ee(struct drbd_conf *mdev)
415{
416 LIST_HEAD(work_list);
417 LIST_HEAD(reclaimed);
418 struct drbd_epoch_entry *e, *t;
419 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
420
421 spin_lock_irq(&mdev->req_lock);
422 reclaim_net_ee(mdev, &reclaimed);
423 list_splice_init(&mdev->done_ee, &work_list);
424 spin_unlock_irq(&mdev->req_lock);
425
426 list_for_each_entry_safe(e, t, &reclaimed, w.list)
427 drbd_free_ee(mdev, e);
428
429 /* possible callbacks here:
430 * e_end_block, and e_end_resync_block, e_send_discard_ack.
431 * all ignore the last argument.
432 */
433 list_for_each_entry_safe(e, t, &work_list, w.list) {
434 /* list_del not necessary, next/prev members not touched */
435 ok = e->w.cb(mdev, &e->w, !ok) && ok;
436 drbd_free_ee(mdev, e);
437 }
438 wake_up(&mdev->ee_wait);
439
440 return ok;
441}
442
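/* Note: the callbacks named above (e_end_block, e_end_resync_block,
 * e_send_discard_ack) are what actually send the ACKs; a single failing
 * callback makes the whole pass return not-ok. */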
443void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444{
445 DEFINE_WAIT(wait);
446
447 /* avoids spin_lock/unlock
448 * and calling prepare_to_wait in the fast path */
449 while (!list_empty(head)) {
450 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
451 spin_unlock_irq(&mdev->req_lock);
452 drbd_kick_lo(mdev);
453 schedule();
454 finish_wait(&mdev->ee_wait, &wait);
455 spin_lock_irq(&mdev->req_lock);
456 }
457}
458
459void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
460{
461 spin_lock_irq(&mdev->req_lock);
462 _drbd_wait_ee_list_empty(mdev, head);
463 spin_unlock_irq(&mdev->req_lock);
464}
465
 466/* see also kernel_accept, which is only present since 2.6.18.
 467 * We also want to log exactly which part of it failed. */
468static int drbd_accept(struct drbd_conf *mdev, const char **what,
469 struct socket *sock, struct socket **newsock)
470{
471 struct sock *sk = sock->sk;
472 int err = 0;
473
474 *what = "listen";
475 err = sock->ops->listen(sock, 5);
476 if (err < 0)
477 goto out;
478
479 *what = "sock_create_lite";
480 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
481 newsock);
482 if (err < 0)
483 goto out;
484
485 *what = "accept";
486 err = sock->ops->accept(sock, *newsock, 0);
487 if (err < 0) {
488 sock_release(*newsock);
489 *newsock = NULL;
490 goto out;
491 }
492 (*newsock)->ops = sock->ops;
493
494out:
495 return err;
496}
497
498static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
499 void *buf, size_t size, int flags)
500{
501 mm_segment_t oldfs;
502 struct kvec iov = {
503 .iov_base = buf,
504 .iov_len = size,
505 };
506 struct msghdr msg = {
507 .msg_iovlen = 1,
508 .msg_iov = (struct iovec *)&iov,
509 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
510 };
511 int rv;
512
513 oldfs = get_fs();
514 set_fs(KERNEL_DS);
515 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
516 set_fs(oldfs);
517
518 return rv;
519}
520
521static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
522{
523 mm_segment_t oldfs;
524 struct kvec iov = {
525 .iov_base = buf,
526 .iov_len = size,
527 };
528 struct msghdr msg = {
529 .msg_iovlen = 1,
530 .msg_iov = (struct iovec *)&iov,
531 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
532 };
533 int rv;
534
535 oldfs = get_fs();
536 set_fs(KERNEL_DS);
537
538 for (;;) {
539 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
540 if (rv == size)
541 break;
542
543 /* Note:
544 * ECONNRESET other side closed the connection
545 * ERESTARTSYS (on sock) we got a signal
546 */
547
548 if (rv < 0) {
549 if (rv == -ECONNRESET)
550 dev_info(DEV, "sock was reset by peer\n");
551 else if (rv != -ERESTARTSYS)
552 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
553 break;
554 } else if (rv == 0) {
555 dev_info(DEV, "sock was shut down by peer\n");
556 break;
557 } else {
558 /* signal came in, or peer/link went down,
559 * after we read a partial message
560 */
561 /* D_ASSERT(signal_pending(current)); */
562 break;
563 }
564 };
565
566 set_fs(oldfs);
567
568 if (rv != size)
569 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
570
571 return rv;
572}
573
574static struct socket *drbd_try_connect(struct drbd_conf *mdev)
575{
576 const char *what;
577 struct socket *sock;
578 struct sockaddr_in6 src_in6;
579 int err;
580 int disconnect_on_error = 1;
581
582 if (!get_net_conf(mdev))
583 return NULL;
584
585 what = "sock_create_kern";
586 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
587 SOCK_STREAM, IPPROTO_TCP, &sock);
588 if (err < 0) {
589 sock = NULL;
590 goto out;
591 }
592
593 sock->sk->sk_rcvtimeo =
594 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
595
596 /* explicitly bind to the configured IP as source IP
597 * for the outgoing connections.
598 * This is needed for multihomed hosts and to be
599 * able to use lo: interfaces for drbd.
600 * Make sure to use 0 as port number, so linux selects
601 * a free one dynamically.
602 */
603 memcpy(&src_in6, mdev->net_conf->my_addr,
604 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
605 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
606 src_in6.sin6_port = 0;
607 else
608 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
609
610 what = "bind before connect";
611 err = sock->ops->bind(sock,
612 (struct sockaddr *) &src_in6,
613 mdev->net_conf->my_addr_len);
614 if (err < 0)
615 goto out;
616
617 /* connect may fail, peer not yet available.
618 * stay C_WF_CONNECTION, don't go Disconnecting! */
619 disconnect_on_error = 0;
620 what = "connect";
621 err = sock->ops->connect(sock,
622 (struct sockaddr *)mdev->net_conf->peer_addr,
623 mdev->net_conf->peer_addr_len, 0);
624
625out:
626 if (err < 0) {
627 if (sock) {
628 sock_release(sock);
629 sock = NULL;
630 }
631 switch (-err) {
632 /* timeout, busy, signal pending */
633 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
634 case EINTR: case ERESTARTSYS:
635 /* peer not (yet) available, network problem */
636 case ECONNREFUSED: case ENETUNREACH:
637 case EHOSTDOWN: case EHOSTUNREACH:
638 disconnect_on_error = 0;
639 break;
640 default:
641 dev_err(DEV, "%s failed, err = %d\n", what, err);
642 }
643 if (disconnect_on_error)
644 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
645 }
646 put_net_conf(mdev);
647 return sock;
648}
649
650static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
651{
652 int timeo, err;
653 struct socket *s_estab = NULL, *s_listen;
654 const char *what;
655
656 if (!get_net_conf(mdev))
657 return NULL;
658
659 what = "sock_create_kern";
660 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
661 SOCK_STREAM, IPPROTO_TCP, &s_listen);
662 if (err) {
663 s_listen = NULL;
664 goto out;
665 }
666
667 timeo = mdev->net_conf->try_connect_int * HZ;
668 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
669
670 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
671 s_listen->sk->sk_rcvtimeo = timeo;
672 s_listen->sk->sk_sndtimeo = timeo;
673
674 what = "bind before listen";
675 err = s_listen->ops->bind(s_listen,
676 (struct sockaddr *) mdev->net_conf->my_addr,
677 mdev->net_conf->my_addr_len);
678 if (err < 0)
679 goto out;
680
681 err = drbd_accept(mdev, &what, s_listen, &s_estab);
682
683out:
684 if (s_listen)
685 sock_release(s_listen);
686 if (err < 0) {
687 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
688 dev_err(DEV, "%s failed, err = %d\n", what, err);
689 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
690 }
691 }
692 put_net_conf(mdev);
693
694 return s_estab;
695}
696
697static int drbd_send_fp(struct drbd_conf *mdev,
698 struct socket *sock, enum drbd_packets cmd)
699{
700 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
701
702 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
703}
704
705static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
706{
707 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
708 int rr;
709
710 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
711
712 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
713 return be16_to_cpu(h->command);
714
715 return 0xffff;
716}
717
718/**
719 * drbd_socket_okay() - Free the socket if its connection is not okay
720 * @mdev: DRBD device.
721 * @sock: pointer to the pointer to the socket.
722 */
723static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
724{
725 int rr;
726 char tb[4];
727
728 if (!*sock)
729 return FALSE;
730
731 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
732
733 if (rr > 0 || rr == -EAGAIN) {
734 return TRUE;
735 } else {
736 sock_release(*sock);
737 *sock = NULL;
738 return FALSE;
739 }
740}
741
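/* The liveness probe above does a non-blocking MSG_PEEK of up to 4 bytes:
 * pending data or -EAGAIN (nothing there yet) means the socket is still
 * usable; anything else releases it. */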
742/*
743 * return values:
744 * 1 yes, we have a valid connection
745 * 0 oops, did not work out, please try again
746 * -1 peer talks different language,
747 * no point in trying again, please go standalone.
748 * -2 We do not have a network config...
749 */
750static int drbd_connect(struct drbd_conf *mdev)
751{
752 struct socket *s, *sock, *msock;
753 int try, h, ok;
754
755 D_ASSERT(!mdev->data.socket);
756
757 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
758 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
759
760 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
761 return -2;
762
763 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
764
765 sock = NULL;
766 msock = NULL;
767
768 do {
769 for (try = 0;;) {
770 /* 3 tries, this should take less than a second! */
771 s = drbd_try_connect(mdev);
772 if (s || ++try >= 3)
773 break;
774 /* give the other side time to call bind() & listen() */
775 __set_current_state(TASK_INTERRUPTIBLE);
776 schedule_timeout(HZ / 10);
777 }
778
779 if (s) {
780 if (!sock) {
781 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
782 sock = s;
783 s = NULL;
784 } else if (!msock) {
785 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
786 msock = s;
787 s = NULL;
788 } else {
789 dev_err(DEV, "Logic error in drbd_connect()\n");
790 goto out_release_sockets;
791 }
792 }
793
794 if (sock && msock) {
795 __set_current_state(TASK_INTERRUPTIBLE);
796 schedule_timeout(HZ / 10);
797 ok = drbd_socket_okay(mdev, &sock);
798 ok = drbd_socket_okay(mdev, &msock) && ok;
799 if (ok)
800 break;
801 }
802
803retry:
804 s = drbd_wait_for_connect(mdev);
805 if (s) {
806 try = drbd_recv_fp(mdev, s);
807 drbd_socket_okay(mdev, &sock);
808 drbd_socket_okay(mdev, &msock);
809 switch (try) {
810 case P_HAND_SHAKE_S:
811 if (sock) {
812 dev_warn(DEV, "initial packet S crossed\n");
813 sock_release(sock);
814 }
815 sock = s;
816 break;
817 case P_HAND_SHAKE_M:
818 if (msock) {
819 dev_warn(DEV, "initial packet M crossed\n");
820 sock_release(msock);
821 }
822 msock = s;
823 set_bit(DISCARD_CONCURRENT, &mdev->flags);
824 break;
825 default:
826 dev_warn(DEV, "Error receiving initial packet\n");
827 sock_release(s);
828 if (random32() & 1)
829 goto retry;
830 }
831 }
832
833 if (mdev->state.conn <= C_DISCONNECTING)
834 goto out_release_sockets;
835 if (signal_pending(current)) {
836 flush_signals(current);
837 smp_rmb();
838 if (get_t_state(&mdev->receiver) == Exiting)
839 goto out_release_sockets;
840 }
841
842 if (sock && msock) {
843 ok = drbd_socket_okay(mdev, &sock);
844 ok = drbd_socket_okay(mdev, &msock) && ok;
845 if (ok)
846 break;
847 }
848 } while (1);
849
850 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
851 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
852
853 sock->sk->sk_allocation = GFP_NOIO;
854 msock->sk->sk_allocation = GFP_NOIO;
855
856 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
857 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
858
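 /* sock is the data socket (bulk payload), msock the meta socket used by
 * the asender for ACKs and pings; hence the bulk vs. interactive TC
 * priorities above and the different timeouts set below. */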
859 if (mdev->net_conf->sndbuf_size) {
860 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
861 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
862 }
863
864 if (mdev->net_conf->rcvbuf_size) {
865 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
866 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
867 }
868
869 /* NOT YET ...
870 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
871 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
872 * first set it to the P_HAND_SHAKE timeout,
873 * which we set to 4x the configured ping_timeout. */
874 sock->sk->sk_sndtimeo =
875 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
876
877 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
878 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
879
880 /* we don't want delays.
 881 * we use TCP_CORK where appropriate, though */
882 drbd_tcp_nodelay(sock);
883 drbd_tcp_nodelay(msock);
884
885 mdev->data.socket = sock;
886 mdev->meta.socket = msock;
887 mdev->last_received = jiffies;
888
889 D_ASSERT(mdev->asender.task == NULL);
890
891 h = drbd_do_handshake(mdev);
892 if (h <= 0)
893 return h;
894
895 if (mdev->cram_hmac_tfm) {
896 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
897 switch (drbd_do_auth(mdev)) {
898 case -1:
899 dev_err(DEV, "Authentication of peer failed\n");
900 return -1;
901 case 0:
902 dev_err(DEV, "Authentication of peer failed, trying again.\n");
903 return 0;
904 }
905 }
906
907 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
908 return 0;
909
910 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
911 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
912
913 atomic_set(&mdev->packet_seq, 0);
914 mdev->peer_seq = 0;
915
916 drbd_thread_start(&mdev->asender);
917
918 if (!drbd_send_protocol(mdev))
919 return -1;
 920 drbd_send_sync_param(mdev, &mdev->sync_conf);
 921 drbd_send_sizes(mdev, 0, 0);
922 drbd_send_uuids(mdev);
923 drbd_send_state(mdev);
924 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
925 clear_bit(RESIZE_PENDING, &mdev->flags);
926
927 return 1;
928
929out_release_sockets:
930 if (sock)
931 sock_release(sock);
932 if (msock)
933 sock_release(msock);
934 return -1;
935}
936
937static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
938{
939 int r;
940
941 r = drbd_recv(mdev, h, sizeof(*h));
942
943 if (unlikely(r != sizeof(*h))) {
944 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
945 return FALSE;
946 };
947 h->command = be16_to_cpu(h->command);
948 h->length = be16_to_cpu(h->length);
949 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
950 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
951 (long)be32_to_cpu(h->magic),
952 h->command, h->length);
953 return FALSE;
954 }
955 mdev->last_received = jiffies;
956
957 return TRUE;
958}
959
960static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
961{
962 int rv;
963
964 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
965 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
966 NULL, BLKDEV_IFL_WAIT);
967 if (rv) {
968 dev_err(DEV, "local disk flush failed with status %d\n", rv);
969 /* would rather check on EOPNOTSUPP, but that is not reliable.
970 * don't try again for ANY return value != 0
971 * if (rv == -EOPNOTSUPP) */
972 drbd_bump_write_ordering(mdev, WO_drain_io);
973 }
974 put_ldev(mdev);
975 }
976
977 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
978}
979
980static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
981{
982 struct flush_work *fw = (struct flush_work *)w;
983 struct drbd_epoch *epoch = fw->epoch;
984
985 kfree(w);
986
987 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
988 drbd_flush_after_epoch(mdev, epoch);
989
990 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
991 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
992
993 return 1;
994}
995
996/**
997 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
998 * @mdev: DRBD device.
999 * @epoch: Epoch object.
1000 * @ev: Epoch event.
1001 */
1002static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1003 struct drbd_epoch *epoch,
1004 enum epoch_event ev)
1005{
1006 int finish, epoch_size;
1007 struct drbd_epoch *next_epoch;
1008 int schedule_flush = 0;
1009 enum finish_epoch rv = FE_STILL_LIVE;
1010
1011 spin_lock(&mdev->epoch_lock);
1012 do {
1013 next_epoch = NULL;
1014 finish = 0;
1015
1016 epoch_size = atomic_read(&epoch->epoch_size);
1017
1018 switch (ev & ~EV_CLEANUP) {
1019 case EV_PUT:
1020 atomic_dec(&epoch->active);
1021 break;
1022 case EV_GOT_BARRIER_NR:
1023 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1024
1025 /* Special case: If we just switched from WO_bio_barrier to
1026 WO_bdev_flush we should not finish the current epoch */
1027 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1028 mdev->write_ordering != WO_bio_barrier &&
1029 epoch == mdev->current_epoch)
1030 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1031 break;
1032 case EV_BARRIER_DONE:
1033 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1034 break;
1035 case EV_BECAME_LAST:
1036 /* nothing to do*/
1037 break;
1038 }
1039
1040 if (epoch_size != 0 &&
1041 atomic_read(&epoch->active) == 0 &&
1042 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1043 epoch->list.prev == &mdev->current_epoch->list &&
1044 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1045 /* Nearly all conditions are met to finish that epoch... */
1046 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1047 mdev->write_ordering == WO_none ||
1048 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1049 ev & EV_CLEANUP) {
1050 finish = 1;
1051 set_bit(DE_IS_FINISHING, &epoch->flags);
1052 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1053 mdev->write_ordering == WO_bio_barrier) {
1054 atomic_inc(&epoch->active);
1055 schedule_flush = 1;
1056 }
1057 }
1058 if (finish) {
1059 if (!(ev & EV_CLEANUP)) {
1060 spin_unlock(&mdev->epoch_lock);
1061 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1062 spin_lock(&mdev->epoch_lock);
1063 }
1064 dec_unacked(mdev);
1065
1066 if (mdev->current_epoch != epoch) {
1067 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1068 list_del(&epoch->list);
1069 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1070 mdev->epochs--;
1071 kfree(epoch);
1072
1073 if (rv == FE_STILL_LIVE)
1074 rv = FE_DESTROYED;
1075 } else {
1076 epoch->flags = 0;
1077 atomic_set(&epoch->epoch_size, 0);
 1078 /* atomic_set(&epoch->active, 0); is already zero */
1079 if (rv == FE_STILL_LIVE)
1080 rv = FE_RECYCLED;
1081 }
1082 }
1083
1084 if (!next_epoch)
1085 break;
1086
1087 epoch = next_epoch;
1088 } while (1);
1089
1090 spin_unlock(&mdev->epoch_lock);
1091
1092 if (schedule_flush) {
1093 struct flush_work *fw;
1094 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1095 if (fw) {
1096 fw->w.cb = w_flush;
1097 fw->epoch = epoch;
1098 drbd_queue_work(&mdev->data.work, &fw->w);
1099 } else {
1100 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1101 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1102 /* That is not a recursion, only one level */
1103 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1104 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1105 }
1106 }
1107
1108 return rv;
1109}
1110
1111/**
 1112 * drbd_bump_write_ordering() - Fall back to another write ordering method
1113 * @mdev: DRBD device.
1114 * @wo: Write ordering method to try.
1115 */
1116void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1117{
1118 enum write_ordering_e pwo;
1119 static char *write_ordering_str[] = {
1120 [WO_none] = "none",
1121 [WO_drain_io] = "drain",
1122 [WO_bdev_flush] = "flush",
1123 [WO_bio_barrier] = "barrier",
1124 };
1125
1126 pwo = mdev->write_ordering;
1127 wo = min(pwo, wo);
1128 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1129 wo = WO_bdev_flush;
1130 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1131 wo = WO_drain_io;
1132 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1133 wo = WO_none;
1134 mdev->write_ordering = wo;
1135 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1136 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1137}
1138
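/* Example: if WO_bio_barrier is requested but the backing device has
 * no_disk_barrier set, we step down to WO_bdev_flush, and possibly further to
 * WO_drain_io or WO_none, depending on the disk flags checked above. */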
1139/**
 1140 * drbd_submit_ee() - Submit the epoch entry's pages as one or more bios
1141 * @mdev: DRBD device.
1142 * @e: epoch entry
1143 * @rw: flag field, see bio->bi_rw
1144 */
1145/* TODO allocate from our own bio_set. */
1146int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1147 const unsigned rw, const int fault_type)
1148{
1149 struct bio *bios = NULL;
1150 struct bio *bio;
1151 struct page *page = e->pages;
1152 sector_t sector = e->sector;
1153 unsigned ds = e->size;
1154 unsigned n_bios = 0;
1155 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1156
1157 /* In most cases, we will only need one bio. But in case the lower
1158 * level restrictions happen to be different at this offset on this
1159 * side than those of the sending peer, we may need to submit the
1160 * request in more than one bio. */
1161next_bio:
1162 bio = bio_alloc(GFP_NOIO, nr_pages);
1163 if (!bio) {
1164 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1165 goto fail;
1166 }
1167 /* > e->sector, unless this is the first bio */
1168 bio->bi_sector = sector;
1169 bio->bi_bdev = mdev->ldev->backing_bdev;
1170 /* we special case some flags in the multi-bio case, see below
1171 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1172 bio->bi_rw = rw;
1173 bio->bi_private = e;
1174 bio->bi_end_io = drbd_endio_sec;
1175
1176 bio->bi_next = bios;
1177 bios = bio;
1178 ++n_bios;
1179
1180 page_chain_for_each(page) {
1181 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1182 if (!bio_add_page(bio, page, len, 0)) {
1183 /* a single page must always be possible! */
1184 BUG_ON(bio->bi_vcnt == 0);
1185 goto next_bio;
1186 }
1187 ds -= len;
1188 sector += len >> 9;
1189 --nr_pages;
1190 }
1191 D_ASSERT(page == NULL);
1192 D_ASSERT(ds == 0);
1193
1194 atomic_set(&e->pending_bios, n_bios);
1195 do {
1196 bio = bios;
1197 bios = bios->bi_next;
1198 bio->bi_next = NULL;
1199
1200 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1201 if (bios)
1202 bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1203
1204 drbd_generic_make_request(mdev, fault_type, bio);
1205
1206 /* strip off BIO_RW_BARRIER,
1207 * unless it is the first or last bio */
1208 if (bios && bios->bi_next)
1209 bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1210 } while (bios);
1211 maybe_kick_lo(mdev);
1212 return 0;
1213
1214fail:
1215 while (bios) {
1216 bio = bios;
1217 bios = bios->bi_next;
1218 bio_put(bio);
1219 }
1220 return -ENOMEM;
1221}
1222
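/* A rough sketch of the split logic above: whenever bio_add_page() refuses a
 * page because of the local queue limits, the partially filled bio is kept on
 * the "bios" list and a fresh bio is allocated for the remaining pages, so a
 * single epoch entry may end up being submitted as several bios. */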
1223/**
1224 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1225 * @mdev: DRBD device.
1226 * @w: work object.
1227 * @cancel: The connection will be closed anyways (unused in this callback)
1228 */
1229int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1230{
1231 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1232 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1233 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1234 so that we can finish that epoch in drbd_may_finish_epoch().
1235 That is necessary if we already have a long chain of Epochs, before
1236 we realize that BIO_RW_BARRIER is actually not supported */
1237
1238 /* As long as the -ENOTSUPP on the barrier is reported immediately
1239 that will never trigger. If it is reported late, we will just
1240 print that warning and continue correctly for all future requests
1241 with WO_bdev_flush */
1242 if (previous_epoch(mdev, e->epoch))
1243 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1244
1245 /* we still have a local reference,
1246 * get_ldev was done in receive_Data. */
1247
1248 e->w.cb = e_end_block;
1249 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1250 /* drbd_submit_ee fails for one reason only:
 1251 * it was not able to allocate sufficient bios.
1252 * requeue, try again later. */
1253 e->w.cb = w_e_reissue;
1254 drbd_queue_work(&mdev->data.work, &e->w);
1255 }
1256 return 1;
1257}
1258
1259static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1260{
1261 int rv, issue_flush;
1262 struct p_barrier *p = (struct p_barrier *)h;
1263 struct drbd_epoch *epoch;
1264
1265 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1266
1267 rv = drbd_recv(mdev, h->payload, h->length);
1268 ERR_IF(rv != h->length) return FALSE;
1269
1270 inc_unacked(mdev);
1271
1272 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1273 drbd_kick_lo(mdev);
1274
1275 mdev->current_epoch->barrier_nr = p->barrier;
1276 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1277
1278 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1279 * the activity log, which means it would not be resynced in case the
1280 * R_PRIMARY crashes now.
1281 * Therefore we must send the barrier_ack after the barrier request was
1282 * completed. */
1283 switch (mdev->write_ordering) {
1284 case WO_bio_barrier:
1285 case WO_none:
1286 if (rv == FE_RECYCLED)
1287 return TRUE;
1288 break;
1289
1290 case WO_bdev_flush:
1291 case WO_drain_io:
1292 if (rv == FE_STILL_LIVE) {
1293 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1294 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1295 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1296 }
1297 if (rv == FE_RECYCLED)
1298 return TRUE;
1299
1300 /* The asender will send all the ACKs and barrier ACKs out, since
1301 all EEs moved from the active_ee to the done_ee. We need to
1302 provide a new epoch object for the EEs that come in soon */
1303 break;
1304 }
1305
1306 /* receiver context, in the writeout path of the other node.
1307 * avoid potential distributed deadlock */
1308 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1309 if (!epoch) {
1310 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
 1311 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1312 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1313 if (issue_flush) {
1314 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1315 if (rv == FE_RECYCLED)
1316 return TRUE;
1317 }
1318
1319 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1320
1321 return TRUE;
1322 }
1323
1324 epoch->flags = 0;
1325 atomic_set(&epoch->epoch_size, 0);
1326 atomic_set(&epoch->active, 0);
1327
1328 spin_lock(&mdev->epoch_lock);
1329 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1330 list_add(&epoch->list, &mdev->current_epoch->list);
1331 mdev->current_epoch = epoch;
1332 mdev->epochs++;
1333 } else {
1334 /* The current_epoch got recycled while we allocated this one... */
1335 kfree(epoch);
1336 }
1337 spin_unlock(&mdev->epoch_lock);
1338
1339 return TRUE;
1340}
1341
1342/* used from receive_RSDataReply (recv_resync_read)
1343 * and from receive_Data */
1344static struct drbd_epoch_entry *
1345read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1346{
 1347 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 1348 struct drbd_epoch_entry *e;
 1349 struct page *page;
 1350 int dgs, ds, rr;
1351 void *dig_in = mdev->int_dig_in;
1352 void *dig_vv = mdev->int_dig_vv;
 1353 unsigned long *data;
1354
1355 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1356 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1357
1358 if (dgs) {
1359 rr = drbd_recv(mdev, dig_in, dgs);
1360 if (rr != dgs) {
1361 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1362 rr, dgs);
1363 return NULL;
1364 }
1365 }
1366
1367 data_size -= dgs;
1368
1369 ERR_IF(data_size & 0x1ff) return NULL;
1370 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1371
 1372 /* even though we trust our peer,
1373 * we sometimes have to double check. */
1374 if (sector + (data_size>>9) > capacity) {
1375 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1376 (unsigned long long)capacity,
1377 (unsigned long long)sector, data_size);
1378 return NULL;
1379 }
1380
1381 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1382 * "criss-cross" setup, that might cause write-out on some other DRBD,
1383 * which in turn might block on the other node at this very place. */
1384 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1385 if (!e)
1386 return NULL;
 1387
 1388 ds = data_size;
1389 page = e->pages;
1390 page_chain_for_each(page) {
1391 unsigned len = min_t(int, ds, PAGE_SIZE);
 1392 data = kmap(page);
 1393 rr = drbd_recv(mdev, data, len);
1394 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1395 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1396 data[0] = data[0] ^ (unsigned long)-1;
1397 }
 1398 kunmap(page);
 1399 if (rr != len) {
1400 drbd_free_ee(mdev, e);
1401 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
 1402 rr, len);
1403 return NULL;
1404 }
1405 ds -= rr;
1406 }
1407
1408 if (dgs) {
 1409 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1410 if (memcmp(dig_in, dig_vv, dgs)) {
1411 dev_err(DEV, "Digest integrity check FAILED.\n");
1412 drbd_bcast_ee(mdev, "digest failed",
1413 dgs, dig_in, dig_vv, e);
1414 drbd_free_ee(mdev, e);
1415 return NULL;
1416 }
1417 }
1418 mdev->recv_cnt += data_size>>9;
1419 return e;
1420}
1421
1422/* drbd_drain_block() just takes a data block
1423 * out of the socket input buffer, and discards it.
1424 */
1425static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1426{
1427 struct page *page;
1428 int rr, rv = 1;
1429 void *data;
1430
1431 if (!data_size)
1432 return TRUE;
1433
 1434 page = drbd_pp_alloc(mdev, 1, 1);
1435
1436 data = kmap(page);
1437 while (data_size) {
1438 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1439 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1440 rv = 0;
1441 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1442 rr, min_t(int, data_size, PAGE_SIZE));
1443 break;
1444 }
1445 data_size -= rr;
1446 }
1447 kunmap(page);
1448 drbd_pp_free(mdev, page);
1449 return rv;
1450}
1451
1452static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1453 sector_t sector, int data_size)
1454{
1455 struct bio_vec *bvec;
1456 struct bio *bio;
1457 int dgs, rr, i, expect;
1458 void *dig_in = mdev->int_dig_in;
1459 void *dig_vv = mdev->int_dig_vv;
1460
1461 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1462 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1463
1464 if (dgs) {
1465 rr = drbd_recv(mdev, dig_in, dgs);
1466 if (rr != dgs) {
1467 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1468 rr, dgs);
1469 return 0;
1470 }
1471 }
1472
1473 data_size -= dgs;
1474
1475 /* optimistically update recv_cnt. if receiving fails below,
1476 * we disconnect anyways, and counters will be reset. */
1477 mdev->recv_cnt += data_size>>9;
1478
1479 bio = req->master_bio;
1480 D_ASSERT(sector == bio->bi_sector);
1481
1482 bio_for_each_segment(bvec, bio, i) {
1483 expect = min_t(int, data_size, bvec->bv_len);
1484 rr = drbd_recv(mdev,
1485 kmap(bvec->bv_page)+bvec->bv_offset,
1486 expect);
1487 kunmap(bvec->bv_page);
1488 if (rr != expect) {
1489 dev_warn(DEV, "short read receiving data reply: "
1490 "read %d expected %d\n",
1491 rr, expect);
1492 return 0;
1493 }
1494 data_size -= rr;
1495 }
1496
1497 if (dgs) {
 1498 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1499 if (memcmp(dig_in, dig_vv, dgs)) {
1500 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1501 return 0;
1502 }
1503 }
1504
1505 D_ASSERT(data_size == 0);
1506 return 1;
1507}
1508
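/* recv_dless_read() (presumably "disk-less read") copies a data-reply payload
 * straight into the pages of the original request's master bio, verifying the
 * optional integrity digest on the way. */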
1509/* e_end_resync_block() is called via
1510 * drbd_process_done_ee() by asender only */
1511static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1512{
1513 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1514 sector_t sector = e->sector;
1515 int ok;
1516
1517 D_ASSERT(hlist_unhashed(&e->colision));
1518
 1519 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1520 drbd_set_in_sync(mdev, sector, e->size);
1521 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1522 } else {
1523 /* Record failure to sync */
1524 drbd_rs_failed_io(mdev, sector, e->size);
1525
1526 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1527 }
1528 dec_unacked(mdev);
1529
1530 return ok;
1531}
1532
1533static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1534{
1535 struct drbd_epoch_entry *e;
1536
1537 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1538 if (!e)
1539 goto fail;
1540
1541 dec_rs_pending(mdev);
1542
1543 inc_unacked(mdev);
1544 /* corresponding dec_unacked() in e_end_resync_block()
1545 * respective _drbd_clear_done_ee */
1546
1547 e->w.cb = e_end_resync_block;
1548
1549 spin_lock_irq(&mdev->req_lock);
1550 list_add(&e->w.list, &mdev->sync_ee);
1551 spin_unlock_irq(&mdev->req_lock);
1552
1553 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1554 return TRUE;
 1555
1556 drbd_free_ee(mdev, e);
1557fail:
1558 put_ldev(mdev);
1559 return FALSE;
1560}
1561
1562static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1563{
1564 struct drbd_request *req;
1565 sector_t sector;
1566 unsigned int header_size, data_size;
1567 int ok;
1568 struct p_data *p = (struct p_data *)h;
1569
1570 header_size = sizeof(*p) - sizeof(*h);
1571 data_size = h->length - header_size;
1572
1573 ERR_IF(data_size == 0) return FALSE;
1574
1575 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1576 return FALSE;
1577
1578 sector = be64_to_cpu(p->sector);
1579
1580 spin_lock_irq(&mdev->req_lock);
1581 req = _ar_id_to_req(mdev, p->block_id, sector);
1582 spin_unlock_irq(&mdev->req_lock);
1583 if (unlikely(!req)) {
1584 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1585 return FALSE;
1586 }
1587
1588 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1589 * special casing it there for the various failure cases.
1590 * still no race with drbd_fail_pending_reads */
1591 ok = recv_dless_read(mdev, req, sector, data_size);
1592
1593 if (ok)
1594 req_mod(req, data_received);
1595 /* else: nothing. handled from drbd_disconnect...
1596 * I don't think we may complete this just yet
1597 * in case we are "on-disconnect: freeze" */
1598
1599 return ok;
1600}
1601
1602static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1603{
1604 sector_t sector;
1605 unsigned int header_size, data_size;
1606 int ok;
1607 struct p_data *p = (struct p_data *)h;
1608
1609 header_size = sizeof(*p) - sizeof(*h);
1610 data_size = h->length - header_size;
1611
1612 ERR_IF(data_size == 0) return FALSE;
1613
1614 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1615 return FALSE;
1616
1617 sector = be64_to_cpu(p->sector);
1618 D_ASSERT(p->block_id == ID_SYNCER);
1619
1620 if (get_ldev(mdev)) {
1621 /* data is submitted to disk within recv_resync_read.
1622 * corresponding put_ldev done below on error,
1623 * or in drbd_endio_write_sec. */
1624 ok = recv_resync_read(mdev, sector, data_size);
1625 } else {
1626 if (__ratelimit(&drbd_ratelimit_state))
1627 dev_err(DEV, "Can not write resync data to local disk.\n");
1628
1629 ok = drbd_drain_block(mdev, data_size);
1630
1631 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1632 }
1633
1634 return ok;
1635}
1636
1637/* e_end_block() is called via drbd_process_done_ee().
1638 * this means this function only runs in the asender thread
1639 */
1640static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1641{
1642 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1643 sector_t sector = e->sector;
1644 struct drbd_epoch *epoch;
1645 int ok = 1, pcmd;
1646
1647 if (e->flags & EE_IS_BARRIER) {
1648 epoch = previous_epoch(mdev, e->epoch);
1649 if (epoch)
1650 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1651 }
1652
1653 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
 1654 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1655 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1656 mdev->state.conn <= C_PAUSED_SYNC_T &&
1657 e->flags & EE_MAY_SET_IN_SYNC) ?
1658 P_RS_WRITE_ACK : P_WRITE_ACK;
1659 ok &= drbd_send_ack(mdev, pcmd, e);
1660 if (pcmd == P_RS_WRITE_ACK)
1661 drbd_set_in_sync(mdev, sector, e->size);
1662 } else {
1663 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1664 /* we expect it to be marked out of sync anyways...
1665 * maybe assert this? */
1666 }
1667 dec_unacked(mdev);
1668 }
1669 /* we delete from the conflict detection hash _after_ we sent out the
1670 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1671 if (mdev->net_conf->two_primaries) {
1672 spin_lock_irq(&mdev->req_lock);
1673 D_ASSERT(!hlist_unhashed(&e->colision));
1674 hlist_del_init(&e->colision);
1675 spin_unlock_irq(&mdev->req_lock);
1676 } else {
1677 D_ASSERT(hlist_unhashed(&e->colision));
1678 }
1679
1680 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1681
1682 return ok;
1683}
1684
1685static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1686{
1687 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1688 int ok = 1;
1689
1690 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1691 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1692
1693 spin_lock_irq(&mdev->req_lock);
1694 D_ASSERT(!hlist_unhashed(&e->colision));
1695 hlist_del_init(&e->colision);
1696 spin_unlock_irq(&mdev->req_lock);
1697
1698 dec_unacked(mdev);
1699
1700 return ok;
1701}
1702
1703/* Called from receive_Data.
1704 * Synchronize packets on sock with packets on msock.
1705 *
1706 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1707 * packet traveling on msock, they are still processed in the order they have
1708 * been sent.
1709 *
1710 * Note: we don't care for Ack packets overtaking P_DATA packets.
1711 *
1712 * In case packet_seq is larger than mdev->peer_seq number, there are
1713 * outstanding packets on the msock. We wait for them to arrive.
1714 * In case we are the logically next packet, we update mdev->peer_seq
1715 * ourselves. Correctly handles 32bit wrap around.
1716 *
1717 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1718 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1719 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1720 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1721 *
1722 * returns 0 if we may process the packet,
1723 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1724static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1725{
1726 DEFINE_WAIT(wait);
1727 unsigned int p_seq;
1728 long timeout;
1729 int ret = 0;
1730 spin_lock(&mdev->peer_seq_lock);
1731 for (;;) {
1732 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1733 if (seq_le(packet_seq, mdev->peer_seq+1))
1734 break;
1735 if (signal_pending(current)) {
1736 ret = -ERESTARTSYS;
1737 break;
1738 }
1739 p_seq = mdev->peer_seq;
1740 spin_unlock(&mdev->peer_seq_lock);
1741 timeout = schedule_timeout(30*HZ);
1742 spin_lock(&mdev->peer_seq_lock);
1743 if (timeout == 0 && p_seq == mdev->peer_seq) {
1744 ret = -ETIMEDOUT;
1745 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1746 break;
1747 }
1748 }
1749 finish_wait(&mdev->seq_wait, &wait);
1750 if (mdev->peer_seq+1 == packet_seq)
1751 mdev->peer_seq++;
1752 spin_unlock(&mdev->peer_seq_lock);
1753 return ret;
1754}
1755
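/* Example: with mdev->peer_seq == 5, a packet carrying seq_num 6 is the
 * logically next one and passes immediately (peer_seq then becomes 6), while a
 * packet with seq_num 8 waits here until 6 and 7 have been processed. */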
1756/* mirrored write */
1757static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1758{
1759 sector_t sector;
1760 struct drbd_epoch_entry *e;
1761 struct p_data *p = (struct p_data *)h;
1762 int header_size, data_size;
1763 int rw = WRITE;
1764 u32 dp_flags;
1765
1766 header_size = sizeof(*p) - sizeof(*h);
1767 data_size = h->length - header_size;
1768
1769 ERR_IF(data_size == 0) return FALSE;
1770
1771 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1772 return FALSE;
1773
1774 if (!get_ldev(mdev)) {
1775 if (__ratelimit(&drbd_ratelimit_state))
1776 dev_err(DEV, "Can not write mirrored data block "
1777 "to local disk.\n");
1778 spin_lock(&mdev->peer_seq_lock);
1779 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1780 mdev->peer_seq++;
1781 spin_unlock(&mdev->peer_seq_lock);
1782
1783 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1784 atomic_inc(&mdev->current_epoch->epoch_size);
1785 return drbd_drain_block(mdev, data_size);
1786 }
1787
1788 /* get_ldev(mdev) successful.
1789 * Corresponding put_ldev done either below (on various errors),
1790 * or in drbd_endio_write_sec, if we successfully submit the data at
1791 * the end of this function. */
1792
1793 sector = be64_to_cpu(p->sector);
1794 e = read_in_block(mdev, p->block_id, sector, data_size);
1795 if (!e) {
1796 put_ldev(mdev);
1797 return FALSE;
1798 }
1799
1800 e->w.cb = e_end_block;
1801
1802 spin_lock(&mdev->epoch_lock);
1803 e->epoch = mdev->current_epoch;
1804 atomic_inc(&e->epoch->epoch_size);
1805 atomic_inc(&e->epoch->active);
1806
1807 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1808 struct drbd_epoch *epoch;
1809 /* Issue a barrier if we start a new epoch, and the previous epoch
 1810 was not an epoch containing a single request which already was
1811 a Barrier. */
1812 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1813 if (epoch == e->epoch) {
1814 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1815 rw |= (1<<BIO_RW_BARRIER);
1816 e->flags |= EE_IS_BARRIER;
1817 } else {
1818 if (atomic_read(&epoch->epoch_size) > 1 ||
1819 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1820 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
 1821 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1822 rw |= (1<<BIO_RW_BARRIER);
1823 e->flags |= EE_IS_BARRIER;
1824 }
1825 }
1826 }
1827 spin_unlock(&mdev->epoch_lock);
1828
1829 dp_flags = be32_to_cpu(p->dp_flags);
1830 if (dp_flags & DP_HARDBARRIER) {
1831 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1832 /* rw |= (1<<BIO_RW_BARRIER); */
1833 }
1834 if (dp_flags & DP_RW_SYNC)
1835 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1836 if (dp_flags & DP_MAY_SET_IN_SYNC)
1837 e->flags |= EE_MAY_SET_IN_SYNC;
1838
1839 /* I'm the receiver, I do hold a net_cnt reference. */
1840 if (!mdev->net_conf->two_primaries) {
1841 spin_lock_irq(&mdev->req_lock);
1842 } else {
1843 /* don't get the req_lock yet,
1844 * we may sleep in drbd_wait_peer_seq */
1845 const int size = e->size;
1846 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1847 DEFINE_WAIT(wait);
1848 struct drbd_request *i;
1849 struct hlist_node *n;
1850 struct hlist_head *slot;
1851 int first;
1852
1853 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1854 BUG_ON(mdev->ee_hash == NULL);
1855 BUG_ON(mdev->tl_hash == NULL);
1856
1857 /* conflict detection and handling:
1858 * 1. wait on the sequence number,
1859 * in case this data packet overtook ACK packets.
1860 * 2. check our hash tables for conflicting requests.
1861 * we only need to walk the tl_hash, since an ee can not
1862 * have a conflict with an other ee: on the submitting
1863 * node, the corresponding req had already been conflicting,
1864 * and a conflicting req is never sent.
1865 *
1866 * Note: for two_primaries, we are protocol C,
1867 * so there cannot be any request that is DONE
1868 * but still on the transfer log.
1869 *
1870 * unconditionally add to the ee_hash.
1871 *
1872 * if no conflicting request is found:
1873 * submit.
1874 *
1875 * if any conflicting request is found
1876 * that has not yet been acked,
1877 * AND I have the "discard concurrent writes" flag:
1878 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1879 *
1880 * if any conflicting request is found:
1881 * block the receiver, waiting on misc_wait
1882 * until no more conflicting requests are there,
1883 * or we get interrupted (disconnect).
1884 *
1885 * we do not just write after local io completion of those
1886 * requests, but only after req is done completely, i.e.
1887 * we wait for the P_DISCARD_ACK to arrive!
1888 *
1889 * then proceed normally, i.e. submit.
1890 */
1891 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1892 goto out_interrupted;
1893
1894 spin_lock_irq(&mdev->req_lock);
1895
1896 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1897
1898#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1899 slot = tl_hash_slot(mdev, sector);
1900 first = 1;
1901 for (;;) {
1902 int have_unacked = 0;
1903 int have_conflict = 0;
1904 prepare_to_wait(&mdev->misc_wait, &wait,
1905 TASK_INTERRUPTIBLE);
1906 hlist_for_each_entry(i, n, slot, colision) {
1907 if (OVERLAPS) {
1908 /* only ALERT on first iteration,
1909 * we may be woken up early... */
1910 if (first)
1911 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1912 " new: %llus +%u; pending: %llus +%u\n",
1913 current->comm, current->pid,
1914 (unsigned long long)sector, size,
1915 (unsigned long long)i->sector, i->size);
1916 if (i->rq_state & RQ_NET_PENDING)
1917 ++have_unacked;
1918 ++have_conflict;
1919 }
1920 }
1921#undef OVERLAPS
1922 if (!have_conflict)
1923 break;
1924
1925 /* Discard Ack only for the _first_ iteration */
1926 if (first && discard && have_unacked) {
1927 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1928 (unsigned long long)sector);
1929 inc_unacked(mdev);
1930 e->w.cb = e_send_discard_ack;
1931 list_add_tail(&e->w.list, &mdev->done_ee);
1932
1933 spin_unlock_irq(&mdev->req_lock);
1934
1935 /* we could probably send that P_DISCARD_ACK ourselves,
1936 * but I don't like the receiver using the msock */
1937
1938 put_ldev(mdev);
1939 wake_asender(mdev);
1940 finish_wait(&mdev->misc_wait, &wait);
1941 return TRUE;
1942 }
1943
1944 if (signal_pending(current)) {
1945 hlist_del_init(&e->colision);
1946
1947 spin_unlock_irq(&mdev->req_lock);
1948
1949 finish_wait(&mdev->misc_wait, &wait);
1950 goto out_interrupted;
1951 }
1952
1953 spin_unlock_irq(&mdev->req_lock);
1954 if (first) {
1955 first = 0;
1956 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1957 "sec=%llus\n", (unsigned long long)sector);
1958 } else if (discard) {
1959 /* we had none on the first iteration.
1960 * there must be none now. */
1961 D_ASSERT(have_unacked == 0);
1962 }
1963 schedule();
1964 spin_lock_irq(&mdev->req_lock);
1965 }
1966 finish_wait(&mdev->misc_wait, &wait);
1967 }
1968
1969 list_add(&e->w.list, &mdev->active_ee);
1970 spin_unlock_irq(&mdev->req_lock);
1971
1972 switch (mdev->net_conf->wire_protocol) {
1973 case DRBD_PROT_C:
1974 inc_unacked(mdev);
1975 /* corresponding dec_unacked() in e_end_block()
1976 * respective _drbd_clear_done_ee */
1977 break;
1978 case DRBD_PROT_B:
1979 /* I really don't like it that the receiver thread
1980 * sends on the msock, but anyways */
1981 drbd_send_ack(mdev, P_RECV_ACK, e);
1982 break;
1983 case DRBD_PROT_A:
1984 /* nothing to do */
1985 break;
1986 }
1987
1988 if (mdev->state.pdsk == D_DISKLESS) {
1989 /* In case we have the only disk of the cluster, */
1990 drbd_set_out_of_sync(mdev, e->sector, e->size);
1991 e->flags |= EE_CALL_AL_COMPLETE_IO;
1992 drbd_al_begin_io(mdev, e->sector);
1993 }
1994
45bb912b
LE
1995 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1996 return TRUE;
b411b363
PR
1997
1998out_interrupted:
1999 /* yes, the epoch_size now is imbalanced.
2000 * but we drop the connection anyways, so we don't have a chance to
2001 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2002 put_ldev(mdev);
2003 drbd_free_ee(mdev, e);
2004 return FALSE;
2005}
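
/*
 * Illustrative sketch (not part of drbd_receiver.c): the conflict detection
 * above walks a tl_hash slot and tests each pending request against the
 * incoming write with an OVERLAPS macro defined elsewhere in drbd.  A minimal
 * userspace model of such an overlap test on (sector, byte-size) extents
 * could look like this -- the helper name and exact semantics are assumptions
 * for illustration, not the kernel macro itself.
 */
#include <stdbool.h>
#include <stdio.h>

typedef unsigned long long sector_model_t;

/* two extents [s, s + size>>9) overlap iff each one starts before the other ends */
static bool extents_overlap(sector_model_t s1, unsigned size1,
			    sector_model_t s2, unsigned size2)
{
	return s1 < s2 + (size2 >> 9) && s2 < s1 + (size1 >> 9);
}

int main(void)
{
	/* 4096-byte write at sector 8 vs. 4096-byte write at sector 12: overlap */
	printf("%d\n", extents_overlap(8, 4096, 12, 4096));	/* prints 1 */
	/* 4096-byte write at sector 0 vs. 4096-byte write at sector 8: adjacent, no overlap */
	printf("%d\n", extents_overlap(0, 4096, 8, 4096));	/* prints 0 */
	return 0;
}
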
2006
2007static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2008{
2009 sector_t sector;
2010 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2011 struct drbd_epoch_entry *e;
2012 struct digest_info *di = NULL;
2013 int size, digest_size;
2014 unsigned int fault_type;
2015 struct p_block_req *p =
2016 (struct p_block_req *)h;
2017 const int brps = sizeof(*p)-sizeof(*h);
2018
2019 if (drbd_recv(mdev, h->payload, brps) != brps)
2020 return FALSE;
2021
2022 sector = be64_to_cpu(p->sector);
2023 size = be32_to_cpu(p->blksize);
2024
2025 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2026 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2027 (unsigned long long)sector, size);
2028 return FALSE;
2029 }
2030 if (sector + (size>>9) > capacity) {
2031 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2032 (unsigned long long)sector, size);
2033 return FALSE;
2034 }
2035
2036 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2037 if (__ratelimit(&drbd_ratelimit_state))
2038 dev_err(DEV, "Can not satisfy peer's read request, "
2039 "no local data.\n");
2040 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2041 P_NEG_RS_DREPLY , p);
c3470cde 2042 return drbd_drain_block(mdev, h->length - brps);
b411b363
PR
2043 }
2044
2045 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2046 * "criss-cross" setup, that might cause write-out on some other DRBD,
2047 * which in turn might block on the other node at this very place. */
2048 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2049 if (!e) {
2050 put_ldev(mdev);
2051 return FALSE;
2052 }
2053
b411b363
PR
2054 switch (h->command) {
2055 case P_DATA_REQUEST:
2056 e->w.cb = w_e_end_data_req;
2057 fault_type = DRBD_FAULT_DT_RD;
2058 break;
2059 case P_RS_DATA_REQUEST:
2060 e->w.cb = w_e_end_rsdata_req;
2061 fault_type = DRBD_FAULT_RS_RD;
2062 /* Eventually this should become asynchronous. Currently it
2063 * blocks the whole receiver just to delay the reading of a
2064 * resync data block.
2065 * the drbd_work_queue mechanism is made for this...
2066 */
2067 if (!drbd_rs_begin_io(mdev, sector)) {
2068 /* we have been interrupted,
2069 * probably connection lost! */
2070 D_ASSERT(signal_pending(current));
2071 goto out_free_e;
2072 }
2073 break;
2074
2075 case P_OV_REPLY:
2076 case P_CSUM_RS_REQUEST:
2077 fault_type = DRBD_FAULT_RS_RD;
2078 digest_size = h->length - brps ;
2079 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2080 if (!di)
2081 goto out_free_e;
2082
2083 di->digest_size = digest_size;
2084 di->digest = (((char *)di)+sizeof(struct digest_info));
2085
2086 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2087 goto out_free_e;
2088
2089 e->block_id = (u64)(unsigned long)di;
2090 if (h->command == P_CSUM_RS_REQUEST) {
2091 D_ASSERT(mdev->agreed_pro_version >= 89);
2092 e->w.cb = w_e_end_csum_rs_req;
2093 } else if (h->command == P_OV_REPLY) {
2094 e->w.cb = w_e_end_ov_reply;
2095 dec_rs_pending(mdev);
2096 break;
2097 }
2098
2099 if (!drbd_rs_begin_io(mdev, sector)) {
2100 /* we have been interrupted, probably connection lost! */
2101 D_ASSERT(signal_pending(current));
2102 goto out_free_e;
2103 }
2104 break;
2105
2106 case P_OV_REQUEST:
2107 if (mdev->state.conn >= C_CONNECTED &&
2108 mdev->state.conn != C_VERIFY_T)
2109 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2110 drbd_conn_str(mdev->state.conn));
2111 if (mdev->ov_start_sector == ~(sector_t)0 &&
2112 mdev->agreed_pro_version >= 90) {
2113 mdev->ov_start_sector = sector;
2114 mdev->ov_position = sector;
2115 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2116 dev_info(DEV, "Online Verify start sector: %llu\n",
2117 (unsigned long long)sector);
2118 }
2119 e->w.cb = w_e_end_ov_req;
2120 fault_type = DRBD_FAULT_RS_RD;
2121 /* Eventually this should become asynchronous. Currently it
2122 * blocks the whole receiver just to delay the reading of a
2123 * resync data block.
2124 * the drbd_work_queue mechanism is made for this...
2125 */
2126 if (!drbd_rs_begin_io(mdev, sector)) {
2127 /* we have been interrupted,
2128 * probably connection lost! */
2129 D_ASSERT(signal_pending(current));
2130 goto out_free_e;
2131 }
2132 break;
2133
2134
2135 default:
2136 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2137 cmdname(h->command));
2138 fault_type = DRBD_FAULT_MAX;
2139 }
2140
2141 spin_lock_irq(&mdev->req_lock);
2142 list_add(&e->w.list, &mdev->read_ee);
2143 spin_unlock_irq(&mdev->req_lock);
2144
2145 inc_unacked(mdev);
2146
45bb912b
LE
2147 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2148 return TRUE;
b411b363
PR
2149
2150out_free_e:
2151 kfree(di);
2152 put_ldev(mdev);
2153 drbd_free_ee(mdev, e);
2154 return FALSE;
2155}
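
/*
 * Illustrative sketch (not part of drbd_receiver.c): the P_CSUM_RS_REQUEST /
 * P_OV_REPLY path above allocates the digest_info header and the digest
 * payload in a single kmalloc and points ->digest just past the struct, so
 * one kfree() releases both.  Userspace model of that one-allocation pattern,
 * with made-up names:
 */
#include <stdlib.h>
#include <string.h>

struct digest_model {
	int digest_size;
	char *digest;	/* points into the same allocation, right after the struct */
};

static struct digest_model *digest_model_alloc(const void *src, int digest_size)
{
	struct digest_model *di = malloc(sizeof(*di) + digest_size);

	if (!di)
		return NULL;
	di->digest_size = digest_size;
	di->digest = (char *)di + sizeof(*di);
	memcpy(di->digest, src, digest_size);	/* stands in for drbd_recv() */
	return di;
}

int main(void)
{
	const char fake_digest[16] = "0123456789abcde";
	struct digest_model *di = digest_model_alloc(fake_digest, sizeof(fake_digest));

	if (di)
		free(di);	/* a single free covers header and digest together */
	return 0;
}
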
2156
2157static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2158{
2159 int self, peer, rv = -100;
2160 unsigned long ch_self, ch_peer;
2161
2162 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2163 peer = mdev->p_uuid[UI_BITMAP] & 1;
2164
2165 ch_peer = mdev->p_uuid[UI_SIZE];
2166 ch_self = mdev->comm_bm_set;
2167
2168 switch (mdev->net_conf->after_sb_0p) {
2169 case ASB_CONSENSUS:
2170 case ASB_DISCARD_SECONDARY:
2171 case ASB_CALL_HELPER:
2172 dev_err(DEV, "Configuration error.\n");
2173 break;
2174 case ASB_DISCONNECT:
2175 break;
2176 case ASB_DISCARD_YOUNGER_PRI:
2177 if (self == 0 && peer == 1) {
2178 rv = -1;
2179 break;
2180 }
2181 if (self == 1 && peer == 0) {
2182 rv = 1;
2183 break;
2184 }
2185 /* Else fall through to one of the other strategies... */
2186 case ASB_DISCARD_OLDER_PRI:
2187 if (self == 0 && peer == 1) {
2188 rv = 1;
2189 break;
2190 }
2191 if (self == 1 && peer == 0) {
2192 rv = -1;
2193 break;
2194 }
2195 /* Else fall through to one of the other strategies... */
ad19bf6e 2196 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
b411b363
PR
2197 "Using discard-least-changes instead\n");
2198 case ASB_DISCARD_ZERO_CHG:
2199 if (ch_peer == 0 && ch_self == 0) {
2200 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2201 ? -1 : 1;
2202 break;
2203 } else {
2204 if (ch_peer == 0) { rv = 1; break; }
2205 if (ch_self == 0) { rv = -1; break; }
2206 }
2207 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2208 break;
2209 case ASB_DISCARD_LEAST_CHG:
2210 if (ch_self < ch_peer)
2211 rv = -1;
2212 else if (ch_self > ch_peer)
2213 rv = 1;
2214 else /* ( ch_self == ch_peer ) */
2215 /* Well, then use something else. */
2216 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2217 ? -1 : 1;
2218 break;
2219 case ASB_DISCARD_LOCAL:
2220 rv = -1;
2221 break;
2222 case ASB_DISCARD_REMOTE:
2223 rv = 1;
2224 }
2225
2226 return rv;
2227}
2228
2229static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2230{
2231 int self, peer, hg, rv = -100;
2232
2233 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2234 peer = mdev->p_uuid[UI_BITMAP] & 1;
2235
2236 switch (mdev->net_conf->after_sb_1p) {
2237 case ASB_DISCARD_YOUNGER_PRI:
2238 case ASB_DISCARD_OLDER_PRI:
2239 case ASB_DISCARD_LEAST_CHG:
2240 case ASB_DISCARD_LOCAL:
2241 case ASB_DISCARD_REMOTE:
2242 dev_err(DEV, "Configuration error.\n");
2243 break;
2244 case ASB_DISCONNECT:
2245 break;
2246 case ASB_CONSENSUS:
2247 hg = drbd_asb_recover_0p(mdev);
2248 if (hg == -1 && mdev->state.role == R_SECONDARY)
2249 rv = hg;
2250 if (hg == 1 && mdev->state.role == R_PRIMARY)
2251 rv = hg;
2252 break;
2253 case ASB_VIOLENTLY:
2254 rv = drbd_asb_recover_0p(mdev);
2255 break;
2256 case ASB_DISCARD_SECONDARY:
2257 return mdev->state.role == R_PRIMARY ? 1 : -1;
2258 case ASB_CALL_HELPER:
2259 hg = drbd_asb_recover_0p(mdev);
2260 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2261 self = drbd_set_role(mdev, R_SECONDARY, 0);
2262 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2263 * we might be here in C_WF_REPORT_PARAMS which is transient.
2264 * we do not need to wait for the after state change work either. */
2265 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2266 if (self != SS_SUCCESS) {
2267 drbd_khelper(mdev, "pri-lost-after-sb");
2268 } else {
2269 dev_warn(DEV, "Successfully gave up primary role.\n");
2270 rv = hg;
2271 }
2272 } else
2273 rv = hg;
2274 }
2275
2276 return rv;
2277}
2278
2279static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2280{
2281 int self, peer, hg, rv = -100;
2282
2283 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2284 peer = mdev->p_uuid[UI_BITMAP] & 1;
2285
2286 switch (mdev->net_conf->after_sb_2p) {
2287 case ASB_DISCARD_YOUNGER_PRI:
2288 case ASB_DISCARD_OLDER_PRI:
2289 case ASB_DISCARD_LEAST_CHG:
2290 case ASB_DISCARD_LOCAL:
2291 case ASB_DISCARD_REMOTE:
2292 case ASB_CONSENSUS:
2293 case ASB_DISCARD_SECONDARY:
2294 dev_err(DEV, "Configuration error.\n");
2295 break;
2296 case ASB_VIOLENTLY:
2297 rv = drbd_asb_recover_0p(mdev);
2298 break;
2299 case ASB_DISCONNECT:
2300 break;
2301 case ASB_CALL_HELPER:
2302 hg = drbd_asb_recover_0p(mdev);
2303 if (hg == -1) {
2304 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2305 * we might be here in C_WF_REPORT_PARAMS which is transient.
2306 * we do not need to wait for the after state change work either. */
2307 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2308 if (self != SS_SUCCESS) {
2309 drbd_khelper(mdev, "pri-lost-after-sb");
2310 } else {
2311 dev_warn(DEV, "Successfully gave up primary role.\n");
2312 rv = hg;
2313 }
2314 } else
2315 rv = hg;
2316 }
2317
2318 return rv;
2319}
2320
2321static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2322 u64 bits, u64 flags)
2323{
2324 if (!uuid) {
2325 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2326 return;
2327 }
2328 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2329 text,
2330 (unsigned long long)uuid[UI_CURRENT],
2331 (unsigned long long)uuid[UI_BITMAP],
2332 (unsigned long long)uuid[UI_HISTORY_START],
2333 (unsigned long long)uuid[UI_HISTORY_END],
2334 (unsigned long long)bits,
2335 (unsigned long long)flags);
2336}
2337
2338/*
2339 100 after split brain try auto recover
2340 2 C_SYNC_SOURCE set BitMap
2341 1 C_SYNC_SOURCE use BitMap
2342 0 no Sync
2343 -1 C_SYNC_TARGET use BitMap
2344 -2 C_SYNC_TARGET set BitMap
2345 -100 after split brain, disconnect
2346-1000 unrelated data
2347 */
2348static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2349{
2350 u64 self, peer;
2351 int i, j;
2352
2353 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2354 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2355
2356 *rule_nr = 10;
2357 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2358 return 0;
2359
2360 *rule_nr = 20;
2361 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2362 peer != UUID_JUST_CREATED)
2363 return -2;
2364
2365 *rule_nr = 30;
2366 if (self != UUID_JUST_CREATED &&
2367 (peer == UUID_JUST_CREATED || peer == (u64)0))
2368 return 2;
2369
2370 if (self == peer) {
2371 int rct, dc; /* roles at crash time */
2372
2373 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2374
2375 if (mdev->agreed_pro_version < 91)
2376 return -1001;
2377
2378 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2379 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2380 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2381 drbd_uuid_set_bm(mdev, 0UL);
2382
2383 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2384 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2385 *rule_nr = 34;
2386 } else {
2387 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2388 *rule_nr = 36;
2389 }
2390
2391 return 1;
2392 }
2393
2394 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2395
2396 if (mdev->agreed_pro_version < 91)
2397 return -1001;
2398
2399 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2400 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2401 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2402
2403 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2404 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2405 mdev->p_uuid[UI_BITMAP] = 0UL;
2406
2407 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2408 *rule_nr = 35;
2409 } else {
2410 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2411 *rule_nr = 37;
2412 }
2413
2414 return -1;
2415 }
2416
2417 /* Common power [off|failure] */
2418 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2419 (mdev->p_uuid[UI_FLAGS] & 2);
2420 /* lowest bit is set when we were primary,
2421 * next bit (weight 2) is set when peer was primary */
2422 *rule_nr = 40;
2423
2424 switch (rct) {
2425 case 0: /* !self_pri && !peer_pri */ return 0;
2426 case 1: /* self_pri && !peer_pri */ return 1;
2427 case 2: /* !self_pri && peer_pri */ return -1;
2428 case 3: /* self_pri && peer_pri */
2429 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2430 return dc ? -1 : 1;
2431 }
2432 }
2433
2434 *rule_nr = 50;
2435 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2436 if (self == peer)
2437 return -1;
2438
2439 *rule_nr = 51;
2440 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2441 if (self == peer) {
2442 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2443 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2444 if (self == peer) {
2445 /* The last P_SYNC_UUID did not get through. Undo the modifications
2446 the peer made to its UUIDs when it last started a resync as sync source. */
2447
2448 if (mdev->agreed_pro_version < 91)
2449 return -1001;
2450
2451 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2452 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2453 return -1;
2454 }
2455 }
2456
2457 *rule_nr = 60;
2458 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2459 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2460 peer = mdev->p_uuid[i] & ~((u64)1);
2461 if (self == peer)
2462 return -2;
2463 }
2464
2465 *rule_nr = 70;
2466 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2467 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2468 if (self == peer)
2469 return 1;
2470
2471 *rule_nr = 71;
2472 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2473 if (self == peer) {
2474 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2475 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2476 if (self == peer) {
2477 /* The last P_SYNC_UUID did not get through. Undo the modifications
2478 we made to our UUIDs when we last started a resync as sync source. */
2479
2480 if (mdev->agreed_pro_version < 91)
2481 return -1001;
2482
2483 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2484 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2485
2486 dev_info(DEV, "Undid last start of resync:\n");
2487
2488 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2489 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2490
2491 return 1;
2492 }
2493 }
2494
2495
2496 *rule_nr = 80;
d8c2a36b 2497 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
b411b363
PR
2498 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2499 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2500 if (self == peer)
2501 return 2;
2502 }
2503
2504 *rule_nr = 90;
2505 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2506 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2507 if (self == peer && self != ((u64)0))
2508 return 100;
2509
2510 *rule_nr = 100;
2511 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2512 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2513 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2514 peer = mdev->p_uuid[j] & ~((u64)1);
2515 if (self == peer)
2516 return -100;
2517 }
2518 }
2519
2520 return -1000;
2521}
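
/*
 * Illustrative sketch (not part of drbd_receiver.c): every comparison in
 * drbd_uuid_compare() above masks off the lowest UUID bit with ~((u64)1)
 * before testing for equality, so a flag carried in bit 0 never makes two
 * otherwise identical data generations look unrelated.  Hedged userspace
 * model of that masked comparison (names are made up for illustration):
 */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

static bool same_data_generation(uint64_t self_uuid, uint64_t peer_uuid)
{
	const uint64_t flag_mask = ~(uint64_t)1;	/* ignore bit 0 */

	return (self_uuid & flag_mask) == (peer_uuid & flag_mask);
}

int main(void)
{
	assert(same_data_generation(0x1234, 0x1235));	/* only bit 0 differs */
	assert(!same_data_generation(0x1234, 0x1334));	/* a real generation mismatch */
	return 0;
}
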
2522
2523/* drbd_sync_handshake() returns the new conn state on success, or
2524 CONN_MASK (-1) on failure.
2525 */
2526static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2527 enum drbd_disk_state peer_disk) __must_hold(local)
2528{
2529 int hg, rule_nr;
2530 enum drbd_conns rv = C_MASK;
2531 enum drbd_disk_state mydisk;
2532
2533 mydisk = mdev->state.disk;
2534 if (mydisk == D_NEGOTIATING)
2535 mydisk = mdev->new_state_tmp.disk;
2536
2537 dev_info(DEV, "drbd_sync_handshake:\n");
2538 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2539 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2540 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2541
2542 hg = drbd_uuid_compare(mdev, &rule_nr);
2543
2544 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2545
2546 if (hg == -1000) {
2547 dev_alert(DEV, "Unrelated data, aborting!\n");
2548 return C_MASK;
2549 }
2550 if (hg == -1001) {
2551 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2552 return C_MASK;
2553 }
2554
2555 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2556 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2557 int f = (hg == -100) || abs(hg) == 2;
2558 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2559 if (f)
2560 hg = hg*2;
2561 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2562 hg > 0 ? "source" : "target");
2563 }
2564
3a11a487
AG
2565 if (abs(hg) == 100)
2566 drbd_khelper(mdev, "initial-split-brain");
2567
b411b363
PR
2568 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2569 int pcount = (mdev->state.role == R_PRIMARY)
2570 + (peer_role == R_PRIMARY);
2571 int forced = (hg == -100);
2572
2573 switch (pcount) {
2574 case 0:
2575 hg = drbd_asb_recover_0p(mdev);
2576 break;
2577 case 1:
2578 hg = drbd_asb_recover_1p(mdev);
2579 break;
2580 case 2:
2581 hg = drbd_asb_recover_2p(mdev);
2582 break;
2583 }
2584 if (abs(hg) < 100) {
2585 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2586 "automatically solved. Sync from %s node\n",
2587 pcount, (hg < 0) ? "peer" : "this");
2588 if (forced) {
2589 dev_warn(DEV, "Doing a full sync, since"
2590 " UUIDs where ambiguous.\n");
2591 hg = hg*2;
2592 }
2593 }
2594 }
2595
2596 if (hg == -100) {
2597 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2598 hg = -1;
2599 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2600 hg = 1;
2601
2602 if (abs(hg) < 100)
2603 dev_warn(DEV, "Split-Brain detected, manually solved. "
2604 "Sync from %s node\n",
2605 (hg < 0) ? "peer" : "this");
2606 }
2607
2608 if (hg == -100) {
580b9767
LE
2609 /* FIXME this log message is not correct if we end up here
2610 * after an attempted attach on a diskless node.
2611 * We just refuse to attach -- well, we drop the "connection"
2612 * to that disk, in a way... */
3a11a487 2613 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
b411b363
PR
2614 drbd_khelper(mdev, "split-brain");
2615 return C_MASK;
2616 }
2617
2618 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2619 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2620 return C_MASK;
2621 }
2622
2623 if (hg < 0 && /* by intention we do not use mydisk here. */
2624 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2625 switch (mdev->net_conf->rr_conflict) {
2626 case ASB_CALL_HELPER:
2627 drbd_khelper(mdev, "pri-lost");
2628 /* fall through */
2629 case ASB_DISCONNECT:
2630 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2631 return C_MASK;
2632 case ASB_VIOLENTLY:
2633 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2634 "assumption\n");
2635 }
2636 }
2637
cf14c2e9
PR
2638 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2639 if (hg == 0)
2640 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2641 else
2642 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2643 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2644 abs(hg) >= 2 ? "full" : "bit-map based");
2645 return C_MASK;
2646 }
2647
b411b363
PR
2648 if (abs(hg) >= 2) {
2649 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2650 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2651 return C_MASK;
2652 }
2653
2654 if (hg > 0) { /* become sync source. */
2655 rv = C_WF_BITMAP_S;
2656 } else if (hg < 0) { /* become sync target */
2657 rv = C_WF_BITMAP_T;
2658 } else {
2659 rv = C_CONNECTED;
2660 if (drbd_bm_total_weight(mdev)) {
2661 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2662 drbd_bm_total_weight(mdev));
2663 }
2664 }
2665
2666 return rv;
2667}
2668
2669/* returns 1 if invalid */
2670static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2671{
2672 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2673 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2674 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2675 return 0;
2676
2677 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2678 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2679 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2680 return 1;
2681
2682 /* everything else is valid if they are equal on both sides. */
2683 if (peer == self)
2684 return 0;
2685
2686 /* everything else is invalid. */
2687 return 1;
2688}
2689
2690static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2691{
2692 struct p_protocol *p = (struct p_protocol *)h;
2693 int header_size, data_size;
2694 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2695 int p_want_lose, p_two_primaries, cf;
b411b363
PR
2696 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2697
2698 header_size = sizeof(*p) - sizeof(*h);
2699 data_size = h->length - header_size;
2700
2701 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2702 return FALSE;
2703
2704 p_proto = be32_to_cpu(p->protocol);
2705 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2706 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2707 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2708 p_two_primaries = be32_to_cpu(p->two_primaries);
cf14c2e9
PR
2709 cf = be32_to_cpu(p->conn_flags);
2710 p_want_lose = cf & CF_WANT_LOSE;
2711
2712 clear_bit(CONN_DRY_RUN, &mdev->flags);
2713
2714 if (cf & CF_DRY_RUN)
2715 set_bit(CONN_DRY_RUN, &mdev->flags);
b411b363
PR
2716
2717 if (p_proto != mdev->net_conf->wire_protocol) {
2718 dev_err(DEV, "incompatible communication protocols\n");
2719 goto disconnect;
2720 }
2721
2722 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2723 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2724 goto disconnect;
2725 }
2726
2727 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2728 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2729 goto disconnect;
2730 }
2731
2732 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2733 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2734 goto disconnect;
2735 }
2736
2737 if (p_want_lose && mdev->net_conf->want_lose) {
2738 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2739 goto disconnect;
2740 }
2741
2742 if (p_two_primaries != mdev->net_conf->two_primaries) {
2743 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2744 goto disconnect;
2745 }
2746
2747 if (mdev->agreed_pro_version >= 87) {
2748 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2749
2750 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2751 return FALSE;
2752
2753 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2754 if (strcmp(p_integrity_alg, my_alg)) {
2755 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2756 goto disconnect;
2757 }
2758 dev_info(DEV, "data-integrity-alg: %s\n",
2759 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2760 }
2761
2762 return TRUE;
2763
2764disconnect:
2765 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2766 return FALSE;
2767}
2768
2769/* helper function
2770 * input: alg name, feature name
2771 * return: NULL (alg name was "")
2772 * ERR_PTR(error) if something goes wrong
2773 * or the crypto hash ptr, if it worked out ok. */
2774struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2775 const char *alg, const char *name)
2776{
2777 struct crypto_hash *tfm;
2778
2779 if (!alg[0])
2780 return NULL;
2781
2782 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2783 if (IS_ERR(tfm)) {
2784 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2785 alg, name, PTR_ERR(tfm));
2786 return tfm;
2787 }
2788 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2789 crypto_free_hash(tfm);
2790 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2791 return ERR_PTR(-EINVAL);
2792 }
2793 return tfm;
2794}
2795
2796static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2797{
2798 int ok = TRUE;
2799 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2800 unsigned int header_size, data_size, exp_max_sz;
2801 struct crypto_hash *verify_tfm = NULL;
2802 struct crypto_hash *csums_tfm = NULL;
2803 const int apv = mdev->agreed_pro_version;
2804
2805 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2806 : apv == 88 ? sizeof(struct p_rs_param)
2807 + SHARED_SECRET_MAX
2808 : /* 89 */ sizeof(struct p_rs_param_89);
2809
2810 if (h->length > exp_max_sz) {
2811 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2812 h->length, exp_max_sz);
2813 return FALSE;
2814 }
2815
2816 if (apv <= 88) {
2817 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2818 data_size = h->length - header_size;
2819 } else /* apv >= 89 */ {
2820 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2821 data_size = h->length - header_size;
2822 D_ASSERT(data_size == 0);
2823 }
2824
2825 /* initialize verify_alg and csums_alg */
2826 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2827
2828 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2829 return FALSE;
2830
2831 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2832
2833 if (apv >= 88) {
2834 if (apv == 88) {
2835 if (data_size > SHARED_SECRET_MAX) {
2836 dev_err(DEV, "verify-alg too long, "
2837 "peer wants %u, accepting only %u byte\n",
2838 data_size, SHARED_SECRET_MAX);
2839 return FALSE;
2840 }
2841
2842 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2843 return FALSE;
2844
2845 /* we expect NUL terminated string */
2846 /* but just in case someone tries to be evil */
2847 D_ASSERT(p->verify_alg[data_size-1] == 0);
2848 p->verify_alg[data_size-1] = 0;
2849
2850 } else /* apv >= 89 */ {
2851 /* we still expect NUL terminated strings */
2852 /* but just in case someone tries to be evil */
2853 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2854 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2855 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2856 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2857 }
2858
2859 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2860 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2861 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2862 mdev->sync_conf.verify_alg, p->verify_alg);
2863 goto disconnect;
2864 }
2865 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2866 p->verify_alg, "verify-alg");
2867 if (IS_ERR(verify_tfm)) {
2868 verify_tfm = NULL;
2869 goto disconnect;
2870 }
2871 }
2872
2873 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2874 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2875 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2876 mdev->sync_conf.csums_alg, p->csums_alg);
2877 goto disconnect;
2878 }
2879 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2880 p->csums_alg, "csums-alg");
2881 if (IS_ERR(csums_tfm)) {
2882 csums_tfm = NULL;
2883 goto disconnect;
2884 }
2885 }
2886
2887
2888 spin_lock(&mdev->peer_seq_lock);
2889 /* lock against drbd_nl_syncer_conf() */
2890 if (verify_tfm) {
2891 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2892 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2893 crypto_free_hash(mdev->verify_tfm);
2894 mdev->verify_tfm = verify_tfm;
2895 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2896 }
2897 if (csums_tfm) {
2898 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2899 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2900 crypto_free_hash(mdev->csums_tfm);
2901 mdev->csums_tfm = csums_tfm;
2902 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2903 }
2904 spin_unlock(&mdev->peer_seq_lock);
2905 }
2906
2907 return ok;
2908disconnect:
2909 /* just for completeness: actually not needed,
2910 * as this is not reached if csums_tfm was ok. */
2911 crypto_free_hash(csums_tfm);
2912 /* but free the verify_tfm again, if csums_tfm did not work out */
2913 crypto_free_hash(verify_tfm);
2914 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2915 return FALSE;
2916}
2917
2918static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2919{
2920 /* sorry, we currently have no working implementation
2921 * of distributed TCQ */
2922}
2923
2924/* warn if the arguments differ by more than 12.5% */
2925static void warn_if_differ_considerably(struct drbd_conf *mdev,
2926 const char *s, sector_t a, sector_t b)
2927{
2928 sector_t d;
2929 if (a == 0 || b == 0)
2930 return;
2931 d = (a > b) ? (a - b) : (b - a);
2932 if (d > (a>>3) || d > (b>>3))
2933 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2934 (unsigned long long)a, (unsigned long long)b);
2935}
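
/*
 * Illustrative note (not part of drbd_receiver.c): the check above treats
 * "more than 12.5%" as d > a>>3, since a right shift by 3 divides by 8 and
 * 1/8 = 12.5%; it warns if the difference exceeds 12.5% of either value.
 * Tiny standalone userspace check of that arithmetic:
 */
#include <assert.h>

int main(void)
{
	unsigned long long a = 1000, b = 900, d = a - b;	/* 10% apart */
	assert(!(d > (a >> 3) || d > (b >> 3)));		/* no warning */

	b = 800; d = a - b;					/* 20% apart */
	assert(d > (a >> 3) || d > (b >> 3));			/* would warn */
	return 0;
}
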
2936
2937static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2938{
2939 struct p_sizes *p = (struct p_sizes *)h;
2940 enum determine_dev_size dd = unchanged;
2941 unsigned int max_seg_s;
2942 sector_t p_size, p_usize, my_usize;
2943 int ldsc = 0; /* local disk size changed */
e89b591c 2944 enum dds_flags ddsf;
b411b363
PR
2945
2946 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2947 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2948 return FALSE;
2949
2950 p_size = be64_to_cpu(p->d_size);
2951 p_usize = be64_to_cpu(p->u_size);
2952
2953 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2954 dev_err(DEV, "some backing storage is needed\n");
2955 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2956 return FALSE;
2957 }
2958
2959 /* just store the peer's disk size for now.
2960 * we still need to figure out whether we accept that. */
2961 mdev->p_size = p_size;
2962
2963#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2964 if (get_ldev(mdev)) {
2965 warn_if_differ_considerably(mdev, "lower level device sizes",
2966 p_size, drbd_get_max_capacity(mdev->ldev));
2967 warn_if_differ_considerably(mdev, "user requested size",
2968 p_usize, mdev->ldev->dc.disk_size);
2969
2970 /* if this is the first connect, or an otherwise expected
2971 * param exchange, choose the minimum */
2972 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2973 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2974 p_usize);
2975
2976 my_usize = mdev->ldev->dc.disk_size;
2977
2978 if (mdev->ldev->dc.disk_size != p_usize) {
2979 mdev->ldev->dc.disk_size = p_usize;
2980 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2981 (unsigned long)mdev->ldev->dc.disk_size);
2982 }
2983
2984 /* Never shrink a device with usable data during connect.
2985 But allow online shrinking if we are connected. */
a393db6f 2986 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
b411b363
PR
2987 drbd_get_capacity(mdev->this_bdev) &&
2988 mdev->state.disk >= D_OUTDATED &&
2989 mdev->state.conn < C_CONNECTED) {
2990 dev_err(DEV, "The peer's disk size is too small!\n");
2991 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2992 mdev->ldev->dc.disk_size = my_usize;
2993 put_ldev(mdev);
2994 return FALSE;
2995 }
2996 put_ldev(mdev);
2997 }
2998#undef min_not_zero
2999
e89b591c 3000 ddsf = be16_to_cpu(p->dds_flags);
b411b363 3001 if (get_ldev(mdev)) {
e89b591c 3002 dd = drbd_determin_dev_size(mdev, ddsf);
b411b363
PR
3003 put_ldev(mdev);
3004 if (dd == dev_size_error)
3005 return FALSE;
3006 drbd_md_sync(mdev);
3007 } else {
3008 /* I am diskless, need to accept the peer's size. */
3009 drbd_set_my_capacity(mdev, p_size);
3010 }
3011
b411b363
PR
3012 if (get_ldev(mdev)) {
3013 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3014 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3015 ldsc = 1;
3016 }
3017
a1c88d0d
LE
3018 if (mdev->agreed_pro_version < 94)
3019 max_seg_s = be32_to_cpu(p->max_segment_size);
3020 else /* drbd 8.3.8 onwards */
3021 max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3022
b411b363
PR
3023 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3024 drbd_setup_queue_param(mdev, max_seg_s);
3025
e89b591c 3026 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
b411b363
PR
3027 put_ldev(mdev);
3028 }
3029
3030 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3031 if (be64_to_cpu(p->c_size) !=
3032 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3033 /* we have different sizes, probably peer
3034 * needs to know my new size... */
e89b591c 3035 drbd_send_sizes(mdev, 0, ddsf);
b411b363
PR
3036 }
3037 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3038 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3039 if (mdev->state.pdsk >= D_INCONSISTENT &&
e89b591c
PR
3040 mdev->state.disk >= D_INCONSISTENT) {
3041 if (ddsf & DDSF_NO_RESYNC)
3042 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3043 else
3044 resync_after_online_grow(mdev);
3045 } else
b411b363
PR
3046 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3047 }
3048 }
3049
3050 return TRUE;
3051}
3052
3053static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3054{
3055 struct p_uuids *p = (struct p_uuids *)h;
3056 u64 *p_uuid;
3057 int i;
3058
3059 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3060 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3061 return FALSE;
3062
3063 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3064
3065 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3066 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3067
3068 kfree(mdev->p_uuid);
3069 mdev->p_uuid = p_uuid;
3070
3071 if (mdev->state.conn < C_CONNECTED &&
3072 mdev->state.disk < D_INCONSISTENT &&
3073 mdev->state.role == R_PRIMARY &&
3074 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3075 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3076 (unsigned long long)mdev->ed_uuid);
3077 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3078 return FALSE;
3079 }
3080
3081 if (get_ldev(mdev)) {
3082 int skip_initial_sync =
3083 mdev->state.conn == C_CONNECTED &&
3084 mdev->agreed_pro_version >= 90 &&
3085 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3086 (p_uuid[UI_FLAGS] & 8);
3087 if (skip_initial_sync) {
3088 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3089 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3090 "clear_n_write from receive_uuids");
3091 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3092 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3093 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3094 CS_VERBOSE, NULL);
3095 drbd_md_sync(mdev);
3096 }
3097 put_ldev(mdev);
3098 }
3099
3100 /* Before we test for the disk state, we should wait until a possibly
3101 ongoing cluster-wide state change has finished. That is important if
3102 we are primary and are detaching from our disk. We need to see the
3103 new disk state... */
3104 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3105 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3106 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3107
3108 return TRUE;
3109}
3110
3111/**
3112 * convert_state() - Converts the peer's view of the cluster state to our point of view
3113 * @ps: The state as seen by the peer.
3114 */
3115static union drbd_state convert_state(union drbd_state ps)
3116{
3117 union drbd_state ms;
3118
3119 static enum drbd_conns c_tab[] = {
3120 [C_CONNECTED] = C_CONNECTED,
3121
3122 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3123 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3124 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3125 [C_VERIFY_S] = C_VERIFY_T,
3126 [C_MASK] = C_MASK,
3127 };
3128
3129 ms.i = ps.i;
3130
3131 ms.conn = c_tab[ps.conn];
3132 ms.peer = ps.role;
3133 ms.role = ps.peer;
3134 ms.pdsk = ps.disk;
3135 ms.disk = ps.pdsk;
3136 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3137
3138 return ms;
3139}
3140
3141static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3142{
3143 struct p_req_state *p = (struct p_req_state *)h;
3144 union drbd_state mask, val;
3145 int rv;
3146
3147 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3148 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3149 return FALSE;
3150
3151 mask.i = be32_to_cpu(p->mask);
3152 val.i = be32_to_cpu(p->val);
3153
3154 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3155 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3156 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3157 return TRUE;
3158 }
3159
3160 mask = convert_state(mask);
3161 val = convert_state(val);
3162
3163 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3164
3165 drbd_send_sr_reply(mdev, rv);
3166 drbd_md_sync(mdev);
3167
3168 return TRUE;
3169}
3170
3171static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3172{
3173 struct p_state *p = (struct p_state *)h;
3174 enum drbd_conns nconn, oconn;
3175 union drbd_state ns, peer_state;
3176 enum drbd_disk_state real_peer_disk;
3177 int rv;
3178
3179 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3180 return FALSE;
3181
3182 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3183 return FALSE;
3184
3185 peer_state.i = be32_to_cpu(p->state);
3186
3187 real_peer_disk = peer_state.disk;
3188 if (peer_state.disk == D_NEGOTIATING) {
3189 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3190 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3191 }
3192
3193 spin_lock_irq(&mdev->req_lock);
3194 retry:
3195 oconn = nconn = mdev->state.conn;
3196 spin_unlock_irq(&mdev->req_lock);
3197
3198 if (nconn == C_WF_REPORT_PARAMS)
3199 nconn = C_CONNECTED;
3200
3201 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3202 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3203 int cr; /* consider resync */
3204
3205 /* if we established a new connection */
3206 cr = (oconn < C_CONNECTED);
3207 /* if we had an established connection
3208 * and one of the nodes newly attaches a disk */
3209 cr |= (oconn == C_CONNECTED &&
3210 (peer_state.disk == D_NEGOTIATING ||
3211 mdev->state.disk == D_NEGOTIATING));
3212 /* if we have both been inconsistent, and the peer has been
3213 * forced to be UpToDate with --overwrite-data */
3214 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3215 /* if we had been plain connected, and the admin requested to
3216 * start a sync by "invalidate" or "invalidate-remote" */
3217 cr |= (oconn == C_CONNECTED &&
3218 (peer_state.conn >= C_STARTING_SYNC_S &&
3219 peer_state.conn <= C_WF_BITMAP_T));
3220
3221 if (cr)
3222 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3223
3224 put_ldev(mdev);
3225 if (nconn == C_MASK) {
580b9767 3226 nconn = C_CONNECTED;
b411b363
PR
3227 if (mdev->state.disk == D_NEGOTIATING) {
3228 drbd_force_state(mdev, NS(disk, D_DISKLESS));
b411b363
PR
3229 } else if (peer_state.disk == D_NEGOTIATING) {
3230 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3231 peer_state.disk = D_DISKLESS;
580b9767 3232 real_peer_disk = D_DISKLESS;
b411b363 3233 } else {
cf14c2e9
PR
3234 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3235 return FALSE;
b411b363
PR
3236 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3237 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3238 return FALSE;
3239 }
3240 }
3241 }
3242
3243 spin_lock_irq(&mdev->req_lock);
3244 if (mdev->state.conn != oconn)
3245 goto retry;
3246 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3247 ns.i = mdev->state.i;
3248 ns.conn = nconn;
3249 ns.peer = peer_state.role;
3250 ns.pdsk = real_peer_disk;
3251 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3252 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3253 ns.disk = mdev->new_state_tmp.disk;
3254
3255 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3256 ns = mdev->state;
3257 spin_unlock_irq(&mdev->req_lock);
3258
3259 if (rv < SS_SUCCESS) {
3260 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3261 return FALSE;
3262 }
3263
3264 if (oconn > C_WF_REPORT_PARAMS) {
3265 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3266 peer_state.disk != D_NEGOTIATING ) {
3267 /* we want resync, peer has not yet decided to sync... */
3268 /* Nowadays only used when forcing a node into primary role and
3269 setting its disk to UpToDate with that */
3270 drbd_send_uuids(mdev);
3271 drbd_send_state(mdev);
3272 }
3273 }
3274
3275 mdev->net_conf->want_lose = 0;
3276
3277 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3278
3279 return TRUE;
3280}
3281
3282static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3283{
3284 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3285
3286 wait_event(mdev->misc_wait,
3287 mdev->state.conn == C_WF_SYNC_UUID ||
3288 mdev->state.conn < C_CONNECTED ||
3289 mdev->state.disk < D_NEGOTIATING);
3290
3291 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3292
3293 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3294 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3295 return FALSE;
3296
3297 /* Here the _drbd_uuid_ functions are right, current should
3298 _not_ be rotated into the history */
3299 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3300 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3301 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3302
3303 drbd_start_resync(mdev, C_SYNC_TARGET);
3304
3305 put_ldev(mdev);
3306 } else
3307 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3308
3309 return TRUE;
3310}
3311
3312enum receive_bitmap_ret { OK, DONE, FAILED };
3313
3314static enum receive_bitmap_ret
3315receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3316 unsigned long *buffer, struct bm_xfer_ctx *c)
3317{
3318 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3319 unsigned want = num_words * sizeof(long);
3320
3321 if (want != h->length) {
3322 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3323 return FAILED;
3324 }
3325 if (want == 0)
3326 return DONE;
3327 if (drbd_recv(mdev, buffer, want) != want)
3328 return FAILED;
3329
3330 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3331
3332 c->word_offset += num_words;
3333 c->bit_offset = c->word_offset * BITS_PER_LONG;
3334 if (c->bit_offset > c->bm_bits)
3335 c->bit_offset = c->bm_bits;
3336
3337 return OK;
3338}
3339
3340static enum receive_bitmap_ret
3341recv_bm_rle_bits(struct drbd_conf *mdev,
3342 struct p_compressed_bm *p,
3343 struct bm_xfer_ctx *c)
3344{
3345 struct bitstream bs;
3346 u64 look_ahead;
3347 u64 rl;
3348 u64 tmp;
3349 unsigned long s = c->bit_offset;
3350 unsigned long e;
3351 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3352 int toggle = DCBP_get_start(p);
3353 int have;
3354 int bits;
3355
3356 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3357
3358 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3359 if (bits < 0)
3360 return FAILED;
3361
3362 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3363 bits = vli_decode_bits(&rl, look_ahead);
3364 if (bits <= 0)
3365 return FAILED;
3366
3367 if (toggle) {
3368 e = s + rl -1;
3369 if (e >= c->bm_bits) {
3370 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3371 return FAILED;
3372 }
3373 _drbd_bm_set_bits(mdev, s, e);
3374 }
3375
3376 if (have < bits) {
3377 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3378 have, bits, look_ahead,
3379 (unsigned int)(bs.cur.b - p->code),
3380 (unsigned int)bs.buf_len);
3381 return FAILED;
3382 }
3383 look_ahead >>= bits;
3384 have -= bits;
3385
3386 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3387 if (bits < 0)
3388 return FAILED;
3389 look_ahead |= tmp << have;
3390 have += bits;
3391 }
3392
3393 c->bit_offset = s;
3394 bm_xfer_ctx_bit_to_word_offset(c);
3395
3396 return (s == c->bm_bits) ? DONE : OK;
3397}
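
/*
 * Illustrative sketch (not part of drbd_receiver.c): recv_bm_rle_bits() above
 * decodes alternating run lengths (clear bits, set bits, clear bits, ...) and
 * only materializes the "set" runs into the bitmap.  This userspace model
 * uses plain integer run lengths instead of drbd's VLI bitstream, so it only
 * shows the toggle/run bookkeeping, not the real wire format.
 */
#include <stdio.h>

static void apply_rle_runs(unsigned char *bitmap, unsigned long nbits,
			   const unsigned long *runs, int nruns, int start_set)
{
	unsigned long s = 0;
	int toggle = start_set;	/* does the first run describe set bits? */
	int i;

	for (i = 0; i < nruns && s < nbits; i++, toggle = !toggle) {
		unsigned long rl = runs[i], e = s + rl;

		if (e > nbits)
			e = nbits;	/* clamp; the kernel code rejects overflow instead */
		if (toggle) {
			unsigned long bit;
			for (bit = s; bit < e; bit++)
				bitmap[bit / 8] |= 1u << (bit % 8);
		}
		s += rl;
	}
}

int main(void)
{
	unsigned char bm[2] = { 0, 0 };
	/* 3 clear, 4 set, 2 clear, 1 set  ->  bits 3..6 and bit 9 end up set */
	const unsigned long runs[] = { 3, 4, 2, 1 };

	apply_rle_runs(bm, 16, runs, 4, 0);
	printf("%02x %02x\n", bm[0], bm[1]);	/* prints: 78 02 */
	return 0;
}
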
3398
3399static enum receive_bitmap_ret
3400decode_bitmap_c(struct drbd_conf *mdev,
3401 struct p_compressed_bm *p,
3402 struct bm_xfer_ctx *c)
3403{
3404 if (DCBP_get_code(p) == RLE_VLI_Bits)
3405 return recv_bm_rle_bits(mdev, p, c);
3406
3407 /* other variants had been implemented for evaluation,
3408 * but have been dropped as this one turned out to be "best"
3409 * during all our tests. */
3410
3411 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3412 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3413 return FAILED;
3414}
3415
3416void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3417 const char *direction, struct bm_xfer_ctx *c)
3418{
3419 /* what would it take to transfer it "plaintext" */
3420 unsigned plain = sizeof(struct p_header) *
3421 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3422 + c->bm_words * sizeof(long);
3423 unsigned total = c->bytes[0] + c->bytes[1];
3424 unsigned r;
3425
3426 /* total can not be zero. but just in case: */
3427 if (total == 0)
3428 return;
3429
3430 /* don't report if not compressed */
3431 if (total >= plain)
3432 return;
3433
3434 /* total < plain. check for overflow, still */
3435 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3436 : (1000 * total / plain);
3437
3438 if (r > 1000)
3439 r = 1000;
3440
3441 r = 1000 - r;
3442 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3443 "total %u; compression: %u.%u%%\n",
3444 direction,
3445 c->bytes[1], c->packets[1],
3446 c->bytes[0], c->packets[0],
3447 total, r/10, r % 10);
3448}
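
/*
 * Illustrative sketch (not part of drbd_receiver.c): the stats above report
 * the saving in tenths of a percent, computed as 1000 - 1000*total/plain,
 * with the division reordered to dodge 32-bit overflow when total is large.
 * Minimal userspace version of that arithmetic (function name made up):
 */
#include <limits.h>
#include <stdio.h>

static unsigned compression_permille(unsigned total, unsigned plain)
{
	unsigned r;

	if (total >= plain)
		return 0;	/* not compressed, nothing to report */
	r = (total > UINT_MAX / 1000) ? total / (plain / 1000)
				      : 1000 * total / plain;
	if (r > 1000)
		r = 1000;
	return 1000 - r;
}

int main(void)
{
	unsigned r = compression_permille(250, 1000);

	printf("compression: %u.%u%%\n", r / 10, r % 10);	/* prints: compression: 75.0% */
	return 0;
}
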
3449
3450/* Since we are processing the bitfield from lower addresses to higher,
3451 it does not matter if we process it in 32 bit chunks or 64 bit
3452 chunks, as long as it is little endian. (Understand it as a byte stream,
3453 beginning with the lowest byte...) If we used big endian,
3454 we would need to process it from the highest address to the lowest,
3455 in order to be agnostic to the 32 vs 64 bits issue.
3456
3457 returns 0 on failure, 1 if we successfully received it. */
3458static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3459{
3460 struct bm_xfer_ctx c;
3461 void *buffer;
3462 enum receive_bitmap_ret ret;
3463 int ok = FALSE;
3464
3465 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3466
3467 drbd_bm_lock(mdev, "receive bitmap");
3468
3469 /* maybe we should use some per thread scratch page,
3470 * and allocate that during initial device creation? */
3471 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3472 if (!buffer) {
3473 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3474 goto out;
3475 }
3476
3477 c = (struct bm_xfer_ctx) {
3478 .bm_bits = drbd_bm_bits(mdev),
3479 .bm_words = drbd_bm_words(mdev),
3480 };
3481
3482 do {
3483 if (h->command == P_BITMAP) {
3484 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3485 } else if (h->command == P_COMPRESSED_BITMAP) {
3486 /* MAYBE: sanity check that we speak proto >= 90,
3487 * and the feature is enabled! */
3488 struct p_compressed_bm *p;
3489
3490 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3491 dev_err(DEV, "ReportCBitmap packet too large\n");
3492 goto out;
3493 }
3494 /* use the page buff */
3495 p = buffer;
3496 memcpy(p, h, sizeof(*h));
3497 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3498 goto out;
3499 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3500 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3501 goto out; /* not a TRUE/FALSE value; bail out so the bitmap is unlocked and the page freed */
3502 }
3503 ret = decode_bitmap_c(mdev, p, &c);
3504 } else {
3505 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3506 goto out;
3507 }
3508
3509 c.packets[h->command == P_BITMAP]++;
3510 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3511
3512 if (ret != OK)
3513 break;
3514
3515 if (!drbd_recv_header(mdev, h))
3516 goto out;
3517 } while (ret == OK);
3518 if (ret == FAILED)
3519 goto out;
3520
3521 INFO_bm_xfer_stats(mdev, "receive", &c);
3522
3523 if (mdev->state.conn == C_WF_BITMAP_T) {
3524 ok = !drbd_send_bitmap(mdev);
3525 if (!ok)
3526 goto out;
3527 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3528 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3529 D_ASSERT(ok == SS_SUCCESS);
3530 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3531 /* admin may have requested C_DISCONNECTING,
3532 * other threads may have noticed network errors */
3533 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3534 drbd_conn_str(mdev->state.conn));
3535 }
3536
3537 ok = TRUE;
3538 out:
3539 drbd_bm_unlock(mdev);
3540 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3541 drbd_start_resync(mdev, C_SYNC_SOURCE);
3542 free_page((unsigned long) buffer);
3543 return ok;
3544}
3545
3546static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3547{
3548 /* TODO zero copy sink :) */
3549 static char sink[128];
3550 int size, want, r;
3551
3552 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3553 h->command, h->length);
3554
3555 size = h->length;
3556 while (size > 0) {
3557 want = min_t(int, size, sizeof(sink));
3558 r = drbd_recv(mdev, sink, want);
3559 ERR_IF(r <= 0) break;
3560 size -= r;
3561 }
3562 return size == 0;
3563}
3564
3565static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3566{
3567 if (mdev->state.disk >= D_INCONSISTENT)
3568 drbd_kick_lo(mdev);
3569
3570 /* Make sure we've acked all the TCP data associated
3571 * with the data requests being unplugged */
3572 drbd_tcp_quickack(mdev->data.socket);
3573
3574 return TRUE;
3575}
3576
0ced55a3
PR
3577static void timeval_sub_us(struct timeval* tv, unsigned int us)
3578{
3579 tv->tv_sec -= us / 1000000;
3580 us = us % 1000000;
3581 if (tv->tv_usec > us) {
3582 tv->tv_usec += 1000000;
3583 tv->tv_sec--;
3584 }
3585 tv->tv_usec -= us;
3586}
3587
3588static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3589{
3590 struct delay_probe *dp;
3591 struct list_head *le;
3592 struct timeval now;
3593 int seq_num;
3594 int offset;
3595 int data_delay;
3596
3597 seq_num = be32_to_cpu(p->seq_num);
3598 offset = be32_to_cpu(p->offset);
3599
3600 spin_lock(&mdev->peer_seq_lock);
3601 if (!list_empty(&mdev->delay_probes)) {
3602 if (from == USE_DATA_SOCKET)
3603 le = mdev->delay_probes.next;
3604 else
3605 le = mdev->delay_probes.prev;
3606
3607 dp = list_entry(le, struct delay_probe, list);
3608
3609 if (dp->seq_num == seq_num) {
3610 list_del(le);
3611 spin_unlock(&mdev->peer_seq_lock);
3612 do_gettimeofday(&now);
3613 timeval_sub_us(&now, offset);
3614 data_delay =
3615 now.tv_usec - dp->time.tv_usec +
3616 (now.tv_sec - dp->time.tv_sec) * 1000000;
3617
3618 if (data_delay > 0)
3619 mdev->data_delay = data_delay;
3620
3621 kfree(dp);
3622 return;
3623 }
3624
3625 if (dp->seq_num > seq_num) {
3626 spin_unlock(&mdev->peer_seq_lock);
3627 dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3628 return; /* Do not allocate a struct delay_probe.... */
3629 }
3630 }
3631 spin_unlock(&mdev->peer_seq_lock);
3632
3633 dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3634 if (!dp) {
3635 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3636 return;
3637 }
3638
3639 dp->seq_num = seq_num;
3640 do_gettimeofday(&dp->time);
3641 timeval_sub_us(&dp->time, offset);
3642
3643 spin_lock(&mdev->peer_seq_lock);
3644 if (from == USE_DATA_SOCKET)
3645 list_add(&dp->list, &mdev->delay_probes);
3646 else
3647 list_add_tail(&dp->list, &mdev->delay_probes);
3648 spin_unlock(&mdev->peer_seq_lock);
3649}
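
/*
 * Illustrative sketch (not part of drbd_receiver.c): got_delay_probe() above
 * derives the data-socket delay as the difference of two struct timeval
 * timestamps, folded into a single microsecond count.  Userspace model of
 * that difference (the helper name is made up for illustration):
 */
#include <stdio.h>
#include <sys/time.h>

static long timeval_diff_us(const struct timeval *later,
			    const struct timeval *earlier)
{
	return (later->tv_usec - earlier->tv_usec) +
	       (later->tv_sec - earlier->tv_sec) * 1000000L;
}

int main(void)
{
	struct timeval sent = { .tv_sec = 100, .tv_usec = 900000 };
	struct timeval seen = { .tv_sec = 101, .tv_usec = 150000 };

	printf("%ld us\n", timeval_diff_us(&seen, &sent));	/* prints: 250000 us */
	return 0;
}
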
3650
3651static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3652{
3653 struct p_delay_probe *p = (struct p_delay_probe *)h;
3654
3655 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3656 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3657 return FALSE;
3658
3659 got_delay_probe(mdev, USE_DATA_SOCKET, p);
3660 return TRUE;
3661}
3662
b411b363
PR
3663typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3664
3665static drbd_cmd_handler_f drbd_default_handler[] = {
3666 [P_DATA] = receive_Data,
3667 [P_DATA_REPLY] = receive_DataReply,
3668 [P_RS_DATA_REPLY] = receive_RSDataReply,
3669 [P_BARRIER] = receive_Barrier,
3670 [P_BITMAP] = receive_bitmap,
3671 [P_COMPRESSED_BITMAP] = receive_bitmap,
3672 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3673 [P_DATA_REQUEST] = receive_DataRequest,
3674 [P_RS_DATA_REQUEST] = receive_DataRequest,
3675 [P_SYNC_PARAM] = receive_SyncParam,
3676 [P_SYNC_PARAM89] = receive_SyncParam,
3677 [P_PROTOCOL] = receive_protocol,
3678 [P_UUIDS] = receive_uuids,
3679 [P_SIZES] = receive_sizes,
3680 [P_STATE] = receive_state,
3681 [P_STATE_CHG_REQ] = receive_req_state,
3682 [P_SYNC_UUID] = receive_sync_uuid,
3683 [P_OV_REQUEST] = receive_DataRequest,
3684 [P_OV_REPLY] = receive_DataRequest,
3685 [P_CSUM_RS_REQUEST] = receive_DataRequest,
3686 [P_DELAY_PROBE] = receive_delay_probe,
3687 /* anything missing from this table is in
3688 * the asender_tbl, see get_asender_cmd */
3689 [P_MAX_CMD] = NULL,
3690};
3691
3692static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3693static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3694
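/* Main receive loop of the receiver thread: read one packet header at
 * a time, look up the handler in the table above and let it consume
 * the payload.  Unknown optional packets are drained by receive_skip;
 * anything else without a handler, and any handler failure, forces the
 * connection into C_PROTOCOL_ERROR and ends the loop. */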
3695static void drbdd(struct drbd_conf *mdev)
3696{
3697 drbd_cmd_handler_f handler;
3698 struct p_header *header = &mdev->data.rbuf.header;
3699
3700 while (get_t_state(&mdev->receiver) == Running) {
3701 drbd_thread_current_set_cpu(mdev);
3702 if (!drbd_recv_header(mdev, header)) {
3703 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3704 break;
3705 }
3706
3707 if (header->command < P_MAX_CMD)
3708 handler = drbd_cmd_handler[header->command];
3709 else if (P_MAY_IGNORE < header->command
3710 && header->command < P_MAX_OPT_CMD)
3711 handler = drbd_opt_cmd_handler ? drbd_opt_cmd_handler[header->command-P_MAY_IGNORE] : NULL;
3712 else if (header->command > P_MAX_OPT_CMD)
3713 handler = receive_skip;
3714 else
3715 handler = NULL;
3716
3717 if (unlikely(!handler)) {
3718 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3719 header->command, header->length);
3720 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3721 break;
3722 }
3723 if (unlikely(!handler(mdev, header))) {
3724 dev_err(DEV, "error receiving %s, l: %d!\n",
3725 cmdname(header->command), header->length);
3726 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3727 break;
3728 }
3729 }
3730}
3731
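/* Complete all application READ requests that are still waiting for a
 * reply from the (now gone) peer: walk the app_reads_hash buckets under
 * req_lock and fail each request with connection_lost_while_pending. */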
3732static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3733{
3734 struct hlist_head *slot;
3735 struct hlist_node *pos;
3736 struct hlist_node *tmp;
3737 struct drbd_request *req;
3738 int i;
3739
3740 /*
3741 * Application READ requests
3742 */
3743 spin_lock_irq(&mdev->req_lock);
3744 for (i = 0; i < APP_R_HSIZE; i++) {
3745 slot = mdev->app_reads_hash+i;
3746 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3747 /* it may (but should not any longer!)
3748 * be on the work queue; if that assert triggers,
3749 * we need to also grab the
3750 * spin_lock_irq(&mdev->data.work.q_lock);
3751 * and list_del_init here. */
3752 D_ASSERT(list_empty(&req->w.list));
3753 /* It would be nice to complete outside of spinlock.
3754 * But this is easier for now. */
3755 _req_mod(req, connection_lost_while_pending);
3756 }
3757 }
3758 for (i = 0; i < APP_R_HSIZE; i++)
3759 if (!hlist_empty(mdev->app_reads_hash+i))
3760 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3761 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3762
3763 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3764 spin_unlock_irq(&mdev->req_lock);
3765}
3766
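/* Queue a barrier work item and wait until the worker has processed it,
 * which guarantees that everything queued before the barrier has been
 * handled as well. */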
3767void drbd_flush_workqueue(struct drbd_conf *mdev)
3768{
3769 struct drbd_wq_barrier barr;
3770
3771 barr.w.cb = w_prev_work_done;
3772 init_completion(&barr.done);
3773 drbd_queue_work(&mdev->data.work, &barr.w);
3774 wait_for_completion(&barr.done);
3775}
3776
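/* Tear down the network half of the device after the connection was
 * lost or closed: stop the asender, free the sockets, wait for and
 * cancel outstanding epoch entries and resync requests, flush the
 * worker queue, fail pending reads, possibly outdate the peer, and
 * finally move to C_UNCONNECTED (or all the way to C_STANDALONE when
 * we were C_DISCONNECTING). */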
3777static void drbd_disconnect(struct drbd_conf *mdev)
3778{
3779 enum drbd_fencing_p fp;
3780 union drbd_state os, ns;
3781 int rv = SS_UNKNOWN_ERROR;
3782 unsigned int i;
3783
3784 if (mdev->state.conn == C_STANDALONE)
3785 return;
3786 if (mdev->state.conn >= C_WF_CONNECTION)
3787 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3788 drbd_conn_str(mdev->state.conn));
3789
3790 /* asender does not clean up anything. it must not interfere, either */
3791 drbd_thread_stop(&mdev->asender);
3792 drbd_free_sock(mdev);
3793
3794 spin_lock_irq(&mdev->req_lock);
3795 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3796 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3797 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3798 spin_unlock_irq(&mdev->req_lock);
3799
3800 /* We do not have data structures that would allow us to
3801 * get the rs_pending_cnt down to 0 again.
3802 * * On C_SYNC_TARGET we do not have any data structures describing
3803 * the pending RSDataRequest's we have sent.
3804 * * On C_SYNC_SOURCE there is no data structure that tracks
3805 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3806 * And no, it is not the sum of the reference counts in the
3807 * resync_LRU. The resync_LRU tracks the whole operation including
3808 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3809 * on the fly. */
3810 drbd_rs_cancel_all(mdev);
3811 mdev->rs_total = 0;
3812 mdev->rs_failed = 0;
3813 atomic_set(&mdev->rs_pending_cnt, 0);
3814 wake_up(&mdev->misc_wait);
3815
3816 /* make sure syncer is stopped and w_resume_next_sg queued */
3817 del_timer_sync(&mdev->resync_timer);
3818 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3819 resync_timer_fn((unsigned long)mdev);
3820
3821 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3822 * w_make_resync_request etc. which may still be on the worker queue
3823 * to be "canceled" */
3824 drbd_flush_workqueue(mdev);
3825
3826 /* This also does reclaim_net_ee(). If we do this too early, we might
3827 * miss some resync ee and pages.*/
3828 drbd_process_done_ee(mdev);
3829
3830 kfree(mdev->p_uuid);
3831 mdev->p_uuid = NULL;
3832
3833 if (!mdev->state.susp)
3834 tl_clear(mdev);
3835
3836 drbd_fail_pending_reads(mdev);
3837
3838 dev_info(DEV, "Connection closed\n");
3839
3840 drbd_md_sync(mdev);
3841
3842 fp = FP_DONT_CARE;
3843 if (get_ldev(mdev)) {
3844 fp = mdev->ldev->dc.fencing;
3845 put_ldev(mdev);
3846 }
3847
3848 if (mdev->state.role == R_PRIMARY) {
3849 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3850 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3851 drbd_request_state(mdev, NS(pdsk, nps));
3852 }
3853 }
3854
3855 spin_lock_irq(&mdev->req_lock);
3856 os = mdev->state;
3857 if (os.conn >= C_UNCONNECTED) {
3858 /* Do not restart in case we are C_DISCONNECTING */
3859 ns = os;
3860 ns.conn = C_UNCONNECTED;
3861 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3862 }
3863 spin_unlock_irq(&mdev->req_lock);
3864
3865 if (os.conn == C_DISCONNECTING) {
3866 struct hlist_head *h;
3867 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3868
3869 /* we must not free the tl_hash
3870 * while application io is still on the fly */
3871 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3872
3873 spin_lock_irq(&mdev->req_lock);
3874 /* paranoia code */
3875 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3876 if (h->first)
3877 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3878 (int)(h - mdev->ee_hash), h->first);
3879 kfree(mdev->ee_hash);
3880 mdev->ee_hash = NULL;
3881 mdev->ee_hash_s = 0;
3882
3883 /* paranoia code */
3884 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3885 if (h->first)
3886 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3887 (int)(h - mdev->tl_hash), h->first);
3888 kfree(mdev->tl_hash);
3889 mdev->tl_hash = NULL;
3890 mdev->tl_hash_s = 0;
3891 spin_unlock_irq(&mdev->req_lock);
3892
3893 crypto_free_hash(mdev->cram_hmac_tfm);
3894 mdev->cram_hmac_tfm = NULL;
3895
3896 kfree(mdev->net_conf);
3897 mdev->net_conf = NULL;
3898 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3899 }
3900
3901 /* tcp_close and release of sendpage pages can be deferred. I don't
3902 * want to use SO_LINGER, because apparently it can be deferred for
3903 * more than 20 seconds (longest time I checked).
3904 *
3905 * Actually we don't care for exactly when the network stack does its
3906 * put_page(), but release our reference on these pages right here.
3907 */
3908 i = drbd_release_ee(mdev, &mdev->net_ee);
3909 if (i)
3910 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3911 i = atomic_read(&mdev->pp_in_use);
3912 if (i)
3913 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3914
3915 D_ASSERT(list_empty(&mdev->read_ee));
3916 D_ASSERT(list_empty(&mdev->active_ee));
3917 D_ASSERT(list_empty(&mdev->sync_ee));
3918 D_ASSERT(list_empty(&mdev->done_ee));
3919
3920 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3921 atomic_set(&mdev->current_epoch->epoch_size, 0);
3922 D_ASSERT(list_empty(&mdev->current_epoch->list));
3923}
3924
3925/*
3926 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3927 * we can agree on is stored in agreed_pro_version.
3928 *
3929 * feature flags and the reserved array should be enough room for future
3930 * enhancements of the handshake protocol, and possible plugins...
3931 *
3932 * for now, they are expected to be zero, and are ignored anyway.
3933 */
3934static int drbd_send_handshake(struct drbd_conf *mdev)
3935{
3936 /* ASSERT current == mdev->receiver ... */
3937 struct p_handshake *p = &mdev->data.sbuf.handshake;
3938 int ok;
3939
3940 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3941 dev_err(DEV, "interrupted during initial handshake\n");
3942 return 0; /* interrupted. not ok. */
3943 }
3944
3945 if (mdev->data.socket == NULL) {
3946 mutex_unlock(&mdev->data.mutex);
3947 return 0;
3948 }
3949
3950 memset(p, 0, sizeof(*p));
3951 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3952 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3953 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3954 (struct p_header *)p, sizeof(*p), 0);
3955 mutex_unlock(&mdev->data.mutex);
3956 return ok;
3957}
3958
3959/*
3960 * return values:
3961 * 1 yes, we have a valid connection
3962 * 0 oops, did not work out, please try again
3963 * -1 peer talks different language,
3964 * no point in trying again, please go standalone.
3965 */
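/* The agreed version is simply the highest version both sides support:
 * min(our PRO_VERSION_MAX, peer's protocol_max), after checking that
 * the two ranges overlap at all.  Illustrative example (version numbers
 * made up): if we support 86-95 and the peer reports 86-91, we agree on
 * 91; if the peer reports 80-85, the ranges do not overlap and we take
 * the "incompat" error path. */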
3966static int drbd_do_handshake(struct drbd_conf *mdev)
3967{
3968 /* ASSERT current == mdev->receiver ... */
3969 struct p_handshake *p = &mdev->data.rbuf.handshake;
3970 const int expect = sizeof(struct p_handshake)
3971 -sizeof(struct p_header);
3972 int rv;
3973
3974 rv = drbd_send_handshake(mdev);
3975 if (!rv)
3976 return 0;
3977
3978 rv = drbd_recv_header(mdev, &p->head);
3979 if (!rv)
3980 return 0;
3981
3982 if (p->head.command != P_HAND_SHAKE) {
3983 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3984 cmdname(p->head.command), p->head.command);
3985 return -1;
3986 }
3987
3988 if (p->head.length != expect) {
3989 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3990 expect, p->head.length);
3991 return -1;
3992 }
3993
3994 rv = drbd_recv(mdev, &p->head.payload, expect);
3995
3996 if (rv != expect) {
3997 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3998 return 0;
3999 }
4000
4001 p->protocol_min = be32_to_cpu(p->protocol_min);
4002 p->protocol_max = be32_to_cpu(p->protocol_max);
4003 if (p->protocol_max == 0)
4004 p->protocol_max = p->protocol_min;
4005
4006 if (PRO_VERSION_MAX < p->protocol_min ||
4007 PRO_VERSION_MIN > p->protocol_max)
4008 goto incompat;
4009
4010 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4011
4012 dev_info(DEV, "Handshake successful: "
4013 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4014
4015 return 1;
4016
4017 incompat:
4018 dev_err(DEV, "incompatible DRBD dialects: "
4019 "I support %d-%d, peer supports %d-%d\n",
4020 PRO_VERSION_MIN, PRO_VERSION_MAX,
4021 p->protocol_min, p->protocol_max);
4022 return -1;
4023}
4024
4025#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4026static int drbd_do_auth(struct drbd_conf *mdev)
4027{
4028 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4029 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4030 return -1;
4031}
4032#else
4033#define CHALLENGE_LEN 64
4034
4035/* Return value:
4036 1 - auth succeeded,
4037 0 - failed, try again (network error),
4038 -1 - auth failed, don't try again.
4039*/
4040
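/* Challenge-response authentication using the configured cram-hmac
 * algorithm and shared secret: send a random challenge, receive the
 * peer's challenge, answer it with an HMAC (keyed with the shared
 * secret) over the peer's challenge, then receive the peer's answer to
 * our challenge and compare it against the locally computed HMAC. */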
4041static int drbd_do_auth(struct drbd_conf *mdev)
4042{
4043 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4044 struct scatterlist sg;
4045 char *response = NULL;
4046 char *right_response = NULL;
4047 char *peers_ch = NULL;
4048 struct p_header p;
4049 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4050 unsigned int resp_size;
4051 struct hash_desc desc;
4052 int rv;
4053
4054 desc.tfm = mdev->cram_hmac_tfm;
4055 desc.flags = 0;
4056
4057 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4058 (u8 *)mdev->net_conf->shared_secret, key_len);
4059 if (rv) {
4060 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4061 rv = -1;
4062 goto fail;
4063 }
4064
4065 get_random_bytes(my_challenge, CHALLENGE_LEN);
4066
4067 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4068 if (!rv)
4069 goto fail;
4070
4071 rv = drbd_recv_header(mdev, &p);
4072 if (!rv)
4073 goto fail;
4074
4075 if (p.command != P_AUTH_CHALLENGE) {
4076 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4077 cmdname(p.command), p.command);
4078 rv = 0;
4079 goto fail;
4080 }
4081
4082 if (p.length > CHALLENGE_LEN*2) {
4083 dev_err(DEV, "AuthChallenge payload too big.\n");
4084 rv = -1;
4085 goto fail;
4086 }
4087
4088 peers_ch = kmalloc(p.length, GFP_NOIO);
4089 if (peers_ch == NULL) {
4090 dev_err(DEV, "kmalloc of peers_ch failed\n");
4091 rv = -1;
4092 goto fail;
4093 }
4094
4095 rv = drbd_recv(mdev, peers_ch, p.length);
4096
4097 if (rv != p.length) {
4098 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4099 rv = 0;
4100 goto fail;
4101 }
4102
4103 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4104 response = kmalloc(resp_size, GFP_NOIO);
4105 if (response == NULL) {
4106 dev_err(DEV, "kmalloc of response failed\n");
4107 rv = -1;
4108 goto fail;
4109 }
4110
4111 sg_init_table(&sg, 1);
4112 sg_set_buf(&sg, peers_ch, p.length);
4113
4114 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4115 if (rv) {
4116 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4117 rv = -1;
4118 goto fail;
4119 }
4120
4121 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4122 if (!rv)
4123 goto fail;
4124
4125 rv = drbd_recv_header(mdev, &p);
4126 if (!rv)
4127 goto fail;
4128
4129 if (p.command != P_AUTH_RESPONSE) {
4130 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4131 cmdname(p.command), p.command);
4132 rv = 0;
4133 goto fail;
4134 }
4135
4136 if (p.length != resp_size) {
4137 dev_err(DEV, "AuthResponse payload has unexpected size\n");
4138 rv = 0;
4139 goto fail;
4140 }
4141
4142 rv = drbd_recv(mdev, response, resp_size);
4143
4144 if (rv != resp_size) {
4145 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4146 rv = 0;
4147 goto fail;
4148 }
4149
4150 right_response = kmalloc(resp_size, GFP_NOIO);
4151 if (right_response == NULL) {
4152 dev_err(DEV, "kmalloc of right_response failed\n");
4153 rv = -1;
4154 goto fail;
4155 }
4156
4157 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4158
4159 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4160 if (rv) {
4161 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4162 rv = -1;
4163 goto fail;
4164 }
4165
4166 rv = !memcmp(response, right_response, resp_size);
4167
4168 if (rv)
4169 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4170 resp_size, mdev->net_conf->cram_hmac_alg);
4171 else
4172 rv = -1;
4173
4174 fail:
4175 kfree(peers_ch);
4176 kfree(response);
4177 kfree(right_response);
4178
4179 return rv;
4180}
4181#endif
4182
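/* Entry point of the receiver thread: try to establish a connection,
 * retrying once per second on transient failures and dropping the
 * network configuration when the peer speaks an incompatible protocol;
 * once connected, run the drbdd() receive loop and clean up with
 * drbd_disconnect() when it returns. */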
4183int drbdd_init(struct drbd_thread *thi)
4184{
4185 struct drbd_conf *mdev = thi->mdev;
4186 unsigned int minor = mdev_to_minor(mdev);
4187 int h;
4188
4189 sprintf(current->comm, "drbd%d_receiver", minor);
4190
4191 dev_info(DEV, "receiver (re)started\n");
4192
4193 do {
4194 h = drbd_connect(mdev);
4195 if (h == 0) {
4196 drbd_disconnect(mdev);
4197 __set_current_state(TASK_INTERRUPTIBLE);
4198 schedule_timeout(HZ);
4199 }
4200 if (h == -1) {
4201 dev_warn(DEV, "Discarding network configuration.\n");
4202 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4203 }
4204 } while (h == 0);
4205
4206 if (h > 0) {
4207 if (get_net_conf(mdev)) {
4208 drbdd(mdev);
4209 put_net_conf(mdev);
4210 }
4211 }
4212
4213 drbd_disconnect(mdev);
4214
4215 dev_info(DEV, "receiver terminated\n");
4216 return 0;
4217}
4218
4219/* ********* acknowledge sender ******** */
4220
4221static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4222{
4223 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4224
4225 int retcode = be32_to_cpu(p->retcode);
4226
4227 if (retcode >= SS_SUCCESS) {
4228 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4229 } else {
4230 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4231 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4232 drbd_set_st_err_str(retcode), retcode);
4233 }
4234 wake_up(&mdev->state_wait);
4235
4236 return TRUE;
4237}
4238
4239static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4240{
4241 return drbd_send_ping_ack(mdev);
4242
4243}
4244
4245static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4246{
4247 /* restore idle timeout */
4248 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4249 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4250 wake_up(&mdev->misc_wait);
4251
4252 return TRUE;
4253}
4254
4255static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4256{
4257 struct p_block_ack *p = (struct p_block_ack *)h;
4258 sector_t sector = be64_to_cpu(p->sector);
4259 int blksize = be32_to_cpu(p->blksize);
4260
4261 D_ASSERT(mdev->agreed_pro_version >= 89);
4262
4263 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4264
4265 drbd_rs_complete_io(mdev, sector);
4266 drbd_set_in_sync(mdev, sector, blksize);
4267 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4268 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4269 dec_rs_pending(mdev);
4270
4271 return TRUE;
4272}
4273
4274/* when we receive the ACK for a write request,
4275 * verify that we actually know about it */
4276static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4277 u64 id, sector_t sector)
4278{
4279 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4280 struct hlist_node *n;
4281 struct drbd_request *req;
4282
4283 hlist_for_each_entry(req, n, slot, colision) {
4284 if ((unsigned long)req == (unsigned long)id) {
4285 if (req->sector != sector) {
4286 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4287 "wrong sector (%llus versus %llus)\n", req,
4288 (unsigned long long)req->sector,
4289 (unsigned long long)sector);
4290 break;
4291 }
4292 return req;
4293 }
4294 }
4295 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4296 (void *)(unsigned long)id, (unsigned long long)sector);
4297 return NULL;
4298}
4299
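/* Common helper for the ACK handlers below: look up the request the
 * peer refers to (by block_id and sector) under req_lock, apply the
 * given request state transition, and complete the master bio outside
 * the lock if that transition finished the request. */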
4300typedef struct drbd_request *(req_validator_fn)
4301 (struct drbd_conf *mdev, u64 id, sector_t sector);
4302
4303static int validate_req_change_req_state(struct drbd_conf *mdev,
4304 u64 id, sector_t sector, req_validator_fn validator,
4305 const char *func, enum drbd_req_event what)
4306{
4307 struct drbd_request *req;
4308 struct bio_and_error m;
4309
4310 spin_lock_irq(&mdev->req_lock);
4311 req = validator(mdev, id, sector);
4312 if (unlikely(!req)) {
4313 spin_unlock_irq(&mdev->req_lock);
4314 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4315 return FALSE;
4316 }
4317 __req_mod(req, what, &m);
4318 spin_unlock_irq(&mdev->req_lock);
4319
4320 if (m.bio)
4321 complete_master_bio(mdev, &m);
4322 return TRUE;
4323}
4324
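/* A positive ACK for a write: resync blocks are marked in sync right
 * away, application writes are mapped to the matching request event
 * depending on the ACK flavor (protocol C write ack, protocol B
 * receive ack, or a discarded concurrent write). */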
4325static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4326{
4327 struct p_block_ack *p = (struct p_block_ack *)h;
4328 sector_t sector = be64_to_cpu(p->sector);
4329 int blksize = be32_to_cpu(p->blksize);
4330 enum drbd_req_event what;
4331
4332 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4333
4334 if (is_syncer_block_id(p->block_id)) {
4335 drbd_set_in_sync(mdev, sector, blksize);
4336 dec_rs_pending(mdev);
4337 return TRUE;
4338 }
4339 switch (be16_to_cpu(h->command)) {
4340 case P_RS_WRITE_ACK:
4341 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4342 what = write_acked_by_peer_and_sis;
4343 break;
4344 case P_WRITE_ACK:
4345 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4346 what = write_acked_by_peer;
4347 break;
4348 case P_RECV_ACK:
4349 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4350 what = recv_acked_by_peer;
4351 break;
4352 case P_DISCARD_ACK:
4353 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4354 what = conflict_discarded_by_peer;
4355 break;
4356 default:
4357 D_ASSERT(0);
4358 return FALSE;
4359 }
4360
4361 return validate_req_change_req_state(mdev, p->block_id, sector,
4362 _ack_id_to_req, __func__ , what);
4363}
4364
4365static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4366{
4367 struct p_block_ack *p = (struct p_block_ack *)h;
4368 sector_t sector = be64_to_cpu(p->sector);
4369
4370 if (__ratelimit(&drbd_ratelimit_state))
4371 dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4372
4373 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4374
4375 if (is_syncer_block_id(p->block_id)) {
4376 int size = be32_to_cpu(p->blksize);
4377 dec_rs_pending(mdev);
4378 drbd_rs_failed_io(mdev, sector, size);
4379 return TRUE;
4380 }
4381 return validate_req_change_req_state(mdev, p->block_id, sector,
4382 _ack_id_to_req, __func__ , neg_acked);
4383}
4384
4385static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4386{
4387 struct p_block_ack *p = (struct p_block_ack *)h;
4388 sector_t sector = be64_to_cpu(p->sector);
4389
4390 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4391 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4392 (unsigned long long)sector, be32_to_cpu(p->blksize));
4393
4394 return validate_req_change_req_state(mdev, p->block_id, sector,
4395 _ar_id_to_req, __func__ , neg_acked);
4396}
4397
4398static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4399{
4400 sector_t sector;
4401 int size;
4402 struct p_block_ack *p = (struct p_block_ack *)h;
4403
4404 sector = be64_to_cpu(p->sector);
4405 size = be32_to_cpu(p->blksize);
4406
4407 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4408
4409 dec_rs_pending(mdev);
4410
4411 if (get_ldev_if_state(mdev, D_FAILED)) {
4412 drbd_rs_complete_io(mdev, sector);
4413 drbd_rs_failed_io(mdev, sector, size);
4414 put_ldev(mdev);
4415 }
4416
4417 return TRUE;
4418}
4419
4420static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4421{
4422 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4423
4424 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4425
4426 return TRUE;
4427}
4428
4429static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4430{
4431 struct p_block_ack *p = (struct p_block_ack *)h;
4432 struct drbd_work *w;
4433 sector_t sector;
4434 int size;
4435
4436 sector = be64_to_cpu(p->sector);
4437 size = be32_to_cpu(p->blksize);
4438
4439 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4440
4441 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4442 drbd_ov_oos_found(mdev, sector, size);
4443 else
4444 ov_oos_print(mdev);
4445
4446 drbd_rs_complete_io(mdev, sector);
4447 dec_rs_pending(mdev);
4448
4449 if (--mdev->ov_left == 0) {
4450 w = kmalloc(sizeof(*w), GFP_NOIO);
4451 if (w) {
4452 w->cb = w_ov_finished;
4453 drbd_queue_work_front(&mdev->data.work, w);
4454 } else {
4455 dev_err(DEV, "kmalloc(w) failed.\n");
4456 ov_oos_print(mdev);
4457 drbd_resync_finished(mdev);
4458 }
4459 }
4460 return TRUE;
4461}
4462
4463static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4464{
4465 struct p_delay_probe *p = (struct p_delay_probe *)h;
4466
4467 got_delay_probe(mdev, USE_META_SOCKET, p);
4468 return TRUE;
4469}
4470
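/* The asender dispatch table: every packet that may arrive on the meta
 * socket has a fixed size and a handler.  get_asender_cmd() returns
 * NULL for anything that is not an asender packet. */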
4471struct asender_cmd {
4472 size_t pkt_size;
4473 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4474};
4475
4476static struct asender_cmd *get_asender_cmd(int cmd)
4477{
4478 static struct asender_cmd asender_tbl[] = {
4479 /* anything missing from this table is in
4480 * the drbd_cmd_handler (drbd_default_handler) table,
4481 * see the beginning of drbdd() */
4482 [P_PING] = { sizeof(struct p_header), got_Ping },
4483 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4484 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4485 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4486 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4487 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4488 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4489 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4490 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4491 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4492 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4493 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4494 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4495 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_delay_probe_m },
4496 [P_MAX_CMD] = { 0, NULL },
4497 };
4498 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4499 return NULL;
4500 return &asender_tbl[cmd];
4501}
4502
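/* The acknowledge sender thread.  It multiplexes two jobs over the meta
 * socket: sending pings and the ACKs for completed epoch entries
 * (drbd_process_done_ee), and receiving the peer's fixed-size answer
 * packets, which are dispatched through get_asender_cmd().  The socket
 * is corked while several ACKs are pending and uncorked afterwards,
 * unless corking is disabled in the configuration; a missing PingAck
 * within ping_timeo leads to reconnect. */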
4503int drbd_asender(struct drbd_thread *thi)
4504{
4505 struct drbd_conf *mdev = thi->mdev;
4506 struct p_header *h = &mdev->meta.rbuf.header;
4507 struct asender_cmd *cmd = NULL;
4508
4509 int rv, len;
4510 void *buf = h;
4511 int received = 0;
4512 int expect = sizeof(struct p_header);
4513 int empty;
4514
4515 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4516
4517 current->policy = SCHED_RR; /* Make this a realtime task! */
4518 current->rt_priority = 2; /* more important than all other tasks */
4519
4520 while (get_t_state(thi) == Running) {
4521 drbd_thread_current_set_cpu(mdev);
4522 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4523 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4524 mdev->meta.socket->sk->sk_rcvtimeo =
4525 mdev->net_conf->ping_timeo*HZ/10;
4526 }
4527
4528 /* conditionally cork;
4529 * it may hurt latency if we cork without much to send */
4530 if (!mdev->net_conf->no_cork &&
4531 3 < atomic_read(&mdev->unacked_cnt))
4532 drbd_tcp_cork(mdev->meta.socket);
4533 while (1) {
4534 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4535 flush_signals(current);
4536 if (!drbd_process_done_ee(mdev)) {
4537 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4538 goto reconnect;
4539 }
4540 /* to avoid race with newly queued ACKs */
4541 set_bit(SIGNAL_ASENDER, &mdev->flags);
4542 spin_lock_irq(&mdev->req_lock);
4543 empty = list_empty(&mdev->done_ee);
4544 spin_unlock_irq(&mdev->req_lock);
4545 /* new ack may have been queued right here,
4546 * but then there is also a signal pending,
4547 * and we start over... */
4548 if (empty)
4549 break;
4550 }
4551 /* but unconditionally uncork unless disabled */
4552 if (!mdev->net_conf->no_cork)
4553 drbd_tcp_uncork(mdev->meta.socket);
4554
4555 /* short circuit, recv_msg would return EINTR anyways. */
4556 if (signal_pending(current))
4557 continue;
4558
4559 rv = drbd_recv_short(mdev, mdev->meta.socket,
4560 buf, expect-received, 0);
4561 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4562
4563 flush_signals(current);
4564
4565 /* Note:
4566 * -EINTR (on meta) we got a signal
4567 * -EAGAIN (on meta) rcvtimeo expired
4568 * -ECONNRESET other side closed the connection
4569 * -ERESTARTSYS (on data) we got a signal
4570 * rv < 0 other than above: unexpected error!
4571 * rv == expected: full header or command
4572 * rv < expected: "woken" by signal during receive
4573 * rv == 0 : "connection shut down by peer"
4574 */
4575 if (likely(rv > 0)) {
4576 received += rv;
4577 buf += rv;
4578 } else if (rv == 0) {
4579 dev_err(DEV, "meta connection shut down by peer.\n");
4580 goto reconnect;
4581 } else if (rv == -EAGAIN) {
4582 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4583 mdev->net_conf->ping_timeo*HZ/10) {
4584 dev_err(DEV, "PingAck did not arrive in time.\n");
4585 goto reconnect;
4586 }
4587 set_bit(SEND_PING, &mdev->flags);
4588 continue;
4589 } else if (rv == -EINTR) {
4590 continue;
4591 } else {
4592 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4593 goto reconnect;
4594 }
4595
4596 if (received == expect && cmd == NULL) {
4597 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4598 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4599 (long)be32_to_cpu(h->magic),
4600 h->command, h->length);
4601 goto reconnect;
4602 }
4603 cmd = get_asender_cmd(be16_to_cpu(h->command));
4604 len = be16_to_cpu(h->length);
4605 if (unlikely(cmd == NULL)) {
4606 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4607 (long)be32_to_cpu(h->magic),
4608 h->command, h->length);
4609 goto disconnect;
4610 }
4611 expect = cmd->pkt_size;
4612 ERR_IF(len != expect-sizeof(struct p_header))
4613 goto reconnect;
4614 }
4615 if (received == expect) {
4616 D_ASSERT(cmd != NULL);
4617 if (!cmd->process(mdev, h))
4618 goto reconnect;
4619
4620 buf = h;
4621 received = 0;
4622 expect = sizeof(struct p_header);
4623 cmd = NULL;
4624 }
4625 }
4626
4627 if (0) {
4628reconnect:
4629 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4630 }
4631 if (0) {
4632disconnect:
4633 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4634 }
4635 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4636
4637 D_ASSERT(mdev->state.conn < C_CONNECTED);
4638 dev_info(DEV, "asender terminated\n");
4639
4640 return 0;
4641}