1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/mm.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
49#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
83static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
84{
85 struct page *page = NULL;
86
87 /* Yes, testing drbd_pp_vacant outside the lock is racy.
88 * So what. It saves a spin_lock. */
89 if (drbd_pp_vacant > 0) {
90 spin_lock(&drbd_pp_lock);
91 page = drbd_pp_pool;
92 if (page) {
93 drbd_pp_pool = (struct page *)page_private(page);
94 set_page_private(page, 0); /* just to be polite */
95 drbd_pp_vacant--;
96 }
97 spin_unlock(&drbd_pp_lock);
98 }
99 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100 * "criss-cross" setup, that might cause write-out on some other DRBD,
101 * which in turn might block on the other node at this very place. */
102 if (!page)
103 page = alloc_page(GFP_TRY);
104 if (page)
105 atomic_inc(&mdev->pp_in_use);
106 return page;
107}
108
109/* kick lower level device, if we have more than (arbitrary number)
110 * reference counts on it, which typically are locally submitted io
111 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
112static void maybe_kick_lo(struct drbd_conf *mdev)
113{
114 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
115 drbd_kick_lo(mdev);
116}
117
118static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
119{
120 struct drbd_epoch_entry *e;
121 struct list_head *le, *tle;
122
123 /* The EEs are always appended to the end of the list. Since
124 they are sent in order over the wire, they have to finish
125 in order. As soon as we see the first unfinished one, we can
126 stop examining the list... */
127
128 list_for_each_safe(le, tle, &mdev->net_ee) {
129 e = list_entry(le, struct drbd_epoch_entry, w.list);
130 if (drbd_bio_has_active_page(e->private_bio))
131 break;
132 list_move(le, to_be_freed);
133 }
134}
135
136static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
137{
138 LIST_HEAD(reclaimed);
139 struct drbd_epoch_entry *e, *t;
140
141 maybe_kick_lo(mdev);
142 spin_lock_irq(&mdev->req_lock);
143 reclaim_net_ee(mdev, &reclaimed);
144 spin_unlock_irq(&mdev->req_lock);
145
146 list_for_each_entry_safe(e, t, &reclaimed, w.list)
147 drbd_free_ee(mdev, e);
148}
149
150/**
151 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
152 * @mdev: DRBD device.
153 * @retry: whether or not to retry allocation forever (or until signalled)
154 *
155 * Tries to allocate a page, first from our own page pool, then from the
156 * kernel, unless this allocation would exceed the max_buffers setting.
157 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158 */
159static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
160{
161 struct page *page = NULL;
162 DEFINE_WAIT(wait);
163
164 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
165 page = drbd_pp_first_page_or_try_alloc(mdev);
166 if (page)
167 return page;
168 }
169
170 for (;;) {
171 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172
173 drbd_kick_lo_and_reclaim_net(mdev);
174
175 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176 page = drbd_pp_first_page_or_try_alloc(mdev);
177 if (page)
178 break;
179 }
180
181 if (!retry)
182 break;
183
184 if (signal_pending(current)) {
185 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
186 break;
187 }
188
189 schedule();
190 }
191 finish_wait(&drbd_pp_wait, &wait);
192
193 return page;
194}
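/*
 * Illustrative sketch only -- not part of the original source.  It shows the
 * intended use of the retry semantics documented above: with retry != 0 the
 * only way to get NULL back is a pending signal, so a caller on the receive
 * path can simply bail out and let the disconnect path clean up.  The helper
 * name below is made up for this example.
 */
#if 0
static struct page *example_get_receive_page(struct drbd_conf *mdev)
{
	struct page *page = drbd_pp_alloc(mdev, 1 /* retry until available */);
	if (!page)
		return NULL;	/* interrupted by a signal */
	return page;
}
#endif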
195
196/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197 * Is also used from inside another spin_lock_irq(&mdev->req_lock) */
198static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199{
200 int free_it;
201
202 spin_lock(&drbd_pp_lock);
203 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204 free_it = 1;
205 } else {
206 set_page_private(page, (unsigned long)drbd_pp_pool);
207 drbd_pp_pool = page;
208 drbd_pp_vacant++;
209 free_it = 0;
210 }
211 spin_unlock(&drbd_pp_lock);
212
213 atomic_dec(&mdev->pp_in_use);
214
215 if (free_it)
216 __free_page(page);
217
218 wake_up(&drbd_pp_wait);
219}
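/*
 * Illustrative arithmetic only -- not part of the original source.  The
 * high-water check above keeps at most
 *	(DRBD_MAX_SEGMENT_SIZE / PAGE_SIZE) * minor_count
 * pages cached in drbd_pp_pool.  Assuming, for example, a 32 KiB maximum
 * segment size, 4 KiB pages and minor_count == 16, up to 8 * 16 == 128 pages
 * (512 KiB) are kept for reuse; anything beyond that is handed back to the
 * system via __free_page().
 */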
220
221static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222{
223 struct page *p_to_be_freed = NULL;
224 struct page *page;
225 struct bio_vec *bvec;
226 int i;
227
228 spin_lock(&drbd_pp_lock);
229 __bio_for_each_segment(bvec, bio, i, 0) {
230 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
231 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
232 p_to_be_freed = bvec->bv_page;
233 } else {
234 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
235 drbd_pp_pool = bvec->bv_page;
236 drbd_pp_vacant++;
237 }
238 }
239 spin_unlock(&drbd_pp_lock);
240 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242 while (p_to_be_freed) {
243 page = p_to_be_freed;
244 p_to_be_freed = (struct page *)page_private(page);
245 set_page_private(page, 0); /* just to be polite */
246 put_page(page);
247 }
248
249 wake_up(&drbd_pp_wait);
250}
251
252/*
253You need to hold the req_lock:
254 _drbd_wait_ee_list_empty()
255
256You must not have the req_lock:
257 drbd_free_ee()
258 drbd_alloc_ee()
259 drbd_init_ee()
260 drbd_release_ee()
261 drbd_ee_fix_bhs()
262 drbd_process_done_ee()
263 drbd_clear_done_ee()
264 drbd_wait_ee_list_empty()
265*/
266
267struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
268 u64 id,
269 sector_t sector,
270 unsigned int data_size,
271 gfp_t gfp_mask) __must_hold(local)
272{
273 struct request_queue *q;
274 struct drbd_epoch_entry *e;
275 struct page *page;
276 struct bio *bio;
277 unsigned int ds;
278
279 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280 return NULL;
281
282 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
283 if (!e) {
284 if (!(gfp_mask & __GFP_NOWARN))
285 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
286 return NULL;
287 }
288
289 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
290 if (!bio) {
291 if (!(gfp_mask & __GFP_NOWARN))
292 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293 goto fail1;
294 }
295
296 bio->bi_bdev = mdev->ldev->backing_bdev;
297 bio->bi_sector = sector;
298
299 ds = data_size;
300 while (ds) {
301 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302 if (!page) {
303 if (!(gfp_mask & __GFP_NOWARN))
304 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305 goto fail2;
306 }
307 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308 drbd_pp_free(mdev, page);
309 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310 "data_size=%u,ds=%u) failed\n",
311 (unsigned long long)sector, data_size, ds);
312
313 q = bdev_get_queue(bio->bi_bdev);
314 if (q->merge_bvec_fn) {
315 struct bvec_merge_data bvm = {
316 .bi_bdev = bio->bi_bdev,
317 .bi_sector = bio->bi_sector,
318 .bi_size = bio->bi_size,
319 .bi_rw = bio->bi_rw,
320 };
321 int l = q->merge_bvec_fn(q, &bvm,
322 &bio->bi_io_vec[bio->bi_vcnt]);
323 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324 }
325
326 /* dump more of the bio. */
327 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332 goto fail2;
333 break;
334 }
335 ds -= min_t(int, ds, PAGE_SIZE);
336 }
337
338 D_ASSERT(data_size == bio->bi_size);
339
340 bio->bi_private = e;
341 e->mdev = mdev;
342 e->sector = sector;
343 e->size = bio->bi_size;
344
345 e->private_bio = bio;
346 e->block_id = id;
347 INIT_HLIST_NODE(&e->colision);
348 e->epoch = NULL;
349 e->flags = 0;
350
351 return e;
352
353 fail2:
354 drbd_pp_free_bio_pages(mdev, bio);
355 bio_put(bio);
356 fail1:
357 mempool_free(e, drbd_ee_mempool);
358
359 return NULL;
360}
361
362void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363{
364 struct bio *bio = e->private_bio;
365 drbd_pp_free_bio_pages(mdev, bio);
366 bio_put(bio);
367 D_ASSERT(hlist_unhashed(&e->colision));
368 mempool_free(e, drbd_ee_mempool);
369}
370
371int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
372{
373 LIST_HEAD(work_list);
374 struct drbd_epoch_entry *e, *t;
375 int count = 0;
376
377 spin_lock_irq(&mdev->req_lock);
378 list_splice_init(list, &work_list);
379 spin_unlock_irq(&mdev->req_lock);
380
381 list_for_each_entry_safe(e, t, &work_list, w.list) {
382 drbd_free_ee(mdev, e);
383 count++;
384 }
385 return count;
386}
387
388
389/*
390 * This function is called from _asender only_
391 * but see also comments in _req_mod(,barrier_acked)
392 * and receive_Barrier.
393 *
394 * Move entries from net_ee to done_ee, if ready.
395 * Grab done_ee, call all callbacks, free the entries.
396 * The callbacks typically send out ACKs.
397 */
398static int drbd_process_done_ee(struct drbd_conf *mdev)
399{
400 LIST_HEAD(work_list);
401 LIST_HEAD(reclaimed);
402 struct drbd_epoch_entry *e, *t;
403 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
404
405 spin_lock_irq(&mdev->req_lock);
406 reclaim_net_ee(mdev, &reclaimed);
407 list_splice_init(&mdev->done_ee, &work_list);
408 spin_unlock_irq(&mdev->req_lock);
409
410 list_for_each_entry_safe(e, t, &reclaimed, w.list)
411 drbd_free_ee(mdev, e);
412
413 /* possible callbacks here:
414 * e_end_block, and e_end_resync_block, e_send_discard_ack.
415 * all ignore the last argument.
416 */
417 list_for_each_entry_safe(e, t, &work_list, w.list) {
418 /* list_del not necessary, next/prev members not touched */
419 ok = e->w.cb(mdev, &e->w, !ok) && ok;
420 drbd_free_ee(mdev, e);
421 }
422 wake_up(&mdev->ee_wait);
423
424 return ok;
425}
426
427void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
428{
429 DEFINE_WAIT(wait);
430
431 /* avoids spin_lock/unlock
432 * and calling prepare_to_wait in the fast path */
433 while (!list_empty(head)) {
434 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
435 spin_unlock_irq(&mdev->req_lock);
436 drbd_kick_lo(mdev);
437 schedule();
438 finish_wait(&mdev->ee_wait, &wait);
439 spin_lock_irq(&mdev->req_lock);
440 }
441}
442
443void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444{
445 spin_lock_irq(&mdev->req_lock);
446 _drbd_wait_ee_list_empty(mdev, head);
447 spin_unlock_irq(&mdev->req_lock);
448}
449
450/* see also kernel_accept; which is only present since 2.6.18.
451 * also we want to log which part of it failed, exactly */
452static int drbd_accept(struct drbd_conf *mdev, const char **what,
453 struct socket *sock, struct socket **newsock)
454{
455 struct sock *sk = sock->sk;
456 int err = 0;
457
458 *what = "listen";
459 err = sock->ops->listen(sock, 5);
460 if (err < 0)
461 goto out;
462
463 *what = "sock_create_lite";
464 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
465 newsock);
466 if (err < 0)
467 goto out;
468
469 *what = "accept";
470 err = sock->ops->accept(sock, *newsock, 0);
471 if (err < 0) {
472 sock_release(*newsock);
473 *newsock = NULL;
474 goto out;
475 }
476 (*newsock)->ops = sock->ops;
477
478out:
479 return err;
480}
481
482static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
483 void *buf, size_t size, int flags)
484{
485 mm_segment_t oldfs;
486 struct kvec iov = {
487 .iov_base = buf,
488 .iov_len = size,
489 };
490 struct msghdr msg = {
491 .msg_iovlen = 1,
492 .msg_iov = (struct iovec *)&iov,
493 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
494 };
495 int rv;
496
497 oldfs = get_fs();
498 set_fs(KERNEL_DS);
499 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
500 set_fs(oldfs);
501
502 return rv;
503}
504
505static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
506{
507 mm_segment_t oldfs;
508 struct kvec iov = {
509 .iov_base = buf,
510 .iov_len = size,
511 };
512 struct msghdr msg = {
513 .msg_iovlen = 1,
514 .msg_iov = (struct iovec *)&iov,
515 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
516 };
517 int rv;
518
519 oldfs = get_fs();
520 set_fs(KERNEL_DS);
521
522 for (;;) {
523 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
524 if (rv == size)
525 break;
526
527 /* Note:
528 * ECONNRESET other side closed the connection
529 * ERESTARTSYS (on sock) we got a signal
530 */
531
532 if (rv < 0) {
533 if (rv == -ECONNRESET)
534 dev_info(DEV, "sock was reset by peer\n");
535 else if (rv != -ERESTARTSYS)
536 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
537 break;
538 } else if (rv == 0) {
539 dev_info(DEV, "sock was shut down by peer\n");
540 break;
541 } else {
542 /* signal came in, or peer/link went down,
543 * after we read a partial message
544 */
545 /* D_ASSERT(signal_pending(current)); */
546 break;
547 }
548 };
549
550 set_fs(oldfs);
551
552 if (rv != size)
553 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
554
555 return rv;
556}
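/*
 * Illustrative note only -- not part of the original source.  drbd_recv()
 * either returns exactly the requested size or the connection has already
 * been forced to C_BROKEN_PIPE above, so callers just compare the return
 * value against the expected length, as drbd_recv_header() does further
 * down:
 *
 *	if (drbd_recv(mdev, h, sizeof(*h)) != sizeof(*h))
 *		return FALSE;
 */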
557
558static struct socket *drbd_try_connect(struct drbd_conf *mdev)
559{
560 const char *what;
561 struct socket *sock;
562 struct sockaddr_in6 src_in6;
563 int err;
564 int disconnect_on_error = 1;
565
566 if (!get_net_conf(mdev))
567 return NULL;
568
569 what = "sock_create_kern";
570 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
571 SOCK_STREAM, IPPROTO_TCP, &sock);
572 if (err < 0) {
573 sock = NULL;
574 goto out;
575 }
576
577 sock->sk->sk_rcvtimeo =
578 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
579
580 /* explicitly bind to the configured IP as source IP
581 * for the outgoing connections.
582 * This is needed for multihomed hosts and to be
583 * able to use lo: interfaces for drbd.
584 * Make sure to use 0 as port number, so linux selects
585 * a free one dynamically.
586 */
587 memcpy(&src_in6, mdev->net_conf->my_addr,
588 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
589 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
590 src_in6.sin6_port = 0;
591 else
592 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
593
594 what = "bind before connect";
595 err = sock->ops->bind(sock,
596 (struct sockaddr *) &src_in6,
597 mdev->net_conf->my_addr_len);
598 if (err < 0)
599 goto out;
600
601 /* connect may fail, peer not yet available.
602 * stay C_WF_CONNECTION, don't go Disconnecting! */
603 disconnect_on_error = 0;
604 what = "connect";
605 err = sock->ops->connect(sock,
606 (struct sockaddr *)mdev->net_conf->peer_addr,
607 mdev->net_conf->peer_addr_len, 0);
608
609out:
610 if (err < 0) {
611 if (sock) {
612 sock_release(sock);
613 sock = NULL;
614 }
615 switch (-err) {
616 /* timeout, busy, signal pending */
617 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
618 case EINTR: case ERESTARTSYS:
619 /* peer not (yet) available, network problem */
620 case ECONNREFUSED: case ENETUNREACH:
621 case EHOSTDOWN: case EHOSTUNREACH:
622 disconnect_on_error = 0;
623 break;
624 default:
625 dev_err(DEV, "%s failed, err = %d\n", what, err);
626 }
627 if (disconnect_on_error)
628 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
629 }
630 put_net_conf(mdev);
631 return sock;
632}
633
634static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
635{
636 int timeo, err;
637 struct socket *s_estab = NULL, *s_listen;
638 const char *what;
639
640 if (!get_net_conf(mdev))
641 return NULL;
642
643 what = "sock_create_kern";
644 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
645 SOCK_STREAM, IPPROTO_TCP, &s_listen);
646 if (err) {
647 s_listen = NULL;
648 goto out;
649 }
650
651 timeo = mdev->net_conf->try_connect_int * HZ;
652 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
653
654 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
655 s_listen->sk->sk_rcvtimeo = timeo;
656 s_listen->sk->sk_sndtimeo = timeo;
657
658 what = "bind before listen";
659 err = s_listen->ops->bind(s_listen,
660 (struct sockaddr *) mdev->net_conf->my_addr,
661 mdev->net_conf->my_addr_len);
662 if (err < 0)
663 goto out;
664
665 err = drbd_accept(mdev, &what, s_listen, &s_estab);
666
667out:
668 if (s_listen)
669 sock_release(s_listen);
670 if (err < 0) {
671 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
672 dev_err(DEV, "%s failed, err = %d\n", what, err);
673 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
674 }
675 }
676 put_net_conf(mdev);
677
678 return s_estab;
679}
680
681static int drbd_send_fp(struct drbd_conf *mdev,
682 struct socket *sock, enum drbd_packets cmd)
683{
684 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
685
686 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
687}
688
689static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
690{
691 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692 int rr;
693
694 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
695
696 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
697 return be16_to_cpu(h->command);
698
699 return 0xffff;
700}
701
702/**
703 * drbd_socket_okay() - Free the socket if its connection is not okay
704 * @mdev: DRBD device.
705 * @sock: pointer to the pointer to the socket.
706 */
707static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
708{
709 int rr;
710 char tb[4];
711
712 if (!*sock)
713 return FALSE;
714
715 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
716
717 if (rr > 0 || rr == -EAGAIN) {
718 return TRUE;
719 } else {
720 sock_release(*sock);
721 *sock = NULL;
722 return FALSE;
723 }
724}
725
726/*
727 * return values:
728 * 1 yes, we have a valid connection
729 * 0 oops, did not work out, please try again
730 * -1 peer talks different language,
731 * no point in trying again, please go standalone.
732 * -2 We do not have a network config...
733 */
734static int drbd_connect(struct drbd_conf *mdev)
735{
736 struct socket *s, *sock, *msock;
737 int try, h, ok;
738
739 D_ASSERT(!mdev->data.socket);
740
741 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
742 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
743
744 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
745 return -2;
746
747 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
748
749 sock = NULL;
750 msock = NULL;
751
752 do {
753 for (try = 0;;) {
754 /* 3 tries, this should take less than a second! */
755 s = drbd_try_connect(mdev);
756 if (s || ++try >= 3)
757 break;
758 /* give the other side time to call bind() & listen() */
759 __set_current_state(TASK_INTERRUPTIBLE);
760 schedule_timeout(HZ / 10);
761 }
762
763 if (s) {
764 if (!sock) {
765 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
766 sock = s;
767 s = NULL;
768 } else if (!msock) {
769 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
770 msock = s;
771 s = NULL;
772 } else {
773 dev_err(DEV, "Logic error in drbd_connect()\n");
774 goto out_release_sockets;
775 }
776 }
777
778 if (sock && msock) {
779 __set_current_state(TASK_INTERRUPTIBLE);
780 schedule_timeout(HZ / 10);
781 ok = drbd_socket_okay(mdev, &sock);
782 ok = drbd_socket_okay(mdev, &msock) && ok;
783 if (ok)
784 break;
785 }
786
787retry:
788 s = drbd_wait_for_connect(mdev);
789 if (s) {
790 try = drbd_recv_fp(mdev, s);
791 drbd_socket_okay(mdev, &sock);
792 drbd_socket_okay(mdev, &msock);
793 switch (try) {
794 case P_HAND_SHAKE_S:
795 if (sock) {
796 dev_warn(DEV, "initial packet S crossed\n");
797 sock_release(sock);
798 }
799 sock = s;
800 break;
801 case P_HAND_SHAKE_M:
802 if (msock) {
803 dev_warn(DEV, "initial packet M crossed\n");
804 sock_release(msock);
805 }
806 msock = s;
807 set_bit(DISCARD_CONCURRENT, &mdev->flags);
808 break;
809 default:
810 dev_warn(DEV, "Error receiving initial packet\n");
811 sock_release(s);
812 if (random32() & 1)
813 goto retry;
814 }
815 }
816
817 if (mdev->state.conn <= C_DISCONNECTING)
818 goto out_release_sockets;
819 if (signal_pending(current)) {
820 flush_signals(current);
821 smp_rmb();
822 if (get_t_state(&mdev->receiver) == Exiting)
823 goto out_release_sockets;
824 }
825
826 if (sock && msock) {
827 ok = drbd_socket_okay(mdev, &sock);
828 ok = drbd_socket_okay(mdev, &msock) && ok;
829 if (ok)
830 break;
831 }
832 } while (1);
833
834 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
835 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836
837 sock->sk->sk_allocation = GFP_NOIO;
838 msock->sk->sk_allocation = GFP_NOIO;
839
840 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
841 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
842
843 if (mdev->net_conf->sndbuf_size) {
844 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
845 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
846 }
847
848 if (mdev->net_conf->rcvbuf_size) {
849 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
850 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
851 }
852
853 /* NOT YET ...
854 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856 * first set it to the P_HAND_SHAKE timeout,
857 * which we set to 4x the configured ping_timeout. */
858 sock->sk->sk_sndtimeo =
859 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864 /* we don't want delays.
865 * we use TCP_CORK where appropriate, though */
866 drbd_tcp_nodelay(sock);
867 drbd_tcp_nodelay(msock);
868
869 mdev->data.socket = sock;
870 mdev->meta.socket = msock;
871 mdev->last_received = jiffies;
872
873 D_ASSERT(mdev->asender.task == NULL);
874
875 h = drbd_do_handshake(mdev);
876 if (h <= 0)
877 return h;
878
879 if (mdev->cram_hmac_tfm) {
880 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881 switch (drbd_do_auth(mdev)) {
882 case -1:
883 dev_err(DEV, "Authentication of peer failed\n");
884 return -1;
885 case 0:
886 dev_err(DEV, "Authentication of peer failed, trying again.\n");
887 return 0;
888 }
889 }
890
891 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892 return 0;
893
894 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896
897 atomic_set(&mdev->packet_seq, 0);
898 mdev->peer_seq = 0;
899
900 drbd_thread_start(&mdev->asender);
901
902 if (!drbd_send_protocol(mdev))
903 return -1;
904 drbd_send_sync_param(mdev, &mdev->sync_conf);
905 drbd_send_sizes(mdev, 0, 0);
906 drbd_send_uuids(mdev);
907 drbd_send_state(mdev);
908 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
909 clear_bit(RESIZE_PENDING, &mdev->flags);
910
911 return 1;
912
913out_release_sockets:
914 if (sock)
915 sock_release(sock);
916 if (msock)
917 sock_release(msock);
918 return -1;
919}
920
921static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
922{
923 int r;
924
925 r = drbd_recv(mdev, h, sizeof(*h));
926
927 if (unlikely(r != sizeof(*h))) {
928 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
929 return FALSE;
930 };
931 h->command = be16_to_cpu(h->command);
932 h->length = be16_to_cpu(h->length);
933 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
934 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
935 (long)be32_to_cpu(h->magic),
936 h->command, h->length);
937 return FALSE;
938 }
939 mdev->last_received = jiffies;
940
941 return TRUE;
942}
943
944static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
945{
946 int rv;
947
948 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
949 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
950 NULL, BLKDEV_IFL_WAIT);
951 if (rv) {
952 dev_err(DEV, "local disk flush failed with status %d\n", rv);
953 /* would rather check on EOPNOTSUPP, but that is not reliable.
954 * don't try again for ANY return value != 0
955 * if (rv == -EOPNOTSUPP) */
956 drbd_bump_write_ordering(mdev, WO_drain_io);
957 }
958 put_ldev(mdev);
959 }
960
961 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
962}
963
964static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965{
966 struct flush_work *fw = (struct flush_work *)w;
967 struct drbd_epoch *epoch = fw->epoch;
968
969 kfree(w);
970
971 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
972 drbd_flush_after_epoch(mdev, epoch);
973
974 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
975 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
976
977 return 1;
978}
979
980/**
981 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
982 * @mdev: DRBD device.
983 * @epoch: Epoch object.
984 * @ev: Epoch event.
985 */
986static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
987 struct drbd_epoch *epoch,
988 enum epoch_event ev)
989{
990 int finish, epoch_size;
991 struct drbd_epoch *next_epoch;
992 int schedule_flush = 0;
993 enum finish_epoch rv = FE_STILL_LIVE;
994
995 spin_lock(&mdev->epoch_lock);
996 do {
997 next_epoch = NULL;
998 finish = 0;
999
1000 epoch_size = atomic_read(&epoch->epoch_size);
1001
1002 switch (ev & ~EV_CLEANUP) {
1003 case EV_PUT:
1004 atomic_dec(&epoch->active);
1005 break;
1006 case EV_GOT_BARRIER_NR:
1007 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1008
1009 /* Special case: If we just switched from WO_bio_barrier to
1010 WO_bdev_flush we should not finish the current epoch */
1011 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1012 mdev->write_ordering != WO_bio_barrier &&
1013 epoch == mdev->current_epoch)
1014 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1015 break;
1016 case EV_BARRIER_DONE:
1017 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1018 break;
1019 case EV_BECAME_LAST:
1020 /* nothing to do*/
1021 break;
1022 }
1023
1024 if (epoch_size != 0 &&
1025 atomic_read(&epoch->active) == 0 &&
1026 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1027 epoch->list.prev == &mdev->current_epoch->list &&
1028 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1029 /* Nearly all conditions are met to finish that epoch... */
1030 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1031 mdev->write_ordering == WO_none ||
1032 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1033 ev & EV_CLEANUP) {
1034 finish = 1;
1035 set_bit(DE_IS_FINISHING, &epoch->flags);
1036 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1037 mdev->write_ordering == WO_bio_barrier) {
1038 atomic_inc(&epoch->active);
1039 schedule_flush = 1;
1040 }
1041 }
1042 if (finish) {
1043 if (!(ev & EV_CLEANUP)) {
1044 spin_unlock(&mdev->epoch_lock);
1045 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1046 spin_lock(&mdev->epoch_lock);
1047 }
1048 dec_unacked(mdev);
1049
1050 if (mdev->current_epoch != epoch) {
1051 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1052 list_del(&epoch->list);
1053 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1054 mdev->epochs--;
1055 kfree(epoch);
1056
1057 if (rv == FE_STILL_LIVE)
1058 rv = FE_DESTROYED;
1059 } else {
1060 epoch->flags = 0;
1061 atomic_set(&epoch->epoch_size, 0);
1062 /* atomic_set(&epoch->active, 0); is already zero */
1063 if (rv == FE_STILL_LIVE)
1064 rv = FE_RECYCLED;
1065 }
1066 }
1067
1068 if (!next_epoch)
1069 break;
1070
1071 epoch = next_epoch;
1072 } while (1);
1073
1074 spin_unlock(&mdev->epoch_lock);
1075
1076 if (schedule_flush) {
1077 struct flush_work *fw;
1078 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1079 if (fw) {
1080 fw->w.cb = w_flush;
1081 fw->epoch = epoch;
1082 drbd_queue_work(&mdev->data.work, &fw->w);
1083 } else {
1084 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1085 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1086 /* That is not a recursion, only one level */
1087 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1088 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1089 }
1090 }
1091
1092 return rv;
1093}
1094
1095/**
1096 * drbd_bump_write_ordering() - Fall back to another write ordering method
1097 * @mdev: DRBD device.
1098 * @wo: Write ordering method to try.
1099 */
1100void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1101{
1102 enum write_ordering_e pwo;
1103 static char *write_ordering_str[] = {
1104 [WO_none] = "none",
1105 [WO_drain_io] = "drain",
1106 [WO_bdev_flush] = "flush",
1107 [WO_bio_barrier] = "barrier",
1108 };
1109
1110 pwo = mdev->write_ordering;
1111 wo = min(pwo, wo);
1112 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1113 wo = WO_bdev_flush;
1114 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1115 wo = WO_drain_io;
1116 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1117 wo = WO_none;
1118 mdev->write_ordering = wo;
1119 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1120 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1121}
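/*
 * Illustrative example only -- not part of the original source.  The
 * effective fallback chain above is barrier -> flush -> drain -> none,
 * further restricted by the attached disk's options: a request for
 * WO_bio_barrier with ldev->dc.no_disk_barrier set is degraded to
 * WO_bdev_flush, and with no_disk_flush set as well it ends up at
 * WO_drain_io (or WO_none if no_disk_drain is also set).
 */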
1122
1123/**
1124 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1125 * @mdev: DRBD device.
1126 * @w: work object.
1127 * @cancel: The connection will be closed anyways (unused in this callback)
1128 */
1129int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1130{
1131 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1132 struct bio *bio = e->private_bio;
1133
1134 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1135 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1136 so that we can finish that epoch in drbd_may_finish_epoch().
1137 That is necessary if we already have a long chain of Epochs, before
1138 we realize that BIO_RW_BARRIER is actually not supported */
1139
1140 /* As long as the -ENOTSUPP on the barrier is reported immediately
1141 that will never trigger. If it is reported late, we will just
1142 print that warning and continue correctly for all future requests
1143 with WO_bdev_flush */
1144 if (previous_epoch(mdev, e->epoch))
1145 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1146
1147 /* prepare bio for re-submit,
1148 * re-init volatile members */
1149 /* we still have a local reference,
1150 * get_ldev was done in receive_Data. */
1151 bio->bi_bdev = mdev->ldev->backing_bdev;
1152 bio->bi_sector = e->sector;
1153 bio->bi_size = e->size;
1154 bio->bi_idx = 0;
1155
1156 bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1157 bio->bi_flags |= 1 << BIO_UPTODATE;
1158
1159 /* don't know whether this is necessary: */
1160 bio->bi_phys_segments = 0;
1161 bio->bi_next = NULL;
1162
1163 /* these should be unchanged: */
1164 /* bio->bi_end_io = drbd_endio_write_sec; */
1165 /* bio->bi_vcnt = whatever; */
1166
1167 e->w.cb = e_end_block;
1168
1169 /* This is no longer a barrier request. */
1170 bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1171
1172 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1173
1174 return 1;
1175}
1176
1177static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1178{
1179 int rv, issue_flush;
1180 struct p_barrier *p = (struct p_barrier *)h;
1181 struct drbd_epoch *epoch;
1182
1183 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1184
1185 rv = drbd_recv(mdev, h->payload, h->length);
1186 ERR_IF(rv != h->length) return FALSE;
1187
1188 inc_unacked(mdev);
1189
1190 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1191 drbd_kick_lo(mdev);
1192
1193 mdev->current_epoch->barrier_nr = p->barrier;
1194 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1195
1196 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1197 * the activity log, which means it would not be resynced in case the
1198 * R_PRIMARY crashes now.
1199 * Therefore we must send the barrier_ack after the barrier request was
1200 * completed. */
1201 switch (mdev->write_ordering) {
1202 case WO_bio_barrier:
1203 case WO_none:
1204 if (rv == FE_RECYCLED)
1205 return TRUE;
1206 break;
1207
1208 case WO_bdev_flush:
1209 case WO_drain_io:
1210 if (rv == FE_STILL_LIVE) {
1211 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1212 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1213 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1214 }
1215 if (rv == FE_RECYCLED)
1216 return TRUE;
1217
1218 /* The asender will send all the ACKs and barrier ACKs out, since
1219 all EEs moved from the active_ee to the done_ee. We need to
1220 provide a new epoch object for the EEs that come in soon */
1221 break;
1222 }
1223
1224 /* receiver context, in the writeout path of the other node.
1225 * avoid potential distributed deadlock */
1226 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1227 if (!epoch) {
1228 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1229 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1230 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1231 if (issue_flush) {
1232 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1233 if (rv == FE_RECYCLED)
1234 return TRUE;
1235 }
1236
1237 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1238
1239 return TRUE;
1240 }
1241
1242 epoch->flags = 0;
1243 atomic_set(&epoch->epoch_size, 0);
1244 atomic_set(&epoch->active, 0);
1245
1246 spin_lock(&mdev->epoch_lock);
1247 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1248 list_add(&epoch->list, &mdev->current_epoch->list);
1249 mdev->current_epoch = epoch;
1250 mdev->epochs++;
1251 } else {
1252 /* The current_epoch got recycled while we allocated this one... */
1253 kfree(epoch);
1254 }
1255 spin_unlock(&mdev->epoch_lock);
1256
1257 return TRUE;
1258}
1259
1260/* used from receive_RSDataReply (recv_resync_read)
1261 * and from receive_Data */
1262static struct drbd_epoch_entry *
1263read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1264{
1265 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1266 struct drbd_epoch_entry *e;
1267 struct bio_vec *bvec;
1268 struct page *page;
1269 struct bio *bio;
1270 int dgs, ds, i, rr;
1271 void *dig_in = mdev->int_dig_in;
1272 void *dig_vv = mdev->int_dig_vv;
1273 unsigned long *data;
1274
1275 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1276 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1277
1278 if (dgs) {
1279 rr = drbd_recv(mdev, dig_in, dgs);
1280 if (rr != dgs) {
1281 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1282 rr, dgs);
1283 return NULL;
1284 }
1285 }
1286
1287 data_size -= dgs;
1288
1289 ERR_IF(data_size & 0x1ff) return NULL;
1290 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1291
1292 /* even though we trust our peer,
1293 * we sometimes have to double check. */
1294 if (sector + (data_size>>9) > capacity) {
1295 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1296 (unsigned long long)capacity,
1297 (unsigned long long)sector, data_size);
1298 return NULL;
1299 }
1300
1301 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1302 * "criss-cross" setup, that might cause write-out on some other DRBD,
1303 * which in turn might block on the other node at this very place. */
1304 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1305 if (!e)
1306 return NULL;
1307 bio = e->private_bio;
1308 ds = data_size;
1309 bio_for_each_segment(bvec, bio, i) {
1310 page = bvec->bv_page;
1311 data = kmap(page);
1312 rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
1313 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1314 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1315 data[0] = data[0] ^ (unsigned long)-1;
1316 }
1317 kunmap(page);
1318 if (rr != min_t(int, ds, PAGE_SIZE)) {
1319 drbd_free_ee(mdev, e);
1320 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1321 rr, min_t(int, ds, PAGE_SIZE));
1322 return NULL;
1323 }
1324 ds -= rr;
1325 }
1326
1327 if (dgs) {
1328 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1329 if (memcmp(dig_in, dig_vv, dgs)) {
1330 dev_err(DEV, "Digest integrity check FAILED.\n");
1331 drbd_bcast_ee(mdev, "digest failed",
1332 dgs, dig_in, dig_vv, e);
1333 drbd_free_ee(mdev, e);
1334 return NULL;
1335 }
1336 }
1337 mdev->recv_cnt += data_size>>9;
1338 return e;
1339}
1340
1341/* drbd_drain_block() just takes a data block
1342 * out of the socket input buffer, and discards it.
1343 */
1344static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1345{
1346 struct page *page;
1347 int rr, rv = 1;
1348 void *data;
1349
1350 if (!data_size)
1351 return TRUE;
1352
1353 page = drbd_pp_alloc(mdev, 1);
1354
1355 data = kmap(page);
1356 while (data_size) {
1357 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1358 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1359 rv = 0;
1360 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1361 rr, min_t(int, data_size, PAGE_SIZE));
1362 break;
1363 }
1364 data_size -= rr;
1365 }
1366 kunmap(page);
1367 drbd_pp_free(mdev, page);
1368 return rv;
1369}
1370
1371static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1372 sector_t sector, int data_size)
1373{
1374 struct bio_vec *bvec;
1375 struct bio *bio;
1376 int dgs, rr, i, expect;
1377 void *dig_in = mdev->int_dig_in;
1378 void *dig_vv = mdev->int_dig_vv;
1379
1380 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1381 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1382
1383 if (dgs) {
1384 rr = drbd_recv(mdev, dig_in, dgs);
1385 if (rr != dgs) {
1386 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1387 rr, dgs);
1388 return 0;
1389 }
1390 }
1391
1392 data_size -= dgs;
1393
1394 /* optimistically update recv_cnt. if receiving fails below,
1395 * we disconnect anyways, and counters will be reset. */
1396 mdev->recv_cnt += data_size>>9;
1397
1398 bio = req->master_bio;
1399 D_ASSERT(sector == bio->bi_sector);
1400
1401 bio_for_each_segment(bvec, bio, i) {
1402 expect = min_t(int, data_size, bvec->bv_len);
1403 rr = drbd_recv(mdev,
1404 kmap(bvec->bv_page)+bvec->bv_offset,
1405 expect);
1406 kunmap(bvec->bv_page);
1407 if (rr != expect) {
1408 dev_warn(DEV, "short read receiving data reply: "
1409 "read %d expected %d\n",
1410 rr, expect);
1411 return 0;
1412 }
1413 data_size -= rr;
1414 }
1415
1416 if (dgs) {
1417 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1418 if (memcmp(dig_in, dig_vv, dgs)) {
1419 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1420 return 0;
1421 }
1422 }
1423
1424 D_ASSERT(data_size == 0);
1425 return 1;
1426}
1427
1428/* e_end_resync_block() is called via
1429 * drbd_process_done_ee() by asender only */
1430static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1431{
1432 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1433 sector_t sector = e->sector;
1434 int ok;
1435
1436 D_ASSERT(hlist_unhashed(&e->colision));
1437
1438 if (likely(drbd_bio_uptodate(e->private_bio))) {
1439 drbd_set_in_sync(mdev, sector, e->size);
1440 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1441 } else {
1442 /* Record failure to sync */
1443 drbd_rs_failed_io(mdev, sector, e->size);
1444
1445 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1446 }
1447 dec_unacked(mdev);
1448
1449 return ok;
1450}
1451
1452static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1453{
1454 struct drbd_epoch_entry *e;
1455
1456 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1457 if (!e) {
1458 put_ldev(mdev);
1459 return FALSE;
1460 }
1461
1462 dec_rs_pending(mdev);
1463
1464 e->private_bio->bi_end_io = drbd_endio_write_sec;
1465 e->private_bio->bi_rw = WRITE;
1466 e->w.cb = e_end_resync_block;
1467
1468 inc_unacked(mdev);
1469 /* corresponding dec_unacked() in e_end_resync_block()
1470 * respective _drbd_clear_done_ee */
1471
1472 spin_lock_irq(&mdev->req_lock);
1473 list_add(&e->w.list, &mdev->sync_ee);
1474 spin_unlock_irq(&mdev->req_lock);
1475
1476 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1477 /* accounting done in endio */
1478
1479 maybe_kick_lo(mdev);
1480 return TRUE;
1481}
1482
1483static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1484{
1485 struct drbd_request *req;
1486 sector_t sector;
1487 unsigned int header_size, data_size;
1488 int ok;
1489 struct p_data *p = (struct p_data *)h;
1490
1491 header_size = sizeof(*p) - sizeof(*h);
1492 data_size = h->length - header_size;
1493
1494 ERR_IF(data_size == 0) return FALSE;
1495
1496 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1497 return FALSE;
1498
1499 sector = be64_to_cpu(p->sector);
1500
1501 spin_lock_irq(&mdev->req_lock);
1502 req = _ar_id_to_req(mdev, p->block_id, sector);
1503 spin_unlock_irq(&mdev->req_lock);
1504 if (unlikely(!req)) {
1505 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1506 return FALSE;
1507 }
1508
1509 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1510 * special casing it there for the various failure cases.
1511 * still no race with drbd_fail_pending_reads */
1512 ok = recv_dless_read(mdev, req, sector, data_size);
1513
1514 if (ok)
1515 req_mod(req, data_received);
1516 /* else: nothing. handled from drbd_disconnect...
1517 * I don't think we may complete this just yet
1518 * in case we are "on-disconnect: freeze" */
1519
1520 return ok;
1521}
1522
1523static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1524{
1525 sector_t sector;
1526 unsigned int header_size, data_size;
1527 int ok;
1528 struct p_data *p = (struct p_data *)h;
1529
1530 header_size = sizeof(*p) - sizeof(*h);
1531 data_size = h->length - header_size;
1532
1533 ERR_IF(data_size == 0) return FALSE;
1534
1535 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1536 return FALSE;
1537
1538 sector = be64_to_cpu(p->sector);
1539 D_ASSERT(p->block_id == ID_SYNCER);
1540
1541 if (get_ldev(mdev)) {
1542 /* data is submitted to disk within recv_resync_read.
1543 * corresponding put_ldev done below on error,
1544 * or in drbd_endio_write_sec. */
1545 ok = recv_resync_read(mdev, sector, data_size);
1546 } else {
1547 if (__ratelimit(&drbd_ratelimit_state))
1548 dev_err(DEV, "Can not write resync data to local disk.\n");
1549
1550 ok = drbd_drain_block(mdev, data_size);
1551
1552 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1553 }
1554
1555 return ok;
1556}
1557
1558/* e_end_block() is called via drbd_process_done_ee().
1559 * this means this function only runs in the asender thread
1560 */
1561static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1562{
1563 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1564 sector_t sector = e->sector;
1565 struct drbd_epoch *epoch;
1566 int ok = 1, pcmd;
1567
1568 if (e->flags & EE_IS_BARRIER) {
1569 epoch = previous_epoch(mdev, e->epoch);
1570 if (epoch)
1571 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1572 }
1573
1574 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1575 if (likely(drbd_bio_uptodate(e->private_bio))) {
1576 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1577 mdev->state.conn <= C_PAUSED_SYNC_T &&
1578 e->flags & EE_MAY_SET_IN_SYNC) ?
1579 P_RS_WRITE_ACK : P_WRITE_ACK;
1580 ok &= drbd_send_ack(mdev, pcmd, e);
1581 if (pcmd == P_RS_WRITE_ACK)
1582 drbd_set_in_sync(mdev, sector, e->size);
1583 } else {
1584 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1585 /* we expect it to be marked out of sync anyways...
1586 * maybe assert this? */
1587 }
1588 dec_unacked(mdev);
1589 }
1590 /* we delete from the conflict detection hash _after_ we sent out the
1591 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1592 if (mdev->net_conf->two_primaries) {
1593 spin_lock_irq(&mdev->req_lock);
1594 D_ASSERT(!hlist_unhashed(&e->colision));
1595 hlist_del_init(&e->colision);
1596 spin_unlock_irq(&mdev->req_lock);
1597 } else {
1598 D_ASSERT(hlist_unhashed(&e->colision));
1599 }
1600
1601 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1602
1603 return ok;
1604}
1605
1606static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1607{
1608 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1609 int ok = 1;
1610
1611 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1612 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1613
1614 spin_lock_irq(&mdev->req_lock);
1615 D_ASSERT(!hlist_unhashed(&e->colision));
1616 hlist_del_init(&e->colision);
1617 spin_unlock_irq(&mdev->req_lock);
1618
1619 dec_unacked(mdev);
1620
1621 return ok;
1622}
1623
1624/* Called from receive_Data.
1625 * Synchronize packets on sock with packets on msock.
1626 *
1627 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1628 * packet traveling on msock, they are still processed in the order they have
1629 * been sent.
1630 *
1631 * Note: we don't care for Ack packets overtaking P_DATA packets.
1632 *
1633 * In case packet_seq is larger than mdev->peer_seq number, there are
1634 * outstanding packets on the msock. We wait for them to arrive.
1635 * In case we are the logically next packet, we update mdev->peer_seq
1636 * ourselves. Correctly handles 32bit wrap around.
1637 *
1638 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1639 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1640 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1641 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1642 *
1643 * returns 0 if we may process the packet,
1644 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1645static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1646{
1647 DEFINE_WAIT(wait);
1648 unsigned int p_seq;
1649 long timeout;
1650 int ret = 0;
1651 spin_lock(&mdev->peer_seq_lock);
1652 for (;;) {
1653 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1654 if (seq_le(packet_seq, mdev->peer_seq+1))
1655 break;
1656 if (signal_pending(current)) {
1657 ret = -ERESTARTSYS;
1658 break;
1659 }
1660 p_seq = mdev->peer_seq;
1661 spin_unlock(&mdev->peer_seq_lock);
1662 timeout = schedule_timeout(30*HZ);
1663 spin_lock(&mdev->peer_seq_lock);
1664 if (timeout == 0 && p_seq == mdev->peer_seq) {
1665 ret = -ETIMEDOUT;
1666 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1667 break;
1668 }
1669 }
1670 finish_wait(&mdev->seq_wait, &wait);
1671 if (mdev->peer_seq+1 == packet_seq)
1672 mdev->peer_seq++;
1673 spin_unlock(&mdev->peer_seq_lock);
1674 return ret;
1675}
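/*
 * Illustrative example only -- not part of the original source.  With the
 * wrap-around handling above: if mdev->peer_seq is 0xffffffff, an incoming
 * packet_seq of 0 is the logically next packet (peer_seq + 1 wraps to 0) and
 * is processed immediately, while a packet_seq of 2 still has an outstanding
 * predecessor on the msock and keeps waiting (or times out after 30 seconds).
 */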
1676
1677/* mirrored write */
1678static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1679{
1680 sector_t sector;
1681 struct drbd_epoch_entry *e;
1682 struct p_data *p = (struct p_data *)h;
1683 int header_size, data_size;
1684 int rw = WRITE;
1685 u32 dp_flags;
1686
1687 header_size = sizeof(*p) - sizeof(*h);
1688 data_size = h->length - header_size;
1689
1690 ERR_IF(data_size == 0) return FALSE;
1691
1692 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1693 return FALSE;
1694
1695 if (!get_ldev(mdev)) {
1696 if (__ratelimit(&drbd_ratelimit_state))
1697 dev_err(DEV, "Can not write mirrored data block "
1698 "to local disk.\n");
1699 spin_lock(&mdev->peer_seq_lock);
1700 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1701 mdev->peer_seq++;
1702 spin_unlock(&mdev->peer_seq_lock);
1703
1704 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1705 atomic_inc(&mdev->current_epoch->epoch_size);
1706 return drbd_drain_block(mdev, data_size);
1707 }
1708
1709 /* get_ldev(mdev) successful.
1710 * Corresponding put_ldev done either below (on various errors),
1711 * or in drbd_endio_write_sec, if we successfully submit the data at
1712 * the end of this function. */
1713
1714 sector = be64_to_cpu(p->sector);
1715 e = read_in_block(mdev, p->block_id, sector, data_size);
1716 if (!e) {
1717 put_ldev(mdev);
1718 return FALSE;
1719 }
1720
1721 e->private_bio->bi_end_io = drbd_endio_write_sec;
1722 e->w.cb = e_end_block;
1723
1724 spin_lock(&mdev->epoch_lock);
1725 e->epoch = mdev->current_epoch;
1726 atomic_inc(&e->epoch->epoch_size);
1727 atomic_inc(&e->epoch->active);
1728
1729 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1730 struct drbd_epoch *epoch;
1731 /* Issue a barrier if we start a new epoch, and the previous epoch
1732 was not an epoch containing a single request which already was
1733 a Barrier. */
1734 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1735 if (epoch == e->epoch) {
1736 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1737 rw |= (1<<BIO_RW_BARRIER);
1738 e->flags |= EE_IS_BARRIER;
1739 } else {
1740 if (atomic_read(&epoch->epoch_size) > 1 ||
1741 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1742 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1743 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1744 rw |= (1<<BIO_RW_BARRIER);
1745 e->flags |= EE_IS_BARRIER;
1746 }
1747 }
1748 }
1749 spin_unlock(&mdev->epoch_lock);
1750
1751 dp_flags = be32_to_cpu(p->dp_flags);
1752 if (dp_flags & DP_HARDBARRIER) {
1753 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1754 /* rw |= (1<<BIO_RW_BARRIER); */
1755 }
1756 if (dp_flags & DP_RW_SYNC)
1757 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1758 if (dp_flags & DP_MAY_SET_IN_SYNC)
1759 e->flags |= EE_MAY_SET_IN_SYNC;
1760
1761 /* I'm the receiver, I do hold a net_cnt reference. */
1762 if (!mdev->net_conf->two_primaries) {
1763 spin_lock_irq(&mdev->req_lock);
1764 } else {
1765 /* don't get the req_lock yet,
1766 * we may sleep in drbd_wait_peer_seq */
1767 const int size = e->size;
1768 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1769 DEFINE_WAIT(wait);
1770 struct drbd_request *i;
1771 struct hlist_node *n;
1772 struct hlist_head *slot;
1773 int first;
1774
1775 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1776 BUG_ON(mdev->ee_hash == NULL);
1777 BUG_ON(mdev->tl_hash == NULL);
1778
1779 /* conflict detection and handling:
1780 * 1. wait on the sequence number,
1781 * in case this data packet overtook ACK packets.
1782 * 2. check our hash tables for conflicting requests.
1783 * we only need to walk the tl_hash, since an ee can not
1784 * have a conflict with another ee: on the submitting
1785 * node, the corresponding req had already been conflicting,
1786 * and a conflicting req is never sent.
1787 *
1788 * Note: for two_primaries, we are protocol C,
1789 * so there cannot be any request that is DONE
1790 * but still on the transfer log.
1791 *
1792 * unconditionally add to the ee_hash.
1793 *
1794 * if no conflicting request is found:
1795 * submit.
1796 *
1797 * if any conflicting request is found
1798 * that has not yet been acked,
1799 * AND I have the "discard concurrent writes" flag:
1800 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1801 *
1802 * if any conflicting request is found:
1803 * block the receiver, waiting on misc_wait
1804 * until no more conflicting requests are there,
1805 * or we get interrupted (disconnect).
1806 *
1807 * we do not just write after local io completion of those
1808 * requests, but only after req is done completely, i.e.
1809 * we wait for the P_DISCARD_ACK to arrive!
1810 *
1811 * then proceed normally, i.e. submit.
1812 */
1813 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1814 goto out_interrupted;
1815
1816 spin_lock_irq(&mdev->req_lock);
1817
1818 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1819
1820#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1821 slot = tl_hash_slot(mdev, sector);
1822 first = 1;
1823 for (;;) {
1824 int have_unacked = 0;
1825 int have_conflict = 0;
1826 prepare_to_wait(&mdev->misc_wait, &wait,
1827 TASK_INTERRUPTIBLE);
1828 hlist_for_each_entry(i, n, slot, colision) {
1829 if (OVERLAPS) {
1830 /* only ALERT on first iteration,
1831 * we may be woken up early... */
1832 if (first)
1833 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1834 " new: %llus +%u; pending: %llus +%u\n",
1835 current->comm, current->pid,
1836 (unsigned long long)sector, size,
1837 (unsigned long long)i->sector, i->size);
1838 if (i->rq_state & RQ_NET_PENDING)
1839 ++have_unacked;
1840 ++have_conflict;
1841 }
1842 }
1843#undef OVERLAPS
1844 if (!have_conflict)
1845 break;
1846
1847 /* Discard Ack only for the _first_ iteration */
1848 if (first && discard && have_unacked) {
1849 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1850 (unsigned long long)sector);
1851 inc_unacked(mdev);
1852 e->w.cb = e_send_discard_ack;
1853 list_add_tail(&e->w.list, &mdev->done_ee);
1854
1855 spin_unlock_irq(&mdev->req_lock);
1856
1857 /* we could probably send that P_DISCARD_ACK ourselves,
1858 * but I don't like the receiver using the msock */
1859
1860 put_ldev(mdev);
1861 wake_asender(mdev);
1862 finish_wait(&mdev->misc_wait, &wait);
1863 return TRUE;
1864 }
1865
1866 if (signal_pending(current)) {
1867 hlist_del_init(&e->colision);
1868
1869 spin_unlock_irq(&mdev->req_lock);
1870
1871 finish_wait(&mdev->misc_wait, &wait);
1872 goto out_interrupted;
1873 }
1874
1875 spin_unlock_irq(&mdev->req_lock);
1876 if (first) {
1877 first = 0;
1878 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1879 "sec=%llus\n", (unsigned long long)sector);
1880 } else if (discard) {
1881 /* we had none on the first iteration.
1882 * there must be none now. */
1883 D_ASSERT(have_unacked == 0);
1884 }
1885 schedule();
1886 spin_lock_irq(&mdev->req_lock);
1887 }
1888 finish_wait(&mdev->misc_wait, &wait);
1889 }
1890
1891 list_add(&e->w.list, &mdev->active_ee);
1892 spin_unlock_irq(&mdev->req_lock);
1893
1894 switch (mdev->net_conf->wire_protocol) {
1895 case DRBD_PROT_C:
1896 inc_unacked(mdev);
1897 /* corresponding dec_unacked() in e_end_block()
1898 * respective _drbd_clear_done_ee */
1899 break;
1900 case DRBD_PROT_B:
1901 /* I really don't like it that the receiver thread
1902 * sends on the msock, but anyways */
1903 drbd_send_ack(mdev, P_RECV_ACK, e);
1904 break;
1905 case DRBD_PROT_A:
1906 /* nothing to do */
1907 break;
1908 }
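/*
 * Summary of the ack semantics selected by the switch above (the
 * authoritative send paths live in the worker/asender code):
 *  - protocol A: fire and forget, no ack is sent for this write;
 *  - protocol B: P_RECV_ACK is sent as soon as the data has been received;
 *  - protocol C: inc_unacked() here, and the ack goes out only after the
 *    local write has completed, from e_end_block(), which also does the
 *    corresponding dec_unacked().
 */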
1909
1910 if (mdev->state.pdsk == D_DISKLESS) {
1911 /* In case we have the only disk of the cluster, */
1912 drbd_set_out_of_sync(mdev, e->sector, e->size);
1913 e->flags |= EE_CALL_AL_COMPLETE_IO;
1914 drbd_al_begin_io(mdev, e->sector);
1915 }
1916
1917 e->private_bio->bi_rw = rw;
1918 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1919 /* accounting done in endio */
1920
1921 maybe_kick_lo(mdev);
1922 return TRUE;
1923
1924out_interrupted:
1925 /* yes, the epoch_size now is imbalanced.
1926 * but we drop the connection anyways, so we don't have a chance to
1927 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1928 put_ldev(mdev);
1929 drbd_free_ee(mdev, e);
1930 return FALSE;
1931}
1932
1933static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1934{
1935 sector_t sector;
1936 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1937 struct drbd_epoch_entry *e;
1938 struct digest_info *di = NULL;
1939 int size, digest_size;
1940 unsigned int fault_type;
1941 struct p_block_req *p =
1942 (struct p_block_req *)h;
1943 const int brps = sizeof(*p)-sizeof(*h);
1944
1945 if (drbd_recv(mdev, h->payload, brps) != brps)
1946 return FALSE;
1947
1948 sector = be64_to_cpu(p->sector);
1949 size = be32_to_cpu(p->blksize);
1950
1951 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1952 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1953 (unsigned long long)sector, size);
1954 return FALSE;
1955 }
1956 if (sector + (size>>9) > capacity) {
1957 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1958 (unsigned long long)sector, size);
1959 return FALSE;
1960 }
1961
1962 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1963 if (__ratelimit(&drbd_ratelimit_state))
1964 dev_err(DEV, "Can not satisfy peer's read request, "
1965 "no local data.\n");
1966 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1967 P_NEG_RS_DREPLY , p);
c3470cde 1968 return drbd_drain_block(mdev, h->length - brps);
1969 }
1970
1971 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1972 * "criss-cross" setup, that might cause write-out on some other DRBD,
1973 * which in turn might block on the other node at this very place. */
1974 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1975 if (!e) {
1976 put_ldev(mdev);
1977 return FALSE;
1978 }
1979
1980 e->private_bio->bi_rw = READ;
1981 e->private_bio->bi_end_io = drbd_endio_read_sec;
1982
1983 switch (h->command) {
1984 case P_DATA_REQUEST:
1985 e->w.cb = w_e_end_data_req;
1986 fault_type = DRBD_FAULT_DT_RD;
1987 break;
1988 case P_RS_DATA_REQUEST:
1989 e->w.cb = w_e_end_rsdata_req;
1990 fault_type = DRBD_FAULT_RS_RD;
 1991 /* Eventually this should become asynchronous. Currently it
1992 * blocks the whole receiver just to delay the reading of a
1993 * resync data block.
1994 * the drbd_work_queue mechanism is made for this...
1995 */
1996 if (!drbd_rs_begin_io(mdev, sector)) {
1997 /* we have been interrupted,
1998 * probably connection lost! */
1999 D_ASSERT(signal_pending(current));
2000 goto out_free_e;
2001 }
2002 break;
2003
2004 case P_OV_REPLY:
2005 case P_CSUM_RS_REQUEST:
2006 fault_type = DRBD_FAULT_RS_RD;
 2007 digest_size = h->length - brps;
2008 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2009 if (!di)
2010 goto out_free_e;
2011
2012 di->digest_size = digest_size;
2013 di->digest = (((char *)di)+sizeof(struct digest_info));
2014
2015 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2016 goto out_free_e;
2017
2018 e->block_id = (u64)(unsigned long)di;
2019 if (h->command == P_CSUM_RS_REQUEST) {
2020 D_ASSERT(mdev->agreed_pro_version >= 89);
2021 e->w.cb = w_e_end_csum_rs_req;
2022 } else if (h->command == P_OV_REPLY) {
2023 e->w.cb = w_e_end_ov_reply;
2024 dec_rs_pending(mdev);
2025 break;
2026 }
2027
2028 if (!drbd_rs_begin_io(mdev, sector)) {
2029 /* we have been interrupted, probably connection lost! */
2030 D_ASSERT(signal_pending(current));
2031 goto out_free_e;
2032 }
2033 break;
2034
2035 case P_OV_REQUEST:
2036 if (mdev->state.conn >= C_CONNECTED &&
2037 mdev->state.conn != C_VERIFY_T)
2038 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2039 drbd_conn_str(mdev->state.conn));
2040 if (mdev->ov_start_sector == ~(sector_t)0 &&
2041 mdev->agreed_pro_version >= 90) {
2042 mdev->ov_start_sector = sector;
2043 mdev->ov_position = sector;
2044 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2045 dev_info(DEV, "Online Verify start sector: %llu\n",
2046 (unsigned long long)sector);
2047 }
2048 e->w.cb = w_e_end_ov_req;
2049 fault_type = DRBD_FAULT_RS_RD;
2050 /* Eventually this should become asynchronous. Currently it
2051 * blocks the whole receiver just to delay the reading of a
2052 * resync data block.
2053 * the drbd_work_queue mechanism is made for this...
2054 */
2055 if (!drbd_rs_begin_io(mdev, sector)) {
2056 /* we have been interrupted,
2057 * probably connection lost! */
2058 D_ASSERT(signal_pending(current));
2059 goto out_free_e;
2060 }
2061 break;
2062
2063
2064 default:
2065 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2066 cmdname(h->command));
2067 fault_type = DRBD_FAULT_MAX;
2068 }
2069
2070 spin_lock_irq(&mdev->req_lock);
2071 list_add(&e->w.list, &mdev->read_ee);
2072 spin_unlock_irq(&mdev->req_lock);
2073
2074 inc_unacked(mdev);
2075
2076 drbd_generic_make_request(mdev, fault_type, e->private_bio);
2077 maybe_kick_lo(mdev);
2078
2079 return TRUE;
2080
2081out_free_e:
2082 kfree(di);
2083 put_ldev(mdev);
2084 drbd_free_ee(mdev, e);
2085 return FALSE;
2086}
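/*
 * Worked example for the sanity checks in receive_DataRequest() above:
 * "size & 0x1ff" rejects anything that is not a multiple of 512 bytes, and
 * "sector + (size>>9)" converts the byte count to sectors before comparing
 * against the capacity.  A 4096 byte request at sector 8 of a 16 sector
 * device passes (4096 & 0x1ff == 0, and 8 + 8 == 16 does not exceed the
 * capacity); the same request at sector 9 would end at sector 17 and is
 * refused.
 */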
2087
2088static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2089{
2090 int self, peer, rv = -100;
2091 unsigned long ch_self, ch_peer;
2092
2093 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2094 peer = mdev->p_uuid[UI_BITMAP] & 1;
2095
2096 ch_peer = mdev->p_uuid[UI_SIZE];
2097 ch_self = mdev->comm_bm_set;
2098
2099 switch (mdev->net_conf->after_sb_0p) {
2100 case ASB_CONSENSUS:
2101 case ASB_DISCARD_SECONDARY:
2102 case ASB_CALL_HELPER:
2103 dev_err(DEV, "Configuration error.\n");
2104 break;
2105 case ASB_DISCONNECT:
2106 break;
2107 case ASB_DISCARD_YOUNGER_PRI:
2108 if (self == 0 && peer == 1) {
2109 rv = -1;
2110 break;
2111 }
2112 if (self == 1 && peer == 0) {
2113 rv = 1;
2114 break;
2115 }
2116 /* Else fall through to one of the other strategies... */
2117 case ASB_DISCARD_OLDER_PRI:
2118 if (self == 0 && peer == 1) {
2119 rv = 1;
2120 break;
2121 }
2122 if (self == 1 && peer == 0) {
2123 rv = -1;
2124 break;
2125 }
2126 /* Else fall through to one of the other strategies... */
ad19bf6e 2127 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2128 "Using discard-least-changes instead\n");
2129 case ASB_DISCARD_ZERO_CHG:
2130 if (ch_peer == 0 && ch_self == 0) {
2131 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2132 ? -1 : 1;
2133 break;
2134 } else {
2135 if (ch_peer == 0) { rv = 1; break; }
2136 if (ch_self == 0) { rv = -1; break; }
2137 }
2138 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2139 break;
2140 case ASB_DISCARD_LEAST_CHG:
2141 if (ch_self < ch_peer)
2142 rv = -1;
2143 else if (ch_self > ch_peer)
2144 rv = 1;
2145 else /* ( ch_self == ch_peer ) */
2146 /* Well, then use something else. */
2147 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2148 ? -1 : 1;
2149 break;
2150 case ASB_DISCARD_LOCAL:
2151 rv = -1;
2152 break;
2153 case ASB_DISCARD_REMOTE:
2154 rv = 1;
2155 }
2156
2157 return rv;
2158}
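/*
 * Worked example for the zero-primaries policies above: with
 * discard-least-changes, ch_self and ch_peer are (roughly) how many blocks
 * each side marked as changed.  If we changed 100 blocks and the peer
 * changed 10000, ch_self < ch_peer gives rv = -1, so we become the sync
 * target and our (fewer) changes are discarded; only a tie falls back to
 * the DISCARD_CONCURRENT coin flip.
 */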
2159
2160static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2161{
2162 int self, peer, hg, rv = -100;
2163
2164 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2165 peer = mdev->p_uuid[UI_BITMAP] & 1;
2166
2167 switch (mdev->net_conf->after_sb_1p) {
2168 case ASB_DISCARD_YOUNGER_PRI:
2169 case ASB_DISCARD_OLDER_PRI:
2170 case ASB_DISCARD_LEAST_CHG:
2171 case ASB_DISCARD_LOCAL:
2172 case ASB_DISCARD_REMOTE:
2173 dev_err(DEV, "Configuration error.\n");
2174 break;
2175 case ASB_DISCONNECT:
2176 break;
2177 case ASB_CONSENSUS:
2178 hg = drbd_asb_recover_0p(mdev);
2179 if (hg == -1 && mdev->state.role == R_SECONDARY)
2180 rv = hg;
2181 if (hg == 1 && mdev->state.role == R_PRIMARY)
2182 rv = hg;
2183 break;
2184 case ASB_VIOLENTLY:
2185 rv = drbd_asb_recover_0p(mdev);
2186 break;
2187 case ASB_DISCARD_SECONDARY:
2188 return mdev->state.role == R_PRIMARY ? 1 : -1;
2189 case ASB_CALL_HELPER:
2190 hg = drbd_asb_recover_0p(mdev);
2191 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2192 self = drbd_set_role(mdev, R_SECONDARY, 0);
2193 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2194 * we might be here in C_WF_REPORT_PARAMS which is transient.
2195 * we do not need to wait for the after state change work either. */
2196 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2197 if (self != SS_SUCCESS) {
2198 drbd_khelper(mdev, "pri-lost-after-sb");
2199 } else {
2200 dev_warn(DEV, "Successfully gave up primary role.\n");
2201 rv = hg;
2202 }
2203 } else
2204 rv = hg;
2205 }
2206
2207 return rv;
2208}
2209
2210static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2211{
2212 int self, peer, hg, rv = -100;
2213
2214 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2215 peer = mdev->p_uuid[UI_BITMAP] & 1;
2216
2217 switch (mdev->net_conf->after_sb_2p) {
2218 case ASB_DISCARD_YOUNGER_PRI:
2219 case ASB_DISCARD_OLDER_PRI:
2220 case ASB_DISCARD_LEAST_CHG:
2221 case ASB_DISCARD_LOCAL:
2222 case ASB_DISCARD_REMOTE:
2223 case ASB_CONSENSUS:
2224 case ASB_DISCARD_SECONDARY:
2225 dev_err(DEV, "Configuration error.\n");
2226 break;
2227 case ASB_VIOLENTLY:
2228 rv = drbd_asb_recover_0p(mdev);
2229 break;
2230 case ASB_DISCONNECT:
2231 break;
2232 case ASB_CALL_HELPER:
2233 hg = drbd_asb_recover_0p(mdev);
2234 if (hg == -1) {
2235 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2236 * we might be here in C_WF_REPORT_PARAMS which is transient.
2237 * we do not need to wait for the after state change work either. */
2238 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2239 if (self != SS_SUCCESS) {
2240 drbd_khelper(mdev, "pri-lost-after-sb");
2241 } else {
2242 dev_warn(DEV, "Successfully gave up primary role.\n");
2243 rv = hg;
2244 }
2245 } else
2246 rv = hg;
2247 }
2248
2249 return rv;
2250}
2251
2252static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2253 u64 bits, u64 flags)
2254{
2255 if (!uuid) {
2256 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2257 return;
2258 }
2259 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2260 text,
2261 (unsigned long long)uuid[UI_CURRENT],
2262 (unsigned long long)uuid[UI_BITMAP],
2263 (unsigned long long)uuid[UI_HISTORY_START],
2264 (unsigned long long)uuid[UI_HISTORY_END],
2265 (unsigned long long)bits,
2266 (unsigned long long)flags);
2267}
2268
2269/*
2270 100 after split brain try auto recover
2271 2 C_SYNC_SOURCE set BitMap
2272 1 C_SYNC_SOURCE use BitMap
2273 0 no Sync
2274 -1 C_SYNC_TARGET use BitMap
2275 -2 C_SYNC_TARGET set BitMap
2276 -100 after split brain, disconnect
 2277-1000 unrelated data
-1001 needs a newer network protocol (at least 91)
2278 */
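/*
 * Worked example for the "common crash" case (rule 40) below: rct packs the
 * crash time roles into two bits, lowest bit = "I was primary" (from
 * CRASHED_PRIMARY), weight 2 bit = "peer was primary" (from
 * p_uuid[UI_FLAGS]).  rct == 0: nobody was primary -> no sync; rct == 1:
 * only we were -> sync source (1); rct == 2: only the peer was -> sync
 * target (-1); rct == 3: both were -> DISCARD_CONCURRENT breaks the tie.
 */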
2279static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2280{
2281 u64 self, peer;
2282 int i, j;
2283
2284 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2285 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2286
2287 *rule_nr = 10;
2288 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2289 return 0;
2290
2291 *rule_nr = 20;
2292 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2293 peer != UUID_JUST_CREATED)
2294 return -2;
2295
2296 *rule_nr = 30;
2297 if (self != UUID_JUST_CREATED &&
2298 (peer == UUID_JUST_CREATED || peer == (u64)0))
2299 return 2;
2300
2301 if (self == peer) {
2302 int rct, dc; /* roles at crash time */
2303
2304 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2305
2306 if (mdev->agreed_pro_version < 91)
2307 return -1001;
2308
2309 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2310 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2311 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2312 drbd_uuid_set_bm(mdev, 0UL);
2313
2314 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2315 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2316 *rule_nr = 34;
2317 } else {
2318 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2319 *rule_nr = 36;
2320 }
2321
2322 return 1;
2323 }
2324
2325 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2326
2327 if (mdev->agreed_pro_version < 91)
2328 return -1001;
2329
2330 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2331 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2332 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2333
2334 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2335 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2336 mdev->p_uuid[UI_BITMAP] = 0UL;
2337
2338 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2339 *rule_nr = 35;
2340 } else {
2341 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2342 *rule_nr = 37;
2343 }
2344
2345 return -1;
2346 }
2347
2348 /* Common power [off|failure] */
2349 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2350 (mdev->p_uuid[UI_FLAGS] & 2);
2351 /* lowest bit is set when we were primary,
2352 * next bit (weight 2) is set when peer was primary */
2353 *rule_nr = 40;
2354
2355 switch (rct) {
2356 case 0: /* !self_pri && !peer_pri */ return 0;
2357 case 1: /* self_pri && !peer_pri */ return 1;
2358 case 2: /* !self_pri && peer_pri */ return -1;
2359 case 3: /* self_pri && peer_pri */
2360 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2361 return dc ? -1 : 1;
2362 }
2363 }
2364
2365 *rule_nr = 50;
2366 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2367 if (self == peer)
2368 return -1;
2369
2370 *rule_nr = 51;
2371 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2372 if (self == peer) {
2373 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2374 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2375 if (self == peer) {
 2376 /* The last P_SYNC_UUID did not get through. Undo the modifications
 2377 of the peer's UUIDs made at the last start of a resync as sync source. */
2378
2379 if (mdev->agreed_pro_version < 91)
2380 return -1001;
2381
2382 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2383 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2384 return -1;
2385 }
2386 }
2387
2388 *rule_nr = 60;
2389 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2390 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2391 peer = mdev->p_uuid[i] & ~((u64)1);
2392 if (self == peer)
2393 return -2;
2394 }
2395
2396 *rule_nr = 70;
2397 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2398 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2399 if (self == peer)
2400 return 1;
2401
2402 *rule_nr = 71;
2403 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2404 if (self == peer) {
2405 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2406 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2407 if (self == peer) {
 2408 /* The last P_SYNC_UUID did not get through. Undo the modifications
 2409 of our UUIDs made at the last start of a resync as sync source. */
2410
2411 if (mdev->agreed_pro_version < 91)
2412 return -1001;
2413
2414 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2415 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2416
2417 dev_info(DEV, "Undid last start of resync:\n");
2418
2419 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2420 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2421
2422 return 1;
2423 }
2424 }
2425
2426
2427 *rule_nr = 80;
d8c2a36b 2428 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2429 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2430 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2431 if (self == peer)
2432 return 2;
2433 }
2434
2435 *rule_nr = 90;
2436 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2437 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2438 if (self == peer && self != ((u64)0))
2439 return 100;
2440
2441 *rule_nr = 100;
2442 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2443 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2444 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2445 peer = mdev->p_uuid[j] & ~((u64)1);
2446 if (self == peer)
2447 return -100;
2448 }
2449 }
2450
2451 return -1000;
2452}
2453
2454/* drbd_sync_handshake() returns the new conn state on success, or
2455 CONN_MASK (-1) on failure.
2456 */
2457static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2458 enum drbd_disk_state peer_disk) __must_hold(local)
2459{
2460 int hg, rule_nr;
2461 enum drbd_conns rv = C_MASK;
2462 enum drbd_disk_state mydisk;
2463
2464 mydisk = mdev->state.disk;
2465 if (mydisk == D_NEGOTIATING)
2466 mydisk = mdev->new_state_tmp.disk;
2467
2468 dev_info(DEV, "drbd_sync_handshake:\n");
2469 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2470 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2471 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2472
2473 hg = drbd_uuid_compare(mdev, &rule_nr);
2474
2475 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2476
2477 if (hg == -1000) {
2478 dev_alert(DEV, "Unrelated data, aborting!\n");
2479 return C_MASK;
2480 }
2481 if (hg == -1001) {
 2482 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2483 return C_MASK;
2484 }
2485
2486 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2487 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2488 int f = (hg == -100) || abs(hg) == 2;
2489 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2490 if (f)
2491 hg = hg*2;
2492 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2493 hg > 0 ? "source" : "target");
2494 }
2495
2496 if (abs(hg) == 100)
2497 drbd_khelper(mdev, "initial-split-brain");
2498
2499 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2500 int pcount = (mdev->state.role == R_PRIMARY)
2501 + (peer_role == R_PRIMARY);
2502 int forced = (hg == -100);
2503
2504 switch (pcount) {
2505 case 0:
2506 hg = drbd_asb_recover_0p(mdev);
2507 break;
2508 case 1:
2509 hg = drbd_asb_recover_1p(mdev);
2510 break;
2511 case 2:
2512 hg = drbd_asb_recover_2p(mdev);
2513 break;
2514 }
2515 if (abs(hg) < 100) {
2516 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2517 "automatically solved. Sync from %s node\n",
2518 pcount, (hg < 0) ? "peer" : "this");
2519 if (forced) {
2520 dev_warn(DEV, "Doing a full sync, since"
2521 " UUIDs where ambiguous.\n");
2522 hg = hg*2;
2523 }
2524 }
2525 }
2526
2527 if (hg == -100) {
2528 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2529 hg = -1;
2530 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2531 hg = 1;
2532
2533 if (abs(hg) < 100)
2534 dev_warn(DEV, "Split-Brain detected, manually solved. "
2535 "Sync from %s node\n",
2536 (hg < 0) ? "peer" : "this");
2537 }
2538
2539 if (hg == -100) {
2540 /* FIXME this log message is not correct if we end up here
2541 * after an attempted attach on a diskless node.
2542 * We just refuse to attach -- well, we drop the "connection"
2543 * to that disk, in a way... */
3a11a487 2544 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2545 drbd_khelper(mdev, "split-brain");
2546 return C_MASK;
2547 }
2548
2549 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2550 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2551 return C_MASK;
2552 }
2553
2554 if (hg < 0 && /* by intention we do not use mydisk here. */
2555 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2556 switch (mdev->net_conf->rr_conflict) {
2557 case ASB_CALL_HELPER:
2558 drbd_khelper(mdev, "pri-lost");
2559 /* fall through */
2560 case ASB_DISCONNECT:
2561 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2562 return C_MASK;
2563 case ASB_VIOLENTLY:
2564 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2565 "assumption\n");
2566 }
2567 }
2568
2569 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2570 if (hg == 0)
2571 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2572 else
2573 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2574 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2575 abs(hg) >= 2 ? "full" : "bit-map based");
2576 return C_MASK;
2577 }
2578
2579 if (abs(hg) >= 2) {
2580 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2581 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2582 return C_MASK;
2583 }
2584
2585 if (hg > 0) { /* become sync source. */
2586 rv = C_WF_BITMAP_S;
2587 } else if (hg < 0) { /* become sync target */
2588 rv = C_WF_BITMAP_T;
2589 } else {
2590 rv = C_CONNECTED;
2591 if (drbd_bm_total_weight(mdev)) {
2592 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2593 drbd_bm_total_weight(mdev));
2594 }
2595 }
2596
2597 return rv;
2598}
2599
2600/* returns 1 if invalid */
2601static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2602{
2603 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2604 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2605 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2606 return 0;
2607
2608 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2609 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2610 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2611 return 1;
2612
2613 /* everything else is valid if they are equal on both sides. */
2614 if (peer == self)
2615 return 0;
2616
 2617 /* everything else is invalid. */
2618 return 1;
2619}
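/*
 * Usage note: the after-sb policies are compared pairwise, so discard-local
 * on this node is only valid together with discard-remote on the peer (and
 * vice versa); any other combination involving one of those two is
 * rejected, and all remaining policies must simply be configured
 * identically on both nodes.
 */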
2620
2621static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2622{
2623 struct p_protocol *p = (struct p_protocol *)h;
2624 int header_size, data_size;
2625 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
cf14c2e9 2626 int p_want_lose, p_two_primaries, cf;
2627 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2628
2629 header_size = sizeof(*p) - sizeof(*h);
2630 data_size = h->length - header_size;
2631
2632 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2633 return FALSE;
2634
2635 p_proto = be32_to_cpu(p->protocol);
2636 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2637 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2638 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
b411b363 2639 p_two_primaries = be32_to_cpu(p->two_primaries);
2640 cf = be32_to_cpu(p->conn_flags);
2641 p_want_lose = cf & CF_WANT_LOSE;
2642
2643 clear_bit(CONN_DRY_RUN, &mdev->flags);
2644
2645 if (cf & CF_DRY_RUN)
2646 set_bit(CONN_DRY_RUN, &mdev->flags);
2647
2648 if (p_proto != mdev->net_conf->wire_protocol) {
2649 dev_err(DEV, "incompatible communication protocols\n");
2650 goto disconnect;
2651 }
2652
2653 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2654 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2655 goto disconnect;
2656 }
2657
2658 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2659 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2660 goto disconnect;
2661 }
2662
2663 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2664 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2665 goto disconnect;
2666 }
2667
2668 if (p_want_lose && mdev->net_conf->want_lose) {
2669 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2670 goto disconnect;
2671 }
2672
2673 if (p_two_primaries != mdev->net_conf->two_primaries) {
2674 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2675 goto disconnect;
2676 }
2677
2678 if (mdev->agreed_pro_version >= 87) {
2679 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2680
2681 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2682 return FALSE;
2683
2684 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2685 if (strcmp(p_integrity_alg, my_alg)) {
2686 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2687 goto disconnect;
2688 }
2689 dev_info(DEV, "data-integrity-alg: %s\n",
2690 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2691 }
2692
2693 return TRUE;
2694
2695disconnect:
2696 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2697 return FALSE;
2698}
2699
2700/* helper function
2701 * input: alg name, feature name
2702 * return: NULL (alg name was "")
2703 * ERR_PTR(error) if something goes wrong
2704 * or the crypto hash ptr, if it worked out ok. */
2705struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2706 const char *alg, const char *name)
2707{
2708 struct crypto_hash *tfm;
2709
2710 if (!alg[0])
2711 return NULL;
2712
2713 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2714 if (IS_ERR(tfm)) {
2715 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2716 alg, name, PTR_ERR(tfm));
2717 return tfm;
2718 }
2719 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2720 crypto_free_hash(tfm);
2721 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2722 return ERR_PTR(-EINVAL);
2723 }
2724 return tfm;
2725}
2726
2727static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2728{
2729 int ok = TRUE;
2730 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2731 unsigned int header_size, data_size, exp_max_sz;
2732 struct crypto_hash *verify_tfm = NULL;
2733 struct crypto_hash *csums_tfm = NULL;
2734 const int apv = mdev->agreed_pro_version;
2735
2736 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2737 : apv == 88 ? sizeof(struct p_rs_param)
2738 + SHARED_SECRET_MAX
2739 : /* 89 */ sizeof(struct p_rs_param_89);
2740
2741 if (h->length > exp_max_sz) {
2742 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2743 h->length, exp_max_sz);
2744 return FALSE;
2745 }
2746
2747 if (apv <= 88) {
2748 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2749 data_size = h->length - header_size;
2750 } else /* apv >= 89 */ {
2751 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2752 data_size = h->length - header_size;
2753 D_ASSERT(data_size == 0);
2754 }
2755
2756 /* initialize verify_alg and csums_alg */
2757 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2758
2759 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2760 return FALSE;
2761
2762 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2763
2764 if (apv >= 88) {
2765 if (apv == 88) {
2766 if (data_size > SHARED_SECRET_MAX) {
2767 dev_err(DEV, "verify-alg too long, "
2768 "peer wants %u, accepting only %u byte\n",
2769 data_size, SHARED_SECRET_MAX);
2770 return FALSE;
2771 }
2772
2773 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2774 return FALSE;
2775
2776 /* we expect NUL terminated string */
2777 /* but just in case someone tries to be evil */
2778 D_ASSERT(p->verify_alg[data_size-1] == 0);
2779 p->verify_alg[data_size-1] = 0;
2780
2781 } else /* apv >= 89 */ {
2782 /* we still expect NUL terminated strings */
2783 /* but just in case someone tries to be evil */
2784 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2785 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2786 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2787 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2788 }
2789
2790 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2791 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2792 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2793 mdev->sync_conf.verify_alg, p->verify_alg);
2794 goto disconnect;
2795 }
2796 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2797 p->verify_alg, "verify-alg");
2798 if (IS_ERR(verify_tfm)) {
2799 verify_tfm = NULL;
2800 goto disconnect;
2801 }
2802 }
2803
2804 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2805 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2806 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2807 mdev->sync_conf.csums_alg, p->csums_alg);
2808 goto disconnect;
2809 }
2810 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2811 p->csums_alg, "csums-alg");
2812 if (IS_ERR(csums_tfm)) {
2813 csums_tfm = NULL;
2814 goto disconnect;
2815 }
2816 }
2817
2818
2819 spin_lock(&mdev->peer_seq_lock);
2820 /* lock against drbd_nl_syncer_conf() */
2821 if (verify_tfm) {
2822 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2823 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2824 crypto_free_hash(mdev->verify_tfm);
2825 mdev->verify_tfm = verify_tfm;
2826 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2827 }
2828 if (csums_tfm) {
2829 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2830 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2831 crypto_free_hash(mdev->csums_tfm);
2832 mdev->csums_tfm = csums_tfm;
2833 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2834 }
2835 spin_unlock(&mdev->peer_seq_lock);
2836 }
2837
2838 return ok;
2839disconnect:
2840 /* just for completeness: actually not needed,
2841 * as this is not reached if csums_tfm was ok. */
2842 crypto_free_hash(csums_tfm);
2843 /* but free the verify_tfm again, if csums_tfm did not work out */
2844 crypto_free_hash(verify_tfm);
2845 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2846 return FALSE;
2847}
2848
2849static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2850{
2851 /* sorry, we currently have no working implementation
2852 * of distributed TCQ */
2853}
2854
2855/* warn if the arguments differ by more than 12.5% */
2856static void warn_if_differ_considerably(struct drbd_conf *mdev,
2857 const char *s, sector_t a, sector_t b)
2858{
2859 sector_t d;
2860 if (a == 0 || b == 0)
2861 return;
2862 d = (a > b) ? (a - b) : (b - a);
2863 if (d > (a>>3) || d > (b>>3))
2864 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2865 (unsigned long long)a, (unsigned long long)b);
2866}
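/*
 * Note on the threshold above: "a>>3" is a/8, i.e. 12.5% of a.  Example:
 * a = 1000, b = 900 gives d = 100, which stays below both 1000>>3 = 125 and
 * 900>>3 = 112, so no warning; a = 1000, b = 880 gives d = 120 > 880>>3 = 110
 * and does warn.
 */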
2867
2868static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2869{
2870 struct p_sizes *p = (struct p_sizes *)h;
2871 enum determine_dev_size dd = unchanged;
2872 unsigned int max_seg_s;
2873 sector_t p_size, p_usize, my_usize;
2874 int ldsc = 0; /* local disk size changed */
e89b591c 2875 enum dds_flags ddsf;
2876
2877 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2878 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2879 return FALSE;
2880
2881 p_size = be64_to_cpu(p->d_size);
2882 p_usize = be64_to_cpu(p->u_size);
2883
2884 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2885 dev_err(DEV, "some backing storage is needed\n");
2886 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2887 return FALSE;
2888 }
2889
2890 /* just store the peer's disk size for now.
2891 * we still need to figure out whether we accept that. */
2892 mdev->p_size = p_size;
2893
 2894#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
2895 if (get_ldev(mdev)) {
2896 warn_if_differ_considerably(mdev, "lower level device sizes",
2897 p_size, drbd_get_max_capacity(mdev->ldev));
2898 warn_if_differ_considerably(mdev, "user requested size",
2899 p_usize, mdev->ldev->dc.disk_size);
2900
2901 /* if this is the first connect, or an otherwise expected
2902 * param exchange, choose the minimum */
2903 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2904 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2905 p_usize);
2906
2907 my_usize = mdev->ldev->dc.disk_size;
2908
2909 if (mdev->ldev->dc.disk_size != p_usize) {
2910 mdev->ldev->dc.disk_size = p_usize;
2911 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2912 (unsigned long)mdev->ldev->dc.disk_size);
2913 }
2914
2915 /* Never shrink a device with usable data during connect.
2916 But allow online shrinking if we are connected. */
a393db6f 2917 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2918 drbd_get_capacity(mdev->this_bdev) &&
2919 mdev->state.disk >= D_OUTDATED &&
2920 mdev->state.conn < C_CONNECTED) {
2921 dev_err(DEV, "The peer's disk size is too small!\n");
2922 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2923 mdev->ldev->dc.disk_size = my_usize;
2924 put_ldev(mdev);
2925 return FALSE;
2926 }
2927 put_ldev(mdev);
2928 }
2929#undef min_not_zero
2930
e89b591c 2931 ddsf = be16_to_cpu(p->dds_flags);
b411b363 2932 if (get_ldev(mdev)) {
e89b591c 2933 dd = drbd_determin_dev_size(mdev, ddsf);
2934 put_ldev(mdev);
2935 if (dd == dev_size_error)
2936 return FALSE;
2937 drbd_md_sync(mdev);
2938 } else {
2939 /* I am diskless, need to accept the peer's size. */
2940 drbd_set_my_capacity(mdev, p_size);
2941 }
2942
2943 if (get_ldev(mdev)) {
2944 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2945 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2946 ldsc = 1;
2947 }
2948
2949 max_seg_s = be32_to_cpu(p->max_segment_size);
2950 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2951 drbd_setup_queue_param(mdev, max_seg_s);
2952
e89b591c 2953 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
2954 put_ldev(mdev);
2955 }
2956
2957 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2958 if (be64_to_cpu(p->c_size) !=
2959 drbd_get_capacity(mdev->this_bdev) || ldsc) {
2960 /* we have different sizes, probably peer
2961 * needs to know my new size... */
e89b591c 2962 drbd_send_sizes(mdev, 0, ddsf);
2963 }
2964 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2965 (dd == grew && mdev->state.conn == C_CONNECTED)) {
2966 if (mdev->state.pdsk >= D_INCONSISTENT &&
2967 mdev->state.disk >= D_INCONSISTENT) {
2968 if (ddsf & DDSF_NO_RESYNC)
2969 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
2970 else
2971 resync_after_online_grow(mdev);
2972 } else
2973 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2974 }
2975 }
2976
2977 return TRUE;
2978}
2979
2980static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2981{
2982 struct p_uuids *p = (struct p_uuids *)h;
2983 u64 *p_uuid;
2984 int i;
2985
2986 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2987 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2988 return FALSE;
2989
2990 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2991
2992 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2993 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2994
2995 kfree(mdev->p_uuid);
2996 mdev->p_uuid = p_uuid;
2997
2998 if (mdev->state.conn < C_CONNECTED &&
2999 mdev->state.disk < D_INCONSISTENT &&
3000 mdev->state.role == R_PRIMARY &&
3001 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3002 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3003 (unsigned long long)mdev->ed_uuid);
3004 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3005 return FALSE;
3006 }
3007
3008 if (get_ldev(mdev)) {
3009 int skip_initial_sync =
3010 mdev->state.conn == C_CONNECTED &&
3011 mdev->agreed_pro_version >= 90 &&
3012 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3013 (p_uuid[UI_FLAGS] & 8);
3014 if (skip_initial_sync) {
3015 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3016 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3017 "clear_n_write from receive_uuids");
3018 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3019 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3020 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3021 CS_VERBOSE, NULL);
3022 drbd_md_sync(mdev);
3023 }
3024 put_ldev(mdev);
3025 }
3026
 3027 /* Before we test for the disk state, we should wait until a possibly
 3028 ongoing cluster wide state change has finished. That is important if
3029 we are primary and are detaching from our disk. We need to see the
3030 new disk state... */
3031 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3032 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3033 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3034
3035 return TRUE;
3036}
3037
3038/**
3039 * convert_state() - Converts the peer's view of the cluster state to our point of view
3040 * @ps: The state as seen by the peer.
3041 */
3042static union drbd_state convert_state(union drbd_state ps)
3043{
3044 union drbd_state ms;
3045
3046 static enum drbd_conns c_tab[] = {
3047 [C_CONNECTED] = C_CONNECTED,
3048
3049 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3050 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3051 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3052 [C_VERIFY_S] = C_VERIFY_T,
3053 [C_MASK] = C_MASK,
3054 };
3055
3056 ms.i = ps.i;
3057
3058 ms.conn = c_tab[ps.conn];
3059 ms.peer = ps.role;
3060 ms.role = ps.peer;
3061 ms.pdsk = ps.disk;
3062 ms.disk = ps.pdsk;
3063 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3064
3065 return ms;
3066}
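/*
 * Example: if the peer requests { role = Primary, peer = Secondary,
 * disk = UpToDate, pdsk = Inconsistent, conn = StartingSyncS }, then seen
 * from this node that is { role = Secondary, peer = Primary,
 * disk = Inconsistent, pdsk = UpToDate, conn = StartingSyncT }: role/peer
 * and disk/pdsk swap sides, and the asymmetric connection states are
 * translated via c_tab.
 */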
3067
3068static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3069{
3070 struct p_req_state *p = (struct p_req_state *)h;
3071 union drbd_state mask, val;
3072 int rv;
3073
3074 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3075 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3076 return FALSE;
3077
3078 mask.i = be32_to_cpu(p->mask);
3079 val.i = be32_to_cpu(p->val);
3080
3081 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3082 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3083 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3084 return TRUE;
3085 }
3086
3087 mask = convert_state(mask);
3088 val = convert_state(val);
3089
3090 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3091
3092 drbd_send_sr_reply(mdev, rv);
3093 drbd_md_sync(mdev);
3094
3095 return TRUE;
3096}
3097
3098static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3099{
3100 struct p_state *p = (struct p_state *)h;
3101 enum drbd_conns nconn, oconn;
3102 union drbd_state ns, peer_state;
3103 enum drbd_disk_state real_peer_disk;
3104 int rv;
3105
3106 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3107 return FALSE;
3108
3109 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3110 return FALSE;
3111
3112 peer_state.i = be32_to_cpu(p->state);
3113
3114 real_peer_disk = peer_state.disk;
3115 if (peer_state.disk == D_NEGOTIATING) {
3116 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3117 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3118 }
3119
3120 spin_lock_irq(&mdev->req_lock);
3121 retry:
3122 oconn = nconn = mdev->state.conn;
3123 spin_unlock_irq(&mdev->req_lock);
3124
3125 if (nconn == C_WF_REPORT_PARAMS)
3126 nconn = C_CONNECTED;
3127
3128 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3129 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3130 int cr; /* consider resync */
3131
3132 /* if we established a new connection */
3133 cr = (oconn < C_CONNECTED);
3134 /* if we had an established connection
3135 * and one of the nodes newly attaches a disk */
3136 cr |= (oconn == C_CONNECTED &&
3137 (peer_state.disk == D_NEGOTIATING ||
3138 mdev->state.disk == D_NEGOTIATING));
3139 /* if we have both been inconsistent, and the peer has been
3140 * forced to be UpToDate with --overwrite-data */
3141 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3142 /* if we had been plain connected, and the admin requested to
3143 * start a sync by "invalidate" or "invalidate-remote" */
3144 cr |= (oconn == C_CONNECTED &&
3145 (peer_state.conn >= C_STARTING_SYNC_S &&
3146 peer_state.conn <= C_WF_BITMAP_T));
3147
3148 if (cr)
3149 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3150
3151 put_ldev(mdev);
3152 if (nconn == C_MASK) {
580b9767 3153 nconn = C_CONNECTED;
3154 if (mdev->state.disk == D_NEGOTIATING) {
3155 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3156 } else if (peer_state.disk == D_NEGOTIATING) {
3157 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3158 peer_state.disk = D_DISKLESS;
580b9767 3159 real_peer_disk = D_DISKLESS;
b411b363 3160 } else {
3161 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3162 return FALSE;
3163 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3164 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3165 return FALSE;
3166 }
3167 }
3168 }
3169
3170 spin_lock_irq(&mdev->req_lock);
3171 if (mdev->state.conn != oconn)
3172 goto retry;
3173 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3174 ns.i = mdev->state.i;
3175 ns.conn = nconn;
3176 ns.peer = peer_state.role;
3177 ns.pdsk = real_peer_disk;
3178 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3179 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3180 ns.disk = mdev->new_state_tmp.disk;
3181
3182 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3183 ns = mdev->state;
3184 spin_unlock_irq(&mdev->req_lock);
3185
3186 if (rv < SS_SUCCESS) {
3187 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3188 return FALSE;
3189 }
3190
3191 if (oconn > C_WF_REPORT_PARAMS) {
3192 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3193 peer_state.disk != D_NEGOTIATING ) {
3194 /* we want resync, peer has not yet decided to sync... */
3195 /* Nowadays only used when forcing a node into primary role and
3196 setting its disk to UpToDate with that */
3197 drbd_send_uuids(mdev);
3198 drbd_send_state(mdev);
3199 }
3200 }
3201
3202 mdev->net_conf->want_lose = 0;
3203
3204 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3205
3206 return TRUE;
3207}
3208
3209static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3210{
3211 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3212
3213 wait_event(mdev->misc_wait,
3214 mdev->state.conn == C_WF_SYNC_UUID ||
3215 mdev->state.conn < C_CONNECTED ||
3216 mdev->state.disk < D_NEGOTIATING);
3217
3218 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3219
3220 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3221 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3222 return FALSE;
3223
3224 /* Here the _drbd_uuid_ functions are right, current should
3225 _not_ be rotated into the history */
3226 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3227 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3228 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3229
3230 drbd_start_resync(mdev, C_SYNC_TARGET);
3231
3232 put_ldev(mdev);
3233 } else
3234 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3235
3236 return TRUE;
3237}
3238
3239enum receive_bitmap_ret { OK, DONE, FAILED };
3240
3241static enum receive_bitmap_ret
3242receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3243 unsigned long *buffer, struct bm_xfer_ctx *c)
3244{
3245 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3246 unsigned want = num_words * sizeof(long);
3247
3248 if (want != h->length) {
3249 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3250 return FAILED;
3251 }
3252 if (want == 0)
3253 return DONE;
3254 if (drbd_recv(mdev, buffer, want) != want)
3255 return FAILED;
3256
3257 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3258
3259 c->word_offset += num_words;
3260 c->bit_offset = c->word_offset * BITS_PER_LONG;
3261 if (c->bit_offset > c->bm_bits)
3262 c->bit_offset = c->bm_bits;
3263
3264 return OK;
3265}
3266
3267static enum receive_bitmap_ret
3268recv_bm_rle_bits(struct drbd_conf *mdev,
3269 struct p_compressed_bm *p,
3270 struct bm_xfer_ctx *c)
3271{
3272 struct bitstream bs;
3273 u64 look_ahead;
3274 u64 rl;
3275 u64 tmp;
3276 unsigned long s = c->bit_offset;
3277 unsigned long e;
3278 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3279 int toggle = DCBP_get_start(p);
3280 int have;
3281 int bits;
3282
3283 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3284
3285 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3286 if (bits < 0)
3287 return FAILED;
3288
3289 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3290 bits = vli_decode_bits(&rl, look_ahead);
3291 if (bits <= 0)
3292 return FAILED;
3293
3294 if (toggle) {
3295 e = s + rl -1;
3296 if (e >= c->bm_bits) {
3297 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3298 return FAILED;
3299 }
3300 _drbd_bm_set_bits(mdev, s, e);
3301 }
3302
3303 if (have < bits) {
3304 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3305 have, bits, look_ahead,
3306 (unsigned int)(bs.cur.b - p->code),
3307 (unsigned int)bs.buf_len);
3308 return FAILED;
3309 }
3310 look_ahead >>= bits;
3311 have -= bits;
3312
3313 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3314 if (bits < 0)
3315 return FAILED;
3316 look_ahead |= tmp << have;
3317 have += bits;
3318 }
3319
3320 c->bit_offset = s;
3321 bm_xfer_ctx_bit_to_word_offset(c);
3322
3323 return (s == c->bm_bits) ? DONE : OK;
3324}
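/*
 * Worked example for the RLE decoder above: the payload is a sequence of
 * VLI encoded run lengths describing alternating runs of clear and set
 * bits; DCBP_get_start() tells whether the first run is a run of set bits.
 * With start = 0 and run lengths 5, 3, 10 the decoder skips bits 0..4,
 * sets bits 5..7 via _drbd_bm_set_bits(), and skips bits 8..17 again:
 * only the "toggle" runs are applied to the bitmap.
 */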
3325
3326static enum receive_bitmap_ret
3327decode_bitmap_c(struct drbd_conf *mdev,
3328 struct p_compressed_bm *p,
3329 struct bm_xfer_ctx *c)
3330{
3331 if (DCBP_get_code(p) == RLE_VLI_Bits)
3332 return recv_bm_rle_bits(mdev, p, c);
3333
3334 /* other variants had been implemented for evaluation,
3335 * but have been dropped as this one turned out to be "best"
3336 * during all our tests. */
3337
3338 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3339 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3340 return FAILED;
3341}
3342
3343void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3344 const char *direction, struct bm_xfer_ctx *c)
3345{
3346 /* what would it take to transfer it "plaintext" */
3347 unsigned plain = sizeof(struct p_header) *
3348 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3349 + c->bm_words * sizeof(long);
3350 unsigned total = c->bytes[0] + c->bytes[1];
3351 unsigned r;
3352
3353 /* total can not be zero. but just in case: */
3354 if (total == 0)
3355 return;
3356
3357 /* don't report if not compressed */
3358 if (total >= plain)
3359 return;
3360
3361 /* total < plain. check for overflow, still */
3362 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3363 : (1000 * total / plain);
3364
3365 if (r > 1000)
3366 r = 1000;
3367
3368 r = 1000 - r;
3369 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3370 "total %u; compression: %u.%u%%\n",
3371 direction,
3372 c->bytes[1], c->packets[1],
3373 c->bytes[0], c->packets[0],
3374 total, r/10, r % 10);
3375}
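/*
 * Arithmetic example for the statistics above: with plain = 4096 bytes and
 * total = 512 bytes, r = 1000 * 512 / 4096 = 125, then r = 1000 - 125 = 875,
 * printed as "87.5%" via r/10 and r%10.  Nothing is printed when
 * total >= plain, i.e. when the compressed transfer saved nothing.
 */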
3376
3377/* Since we are processing the bitfield from lower addresses to higher,
 3378 it does not matter whether we process it in 32 bit chunks or 64 bit
 3379 chunks as long as it is little endian. (Understand it as a byte stream,
 3380 beginning with the lowest byte...) If we used big endian,
 3381 we would need to process it from the highest address to the lowest,
3382 in order to be agnostic to the 32 vs 64 bits issue.
3383
3384 returns 0 on failure, 1 if we successfully received it. */
3385static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3386{
3387 struct bm_xfer_ctx c;
3388 void *buffer;
3389 enum receive_bitmap_ret ret;
3390 int ok = FALSE;
3391
3392 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3393
3394 drbd_bm_lock(mdev, "receive bitmap");
3395
3396 /* maybe we should use some per thread scratch page,
3397 * and allocate that during initial device creation? */
3398 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3399 if (!buffer) {
3400 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3401 goto out;
3402 }
3403
3404 c = (struct bm_xfer_ctx) {
3405 .bm_bits = drbd_bm_bits(mdev),
3406 .bm_words = drbd_bm_words(mdev),
3407 };
3408
3409 do {
3410 if (h->command == P_BITMAP) {
3411 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3412 } else if (h->command == P_COMPRESSED_BITMAP) {
3413 /* MAYBE: sanity check that we speak proto >= 90,
3414 * and the feature is enabled! */
3415 struct p_compressed_bm *p;
3416
3417 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3418 dev_err(DEV, "ReportCBitmap packet too large\n");
3419 goto out;
3420 }
3421 /* use the page buff */
3422 p = buffer;
3423 memcpy(p, h, sizeof(*h));
3424 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3425 goto out;
3426 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3427 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3428 return FAILED;
3429 }
3430 ret = decode_bitmap_c(mdev, p, &c);
3431 } else {
3432 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3433 goto out;
3434 }
3435
3436 c.packets[h->command == P_BITMAP]++;
3437 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3438
3439 if (ret != OK)
3440 break;
3441
3442 if (!drbd_recv_header(mdev, h))
3443 goto out;
3444 } while (ret == OK);
3445 if (ret == FAILED)
3446 goto out;
3447
3448 INFO_bm_xfer_stats(mdev, "receive", &c);
3449
3450 if (mdev->state.conn == C_WF_BITMAP_T) {
3451 ok = !drbd_send_bitmap(mdev);
3452 if (!ok)
3453 goto out;
3454 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3455 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3456 D_ASSERT(ok == SS_SUCCESS);
3457 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3458 /* admin may have requested C_DISCONNECTING,
3459 * other threads may have noticed network errors */
3460 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3461 drbd_conn_str(mdev->state.conn));
3462 }
3463
3464 ok = TRUE;
3465 out:
3466 drbd_bm_unlock(mdev);
3467 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3468 drbd_start_resync(mdev, C_SYNC_SOURCE);
3469 free_page((unsigned long) buffer);
3470 return ok;
3471}
3472
3473static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3474{
3475 /* TODO zero copy sink :) */
3476 static char sink[128];
3477 int size, want, r;
3478
3479 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3480 h->command, h->length);
3481
3482 size = h->length;
3483 while (size > 0) {
3484 want = min_t(int, size, sizeof(sink));
3485 r = drbd_recv(mdev, sink, want);
3486 ERR_IF(r <= 0) break;
3487 size -= r;
3488 }
3489 return size == 0;
3490}
3491
3492static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3493{
3494 if (mdev->state.disk >= D_INCONSISTENT)
3495 drbd_kick_lo(mdev);
3496
3497 /* Make sure we've acked all the TCP data associated
3498 * with the data requests being unplugged */
3499 drbd_tcp_quickack(mdev->data.socket);
3500
3501 return TRUE;
3502}
3503
3504static void timeval_sub_us(struct timeval* tv, unsigned int us)
3505{
3506 tv->tv_sec -= us / 1000000;
3507 us = us % 1000000;
 3508 if (tv->tv_usec < us) { /* borrow a second so tv_usec stays in [0, 1000000) */
3509 tv->tv_usec += 1000000;
3510 tv->tv_sec--;
3511 }
3512 tv->tv_usec -= us;
3513}
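/*
 * Example: subtracting 1500000us from { tv_sec = 10, tv_usec = 300000 }
 * first removes the whole second (tv_sec = 9, us = 500000); since
 * tv_usec < us we borrow another second (tv_sec = 8, tv_usec = 1300000)
 * and end up with the normalized { tv_sec = 8, tv_usec = 800000 }.
 */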
3514
3515static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3516{
3517 struct delay_probe *dp;
3518 struct list_head *le;
3519 struct timeval now;
3520 int seq_num;
3521 int offset;
3522 int data_delay;
3523
3524 seq_num = be32_to_cpu(p->seq_num);
3525 offset = be32_to_cpu(p->offset);
3526
3527 spin_lock(&mdev->peer_seq_lock);
3528 if (!list_empty(&mdev->delay_probes)) {
3529 if (from == USE_DATA_SOCKET)
3530 le = mdev->delay_probes.next;
3531 else
3532 le = mdev->delay_probes.prev;
3533
3534 dp = list_entry(le, struct delay_probe, list);
3535
3536 if (dp->seq_num == seq_num) {
3537 list_del(le);
3538 spin_unlock(&mdev->peer_seq_lock);
3539 do_gettimeofday(&now);
3540 timeval_sub_us(&now, offset);
3541 data_delay =
3542 now.tv_usec - dp->time.tv_usec +
3543 (now.tv_sec - dp->time.tv_sec) * 1000000;
3544
3545 if (data_delay > 0)
3546 mdev->data_delay = data_delay;
3547
3548 kfree(dp);
3549 return;
3550 }
3551
3552 if (dp->seq_num > seq_num) {
3553 spin_unlock(&mdev->peer_seq_lock);
3554 dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
 3555 return; /* Do not allocate a struct delay_probe.... */
3556 }
3557 }
3558 spin_unlock(&mdev->peer_seq_lock);
3559
3560 dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3561 if (!dp) {
3562 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3563 return;
3564 }
3565
3566 dp->seq_num = seq_num;
3567 do_gettimeofday(&dp->time);
3568 timeval_sub_us(&dp->time, offset);
3569
3570 spin_lock(&mdev->peer_seq_lock);
3571 if (from == USE_DATA_SOCKET)
3572 list_add(&dp->list, &mdev->delay_probes);
3573 else
3574 list_add_tail(&dp->list, &mdev->delay_probes);
3575 spin_unlock(&mdev->peer_seq_lock);
3576}
3577
3578static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3579{
3580 struct p_delay_probe *p = (struct p_delay_probe *)h;
3581
3582 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3583 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3584 return FALSE;
3585
3586 got_delay_probe(mdev, USE_DATA_SOCKET, p);
3587 return TRUE;
3588}
3589
3590typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3591
3592static drbd_cmd_handler_f drbd_default_handler[] = {
3593 [P_DATA] = receive_Data,
3594 [P_DATA_REPLY] = receive_DataReply,
3595 [P_RS_DATA_REPLY] = receive_RSDataReply,
3596 [P_BARRIER] = receive_Barrier,
3597 [P_BITMAP] = receive_bitmap,
3598 [P_COMPRESSED_BITMAP] = receive_bitmap,
3599 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3600 [P_DATA_REQUEST] = receive_DataRequest,
3601 [P_RS_DATA_REQUEST] = receive_DataRequest,
3602 [P_SYNC_PARAM] = receive_SyncParam,
3603 [P_SYNC_PARAM89] = receive_SyncParam,
3604 [P_PROTOCOL] = receive_protocol,
3605 [P_UUIDS] = receive_uuids,
3606 [P_SIZES] = receive_sizes,
3607 [P_STATE] = receive_state,
3608 [P_STATE_CHG_REQ] = receive_req_state,
3609 [P_SYNC_UUID] = receive_sync_uuid,
3610 [P_OV_REQUEST] = receive_DataRequest,
3611 [P_OV_REPLY] = receive_DataRequest,
3612 [P_CSUM_RS_REQUEST] = receive_DataRequest,
0ced55a3 3613 [P_DELAY_PROBE] = receive_delay_probe,
3614 /* anything missing from this table is in
3615 * the asender_tbl, see get_asender_cmd */
3616 [P_MAX_CMD] = NULL,
3617};
3618
3619static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3620static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3621
3622static void drbdd(struct drbd_conf *mdev)
3623{
3624 drbd_cmd_handler_f handler;
3625 struct p_header *header = &mdev->data.rbuf.header;
3626
3627 while (get_t_state(&mdev->receiver) == Running) {
3628 drbd_thread_current_set_cpu(mdev);
3629 if (!drbd_recv_header(mdev, header)) {
3630 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
b411b363 3631 break;
0b33a916 3632 }
3633
3634 if (header->command < P_MAX_CMD)
3635 handler = drbd_cmd_handler[header->command];
3636 else if (P_MAY_IGNORE < header->command
3637 && header->command < P_MAX_OPT_CMD)
3638 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3639 else if (header->command > P_MAX_OPT_CMD)
3640 handler = receive_skip;
3641 else
3642 handler = NULL;
3643
3644 if (unlikely(!handler)) {
3645 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3646 header->command, header->length);
3647 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3648 break;
3649 }
3650 if (unlikely(!handler(mdev, header))) {
3651 dev_err(DEV, "error receiving %s, l: %d!\n",
3652 cmdname(header->command), header->length);
3653 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3654 break;
3655 }
3656 }
3657}
3658
3659static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3660{
3661 struct hlist_head *slot;
3662 struct hlist_node *pos;
3663 struct hlist_node *tmp;
3664 struct drbd_request *req;
3665 int i;
3666
3667 /*
3668 * Application READ requests
3669 */
3670 spin_lock_irq(&mdev->req_lock);
3671 for (i = 0; i < APP_R_HSIZE; i++) {
3672 slot = mdev->app_reads_hash+i;
3673 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3674 /* it may (but should not any longer!)
3675 * be on the work queue; if that assert triggers,
3676 * we need to also grab the
3677 * spin_lock_irq(&mdev->data.work.q_lock);
3678 * and list_del_init here. */
3679 D_ASSERT(list_empty(&req->w.list));
3680 /* It would be nice to complete outside of spinlock.
3681 * But this is easier for now. */
3682 _req_mod(req, connection_lost_while_pending);
3683 }
3684 }
3685 for (i = 0; i < APP_R_HSIZE; i++)
3686 if (!hlist_empty(mdev->app_reads_hash+i))
3687 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3688 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3689
3690 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3691 spin_unlock_irq(&mdev->req_lock);
3692}
3693
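/*
 * Queue a barrier work item and wait for its completion: once it fires,
 * every work item queued before this call has been processed by the worker.
 */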
3694void drbd_flush_workqueue(struct drbd_conf *mdev)
3695{
3696 struct drbd_wq_barrier barr;
3697
3698 barr.w.cb = w_prev_work_done;
3699 init_completion(&barr.done);
3700 drbd_queue_work(&mdev->data.work, &barr.w);
3701 wait_for_completion(&barr.done);
3702}
3703
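/*
 * Tear down an established connection: stop the asender, drain the ee
 * lists, cancel resync bookkeeping, flush the worker queue, possibly fence
 * (outdate) the peer if we are Primary, and move the connection state to
 * C_UNCONNECTED (or all the way to C_STANDALONE when we are
 * C_DISCONNECTING, in which case the hash tables and net_conf are freed
 * as well).
 */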
3704static void drbd_disconnect(struct drbd_conf *mdev)
3705{
3706 enum drbd_fencing_p fp;
3707 union drbd_state os, ns;
3708 int rv = SS_UNKNOWN_ERROR;
3709 unsigned int i;
3710
3711 if (mdev->state.conn == C_STANDALONE)
3712 return;
3713 if (mdev->state.conn >= C_WF_CONNECTION)
3714 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3715 drbd_conn_str(mdev->state.conn));
3716
3717 /* asender does not clean up anything. it must not interfere, either */
3718 drbd_thread_stop(&mdev->asender);
3719 drbd_free_sock(mdev);
3720
3721 spin_lock_irq(&mdev->req_lock);
3722 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3723 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3724 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3725 spin_unlock_irq(&mdev->req_lock);
3726
3727 /* We do not have data structures that would allow us to
3728 * get the rs_pending_cnt down to 0 again.
3729 * * On C_SYNC_TARGET we do not have any data structures describing
3730 * the pending RSDataRequests we have sent.
3731 * * On C_SYNC_SOURCE there is no data structure that tracks
3732 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3733 * And no, it is not the sum of the reference counts in the
3734 * resync_LRU. The resync_LRU tracks the whole operation including
3735 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3736 * on the fly. */
3737 drbd_rs_cancel_all(mdev);
3738 mdev->rs_total = 0;
3739 mdev->rs_failed = 0;
3740 atomic_set(&mdev->rs_pending_cnt, 0);
3741 wake_up(&mdev->misc_wait);
3742
3743 /* make sure syncer is stopped and w_resume_next_sg queued */
3744 del_timer_sync(&mdev->resync_timer);
3745 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3746 resync_timer_fn((unsigned long)mdev);
3747
3748 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3749 * w_make_resync_request etc. which may still be on the worker queue
3750 * to be "canceled" */
3751 drbd_flush_workqueue(mdev);
3752
3753 /* This also does reclaim_net_ee(). If we do this too early, we might
3754 * miss some resync ee and pages.*/
3755 drbd_process_done_ee(mdev);
3756
3757 kfree(mdev->p_uuid);
3758 mdev->p_uuid = NULL;
3759
3760 if (!mdev->state.susp)
3761 tl_clear(mdev);
3762
3763 drbd_fail_pending_reads(mdev);
3764
3765 dev_info(DEV, "Connection closed\n");
3766
3767 drbd_md_sync(mdev);
3768
3769 fp = FP_DONT_CARE;
3770 if (get_ldev(mdev)) {
3771 fp = mdev->ldev->dc.fencing;
3772 put_ldev(mdev);
3773 }
3774
3775 if (mdev->state.role == R_PRIMARY) {
3776 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3777 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3778 drbd_request_state(mdev, NS(pdsk, nps));
3779 }
3780 }
3781
3782 spin_lock_irq(&mdev->req_lock);
3783 os = mdev->state;
3784 if (os.conn >= C_UNCONNECTED) {
3785 /* Do not restart in case we are C_DISCONNECTING */
3786 ns = os;
3787 ns.conn = C_UNCONNECTED;
3788 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3789 }
3790 spin_unlock_irq(&mdev->req_lock);
3791
3792 if (os.conn == C_DISCONNECTING) {
3793 struct hlist_head *h;
3794 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3795
3796 /* we must not free the tl_hash
3797 * while application io is still on the fly */
3798 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3799
3800 spin_lock_irq(&mdev->req_lock);
3801 /* paranoia code */
3802 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3803 if (h->first)
3804 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3805 (int)(h - mdev->ee_hash), h->first);
3806 kfree(mdev->ee_hash);
3807 mdev->ee_hash = NULL;
3808 mdev->ee_hash_s = 0;
3809
3810 /* paranoia code */
3811 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3812 if (h->first)
3813 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3814 (int)(h - mdev->tl_hash), h->first);
3815 kfree(mdev->tl_hash);
3816 mdev->tl_hash = NULL;
3817 mdev->tl_hash_s = 0;
3818 spin_unlock_irq(&mdev->req_lock);
3819
3820 crypto_free_hash(mdev->cram_hmac_tfm);
3821 mdev->cram_hmac_tfm = NULL;
3822
3823 kfree(mdev->net_conf);
3824 mdev->net_conf = NULL;
3825 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3826 }
3827
3828 /* tcp_close and release of sendpage pages can be deferred. I don't
3829 * want to use SO_LINGER, because apparently it can be deferred for
3830 * more than 20 seconds (longest time I checked).
3831 *
3832 * Actually we don't care exactly when the network stack does its
3833 * put_page(), but release our reference on these pages right here.
3834 */
3835 i = drbd_release_ee(mdev, &mdev->net_ee);
3836 if (i)
3837 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3838 i = atomic_read(&mdev->pp_in_use);
3839 if (i)
3840 dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3841
3842 D_ASSERT(list_empty(&mdev->read_ee));
3843 D_ASSERT(list_empty(&mdev->active_ee));
3844 D_ASSERT(list_empty(&mdev->sync_ee));
3845 D_ASSERT(list_empty(&mdev->done_ee));
3846
3847 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3848 atomic_set(&mdev->current_epoch->epoch_size, 0);
3849 D_ASSERT(list_empty(&mdev->current_epoch->list));
3850}
3851
3852/*
3853 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3854 * we can agree on is stored in agreed_pro_version.
3855 *
3856 * feature flags and the reserved array should leave enough room for future
3857 * enhancements of the handshake protocol and possible plugins...
3858 *
3859 * for now, they are expected to be zero, but ignored.
3860 */
3861static int drbd_send_handshake(struct drbd_conf *mdev)
3862{
3863 /* ASSERT current == mdev->receiver ... */
3864 struct p_handshake *p = &mdev->data.sbuf.handshake;
3865 int ok;
3866
3867 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3868 dev_err(DEV, "interrupted during initial handshake\n");
3869 return 0; /* interrupted. not ok. */
3870 }
3871
3872 if (mdev->data.socket == NULL) {
3873 mutex_unlock(&mdev->data.mutex);
3874 return 0;
3875 }
3876
3877 memset(p, 0, sizeof(*p));
3878 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3879 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3880 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3881 (struct p_header *)p, sizeof(*p), 0 );
3882 mutex_unlock(&mdev->data.mutex);
3883 return ok;
3884}
3885
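/*
 * Version negotiation: both sides send [PRO_VERSION_MIN, PRO_VERSION_MAX]
 * in big endian.  With local range [a, b] and peer range [c, d], the
 * dialects are compatible iff the ranges overlap (b >= c && a <= d), and
 * the agreed version is min(b, d).  A peer protocol_max of 0 is treated
 * as protocol_max == protocol_min.
 */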
3886/*
3887 * return values:
3888 * 1 yes, we have a valid connection
3889 * 0 oops, did not work out, please try again
3890 * -1 peer talks different language,
3891 * no point in trying again, please go standalone.
3892 */
3893static int drbd_do_handshake(struct drbd_conf *mdev)
3894{
3895 /* ASSERT current == mdev->receiver ... */
3896 struct p_handshake *p = &mdev->data.rbuf.handshake;
3897 const int expect = sizeof(struct p_handshake)
3898 -sizeof(struct p_header);
3899 int rv;
3900
3901 rv = drbd_send_handshake(mdev);
3902 if (!rv)
3903 return 0;
3904
3905 rv = drbd_recv_header(mdev, &p->head);
3906 if (!rv)
3907 return 0;
3908
3909 if (p->head.command != P_HAND_SHAKE) {
3910 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3911 cmdname(p->head.command), p->head.command);
3912 return -1;
3913 }
3914
3915 if (p->head.length != expect) {
3916 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3917 expect, p->head.length);
3918 return -1;
3919 }
3920
3921 rv = drbd_recv(mdev, &p->head.payload, expect);
3922
3923 if (rv != expect) {
3924 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3925 return 0;
3926 }
3927
3928 p->protocol_min = be32_to_cpu(p->protocol_min);
3929 p->protocol_max = be32_to_cpu(p->protocol_max);
3930 if (p->protocol_max == 0)
3931 p->protocol_max = p->protocol_min;
3932
3933 if (PRO_VERSION_MAX < p->protocol_min ||
3934 PRO_VERSION_MIN > p->protocol_max)
3935 goto incompat;
3936
3937 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3938
3939 dev_info(DEV, "Handshake successful: "
3940 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3941
3942 return 1;
3943
3944 incompat:
3945 dev_err(DEV, "incompatible DRBD dialects: "
3946 "I support %d-%d, peer supports %d-%d\n",
3947 PRO_VERSION_MIN, PRO_VERSION_MAX,
3948 p->protocol_min, p->protocol_max);
3949 return -1;
3950}
3951
3952#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3953static int drbd_do_auth(struct drbd_conf *mdev)
3954{
3955 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3956 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3957 return -1;
3958}
3959#else
3960#define CHALLENGE_LEN 64
3961
3962/* Return value:
3963 1 - auth succeeded,
3964 0 - failed, try again (network error),
3965 -1 - auth failed, don't try again.
3966*/
3967
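/*
 * Challenge-response authentication (CRAM-HMAC) over the data socket:
 *   1. send our CHALLENGE_LEN bytes of random challenge (P_AUTH_CHALLENGE)
 *   2. receive the peer's challenge
 *   3. send HMAC(shared_secret, peer's challenge) as P_AUTH_RESPONSE
 *   4. receive the peer's response and compare it against
 *      HMAC(shared_secret, our challenge)
 * The HMAC transform is mdev->cram_hmac_tfm, keyed with
 * net_conf->shared_secret.
 */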
3968static int drbd_do_auth(struct drbd_conf *mdev)
3969{
3970 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
3971 struct scatterlist sg;
3972 char *response = NULL;
3973 char *right_response = NULL;
3974 char *peers_ch = NULL;
3975 struct p_header p;
3976 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3977 unsigned int resp_size;
3978 struct hash_desc desc;
3979 int rv;
3980
3981 desc.tfm = mdev->cram_hmac_tfm;
3982 desc.flags = 0;
3983
3984 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3985 (u8 *)mdev->net_conf->shared_secret, key_len);
3986 if (rv) {
3987 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3988 rv = -1;
3989 goto fail;
3990 }
3991
3992 get_random_bytes(my_challenge, CHALLENGE_LEN);
3993
3994 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3995 if (!rv)
3996 goto fail;
3997
3998 rv = drbd_recv_header(mdev, &p);
3999 if (!rv)
4000 goto fail;
4001
4002 if (p.command != P_AUTH_CHALLENGE) {
4003 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4004 cmdname(p.command), p.command);
4005 rv = 0;
4006 goto fail;
4007 }
4008
4009 if (p.length > CHALLENGE_LEN*2) {
4010 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4011 rv = -1;
4012 goto fail;
4013 }
4014
4015 peers_ch = kmalloc(p.length, GFP_NOIO);
4016 if (peers_ch == NULL) {
4017 dev_err(DEV, "kmalloc of peers_ch failed\n");
4018 rv = -1;
4019 goto fail;
4020 }
4021
4022 rv = drbd_recv(mdev, peers_ch, p.length);
4023
4024 if (rv != p.length) {
4025 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4026 rv = 0;
4027 goto fail;
4028 }
4029
4030 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4031 response = kmalloc(resp_size, GFP_NOIO);
4032 if (response == NULL) {
4033 dev_err(DEV, "kmalloc of response failed\n");
4034 rv = -1;
4035 goto fail;
4036 }
4037
4038 sg_init_table(&sg, 1);
4039 sg_set_buf(&sg, peers_ch, p.length);
4040
4041 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4042 if (rv) {
4043 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4044 rv = -1;
4045 goto fail;
4046 }
4047
4048 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4049 if (!rv)
4050 goto fail;
4051
4052 rv = drbd_recv_header(mdev, &p);
4053 if (!rv)
4054 goto fail;
4055
4056 if (p.command != P_AUTH_RESPONSE) {
4057 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4058 cmdname(p.command), p.command);
4059 rv = 0;
4060 goto fail;
4061 }
4062
4063 if (p.length != resp_size) {
4064 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4065 rv = 0;
4066 goto fail;
4067 }
4068
4069 rv = drbd_recv(mdev, response, resp_size);
4070
4071 if (rv != resp_size) {
4072 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4073 rv = 0;
4074 goto fail;
4075 }
4076
4077 right_response = kmalloc(resp_size, GFP_NOIO);
4078 if (right_response == NULL) {
4079 dev_err(DEV, "kmalloc of right_response failed\n");
4080 rv = -1;
4081 goto fail;
4082 }
4083
4084 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4085
4086 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4087 if (rv) {
4088 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4089 rv = -1;
4090 goto fail;
4091 }
4092
4093 rv = !memcmp(response, right_response, resp_size);
4094
4095 if (rv)
4096 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4097 resp_size, mdev->net_conf->cram_hmac_alg);
4098 else
4099 rv = -1;
4100
4101 fail:
4102 kfree(peers_ch);
4103 kfree(response);
4104 kfree(right_response);
4105
4106 return rv;
4107}
4108#endif
4109
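/*
 * Receiver thread entry point: retry drbd_connect() until it either
 * succeeds (h > 0) or reports an incompatible peer (h == -1, network
 * configuration is discarded), then run the drbdd() receive loop and
 * clean up with drbd_disconnect() when it returns.
 */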
4110int drbdd_init(struct drbd_thread *thi)
4111{
4112 struct drbd_conf *mdev = thi->mdev;
4113 unsigned int minor = mdev_to_minor(mdev);
4114 int h;
4115
4116 sprintf(current->comm, "drbd%d_receiver", minor);
4117
4118 dev_info(DEV, "receiver (re)started\n");
4119
4120 do {
4121 h = drbd_connect(mdev);
4122 if (h == 0) {
4123 drbd_disconnect(mdev);
4124 __set_current_state(TASK_INTERRUPTIBLE);
4125 schedule_timeout(HZ);
4126 }
4127 if (h == -1) {
4128 dev_warn(DEV, "Discarding network configuration.\n");
4129 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4130 }
4131 } while (h == 0);
4132
4133 if (h > 0) {
4134 if (get_net_conf(mdev)) {
4135 drbdd(mdev);
4136 put_net_conf(mdev);
4137 }
4138 }
4139
4140 drbd_disconnect(mdev);
4141
4142 dev_info(DEV, "receiver terminated\n");
4143 return 0;
4144}
4145
4146/* ********* acknowledge sender ******** */
4147
4148static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4149{
4150 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4151
4152 int retcode = be32_to_cpu(p->retcode);
4153
4154 if (retcode >= SS_SUCCESS) {
4155 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4156 } else {
4157 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4158 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4159 drbd_set_st_err_str(retcode), retcode);
4160 }
4161 wake_up(&mdev->state_wait);
4162
4163 return TRUE;
4164}
4165
4166static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4167{
4168 return drbd_send_ping_ack(mdev);
4169
4170}
4171
4172static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4173{
4174 /* restore idle timeout */
4175 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4176 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4177 wake_up(&mdev->misc_wait);
4178
4179 return TRUE;
4180}
4181
4182static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4183{
4184 struct p_block_ack *p = (struct p_block_ack *)h;
4185 sector_t sector = be64_to_cpu(p->sector);
4186 int blksize = be32_to_cpu(p->blksize);
4187
4188 D_ASSERT(mdev->agreed_pro_version >= 89);
4189
4190 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4191
4192 drbd_rs_complete_io(mdev, sector);
4193 drbd_set_in_sync(mdev, sector, blksize);
4194 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4195 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4196 dec_rs_pending(mdev);
4197
4198 return TRUE;
4199}
4200
4201/* when we receive the ACK for a write request,
4202 * verify that we actually know about it */
4203static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4204 u64 id, sector_t sector)
4205{
4206 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4207 struct hlist_node *n;
4208 struct drbd_request *req;
4209
4210 hlist_for_each_entry(req, n, slot, colision) {
4211 if ((unsigned long)req == (unsigned long)id) {
4212 if (req->sector != sector) {
4213 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4214 "wrong sector (%llus versus %llus)\n", req,
4215 (unsigned long long)req->sector,
4216 (unsigned long long)sector);
4217 break;
4218 }
4219 return req;
4220 }
4221 }
4222 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4223 (void *)(unsigned long)id, (unsigned long long)sector);
4224 return NULL;
4225}
4226
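/*
 * ACK packets carry the kernel pointer of the originating drbd_request as
 * block_id.  A validator function looks that pointer up in the transfer
 * log hash and cross-checks the sector, so a corrupt block_id/sector pair
 * is reported instead of being dereferenced blindly.
 */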
4227typedef struct drbd_request *(req_validator_fn)
4228 (struct drbd_conf *mdev, u64 id, sector_t sector);
4229
4230static int validate_req_change_req_state(struct drbd_conf *mdev,
4231 u64 id, sector_t sector, req_validator_fn validator,
4232 const char *func, enum drbd_req_event what)
4233{
4234 struct drbd_request *req;
4235 struct bio_and_error m;
4236
4237 spin_lock_irq(&mdev->req_lock);
4238 req = validator(mdev, id, sector);
4239 if (unlikely(!req)) {
4240 spin_unlock_irq(&mdev->req_lock);
4241 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4242 return FALSE;
4243 }
4244 __req_mod(req, what, &m);
4245 spin_unlock_irq(&mdev->req_lock);
4246
4247 if (m.bio)
4248 complete_master_bio(mdev, &m);
4249 return TRUE;
4250}
4251
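/*
 * Map the various ACK packet types onto request events.  The D_ASSERTs
 * below document which wire protocol is expected to produce which ACK:
 * P_RECV_ACK only with protocol B, P_WRITE_ACK/P_RS_WRITE_ACK/P_DISCARD_ACK
 * only with protocol C.  ACKs for syncer block ids are accounted directly.
 */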
4252static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4253{
4254 struct p_block_ack *p = (struct p_block_ack *)h;
4255 sector_t sector = be64_to_cpu(p->sector);
4256 int blksize = be32_to_cpu(p->blksize);
4257 enum drbd_req_event what;
4258
4259 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4260
4261 if (is_syncer_block_id(p->block_id)) {
4262 drbd_set_in_sync(mdev, sector, blksize);
4263 dec_rs_pending(mdev);
4264 return TRUE;
4265 }
4266 switch (be16_to_cpu(h->command)) {
4267 case P_RS_WRITE_ACK:
4268 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4269 what = write_acked_by_peer_and_sis;
4270 break;
4271 case P_WRITE_ACK:
4272 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4273 what = write_acked_by_peer;
4274 break;
4275 case P_RECV_ACK:
4276 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4277 what = recv_acked_by_peer;
4278 break;
4279 case P_DISCARD_ACK:
4280 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4281 what = conflict_discarded_by_peer;
4282 break;
4283 default:
4284 D_ASSERT(0);
4285 return FALSE;
4286 }
4287
4288 return validate_req_change_req_state(mdev, p->block_id, sector,
4289 _ack_id_to_req, __func__ , what);
4290}
4291
4292static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4293{
4294 struct p_block_ack *p = (struct p_block_ack *)h;
4295 sector_t sector = be64_to_cpu(p->sector);
4296
4297 if (__ratelimit(&drbd_ratelimit_state))
4298 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4299
4300 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4301
4302 if (is_syncer_block_id(p->block_id)) {
4303 int size = be32_to_cpu(p->blksize);
4304 dec_rs_pending(mdev);
4305 drbd_rs_failed_io(mdev, sector, size);
4306 return TRUE;
4307 }
4308 return validate_req_change_req_state(mdev, p->block_id, sector,
4309 _ack_id_to_req, __func__ , neg_acked);
4310}
4311
4312static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4313{
4314 struct p_block_ack *p = (struct p_block_ack *)h;
4315 sector_t sector = be64_to_cpu(p->sector);
4316
4317 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4318 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4319 (unsigned long long)sector, be32_to_cpu(p->blksize));
4320
4321 return validate_req_change_req_state(mdev, p->block_id, sector,
4322 _ar_id_to_req, __func__ , neg_acked);
4323}
4324
4325static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4326{
4327 sector_t sector;
4328 int size;
4329 struct p_block_ack *p = (struct p_block_ack *)h;
4330
4331 sector = be64_to_cpu(p->sector);
4332 size = be32_to_cpu(p->blksize);
4333
4334 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4335
4336 dec_rs_pending(mdev);
4337
4338 if (get_ldev_if_state(mdev, D_FAILED)) {
4339 drbd_rs_complete_io(mdev, sector);
4340 drbd_rs_failed_io(mdev, sector, size);
4341 put_ldev(mdev);
4342 }
4343
4344 return TRUE;
4345}
4346
4347static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4348{
4349 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4350
4351 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4352
4353 return TRUE;
4354}
4355
4356static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4357{
4358 struct p_block_ack *p = (struct p_block_ack *)h;
4359 struct drbd_work *w;
4360 sector_t sector;
4361 int size;
4362
4363 sector = be64_to_cpu(p->sector);
4364 size = be32_to_cpu(p->blksize);
4365
4366 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4367
4368 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4369 drbd_ov_oos_found(mdev, sector, size);
4370 else
4371 ov_oos_print(mdev);
4372
4373 drbd_rs_complete_io(mdev, sector);
4374 dec_rs_pending(mdev);
4375
4376 if (--mdev->ov_left == 0) {
4377 w = kmalloc(sizeof(*w), GFP_NOIO);
4378 if (w) {
4379 w->cb = w_ov_finished;
4380 drbd_queue_work_front(&mdev->data.work, w);
4381 } else {
4382 dev_err(DEV, "kmalloc(w) failed.");
4383 ov_oos_print(mdev);
4384 drbd_resync_finished(mdev);
4385 }
4386 }
4387 return TRUE;
4388}
4389
4390static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4391{
4392 struct p_delay_probe *p = (struct p_delay_probe *)h;
4393
4394 got_delay_probe(mdev, USE_META_SOCKET, p);
4395 return TRUE;
4396}
4397
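/*
 * Packets on the meta socket have fixed, known sizes; asender_tbl maps
 * each command to its expected packet size and handler.  get_asender_cmd()
 * returns NULL for anything not in the table, which the asender treats as
 * a protocol violation and answers by disconnecting.
 */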
4398struct asender_cmd {
4399 size_t pkt_size;
4400 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4401};
4402
4403static struct asender_cmd *get_asender_cmd(int cmd)
4404{
4405 static struct asender_cmd asender_tbl[] = {
4406 /* anything missing from this table is in
4407 * the drbd_cmd_handler (drbd_default_handler) table,
4408 * see the beginning of drbdd() */
4409 [P_PING] = { sizeof(struct p_header), got_Ping },
4410 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4411 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4412 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4413 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4414 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4415 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4416 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4417 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4418 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4419 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4420 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4421 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4422 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_delay_probe_m },
4423 [P_MAX_CMD] = { 0, NULL },
4424 };
4425 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4426 return NULL;
4427 return &asender_tbl[cmd];
4428}
4429
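/*
 * The asender thread owns the meta socket: it sends P_PING when requested,
 * flushes ACKs for completed epoch entries (done_ee), and receives the
 * small fixed-size ACK/ping packets.  While several ACKs are outstanding
 * the socket is corked to batch them into fewer TCP segments; a missing
 * P_PING_ACK within ping_timeo leads to reconnect.
 */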
4430int drbd_asender(struct drbd_thread *thi)
4431{
4432 struct drbd_conf *mdev = thi->mdev;
4433 struct p_header *h = &mdev->meta.rbuf.header;
4434 struct asender_cmd *cmd = NULL;
4435
4436 int rv, len;
4437 void *buf = h;
4438 int received = 0;
4439 int expect = sizeof(struct p_header);
4440 int empty;
4441
4442 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4443
4444 current->policy = SCHED_RR; /* Make this a realtime task! */
4445 current->rt_priority = 2; /* more important than all other tasks */
4446
4447 while (get_t_state(thi) == Running) {
4448 drbd_thread_current_set_cpu(mdev);
4449 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4450 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4451 mdev->meta.socket->sk->sk_rcvtimeo =
4452 mdev->net_conf->ping_timeo*HZ/10;
4453 }
4454
4455 /* conditionally cork;
4456 * it may hurt latency if we cork without much to send */
4457 if (!mdev->net_conf->no_cork &&
4458 3 < atomic_read(&mdev->unacked_cnt))
4459 drbd_tcp_cork(mdev->meta.socket);
4460 while (1) {
4461 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4462 flush_signals(current);
4463 if (!drbd_process_done_ee(mdev)) {
4464 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4465 goto reconnect;
4466 }
4467 /* to avoid race with newly queued ACKs */
4468 set_bit(SIGNAL_ASENDER, &mdev->flags);
4469 spin_lock_irq(&mdev->req_lock);
4470 empty = list_empty(&mdev->done_ee);
4471 spin_unlock_irq(&mdev->req_lock);
4472 /* new ack may have been queued right here,
4473 * but then there is also a signal pending,
4474 * and we start over... */
4475 if (empty)
4476 break;
4477 }
4478 /* but unconditionally uncork unless disabled */
4479 if (!mdev->net_conf->no_cork)
4480 drbd_tcp_uncork(mdev->meta.socket);
4481
4482 /* short circuit, recv_msg would return EINTR anyways. */
4483 if (signal_pending(current))
4484 continue;
4485
4486 rv = drbd_recv_short(mdev, mdev->meta.socket,
4487 buf, expect-received, 0);
4488 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4489
4490 flush_signals(current);
4491
4492 /* Note:
4493 * -EINTR (on meta) we got a signal
4494 * -EAGAIN (on meta) rcvtimeo expired
4495 * -ECONNRESET other side closed the connection
4496 * -ERESTARTSYS (on data) we got a signal
4497 * rv < 0 other than above: unexpected error!
4498 * rv == expected: full header or command
4499 * rv < expected: "woken" by signal during receive
4500 * rv == 0 : "connection shut down by peer"
4501 */
4502 if (likely(rv > 0)) {
4503 received += rv;
4504 buf += rv;
4505 } else if (rv == 0) {
4506 dev_err(DEV, "meta connection shut down by peer.\n");
4507 goto reconnect;
4508 } else if (rv == -EAGAIN) {
4509 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4510 mdev->net_conf->ping_timeo*HZ/10) {
4511 dev_err(DEV, "PingAck did not arrive in time.\n");
4512 goto reconnect;
4513 }
4514 set_bit(SEND_PING, &mdev->flags);
4515 continue;
4516 } else if (rv == -EINTR) {
4517 continue;
4518 } else {
4519 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4520 goto reconnect;
4521 }
4522
4523 if (received == expect && cmd == NULL) {
4524 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4525 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4526 (long)be32_to_cpu(h->magic),
4527 h->command, h->length);
4528 goto reconnect;
4529 }
4530 cmd = get_asender_cmd(be16_to_cpu(h->command));
4531 len = be16_to_cpu(h->length);
4532 if (unlikely(cmd == NULL)) {
4533 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4534 (long)be32_to_cpu(h->magic),
4535 h->command, h->length);
4536 goto disconnect;
4537 }
4538 expect = cmd->pkt_size;
4539 ERR_IF(len != expect-sizeof(struct p_header))
4540 goto reconnect;
4541 }
4542 if (received == expect) {
4543 D_ASSERT(cmd != NULL);
4544 if (!cmd->process(mdev, h))
4545 goto reconnect;
4546
4547 buf = h;
4548 received = 0;
4549 expect = sizeof(struct p_header);
4550 cmd = NULL;
4551 }
4552 }
4553
4554 if (0) {
4555reconnect:
4556 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4557 }
4558 if (0) {
4559disconnect:
4560 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4561 }
4562 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4563
4564 D_ASSERT(mdev->state.conn < C_CONNECTED);
4565 dev_info(DEV, "asender terminated\n");
4566
4567 return 0;
4568}