]> bbs.cooldavid.org Git - net-next-2.6.git/blob - net/tipc/link.c
Net: ceph: Makefile: remove deprecated kbuild goal definitions
[net-next-2.6.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, Ericsson AB
5  * Copyright (c) 2004-2007, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "dbg.h"
39 #include "link.h"
40 #include "net.h"
41 #include "node.h"
42 #include "port.h"
43 #include "addr.h"
44 #include "node_subscr.h"
45 #include "name_distr.h"
46 #include "bearer.h"
47 #include "name_table.h"
48 #include "discover.h"
49 #include "config.h"
50 #include "bcast.h"
51
52
53 /*
54  * Out-of-range value for link session numbers
55  */
56
57 #define INVALID_SESSION 0x10000
58
59 /*
60  * Limit for deferred reception queue:
61  */
62
63 #define DEF_QUEUE_LIMIT 256u
64
65 /*
66  * Link state events:
67  */
68
69 #define  STARTING_EVT    856384768      /* link processing trigger */
70 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
71 #define  TIMEOUT_EVT     560817u        /* link timer expired */
72
73 /*
74  * The following two 'message types' is really just implementation
75  * data conveniently stored in the message header.
76  * They must not be considered part of the protocol
77  */
78 #define OPEN_MSG   0
79 #define CLOSED_MSG 1
80
81 /*
82  * State value stored in 'exp_msg_count'
83  */
84
85 #define START_CHANGEOVER 100000u
86
87 /**
88  * struct link_name - deconstructed link name
89  * @addr_local: network address of node at this end
90  * @if_local: name of interface at this end
91  * @addr_peer: network address of node at far end
92  * @if_peer: name of interface at far end
93  */
94
95 struct link_name {
96         u32 addr_local;
97         char if_local[TIPC_MAX_IF_NAME];
98         u32 addr_peer;
99         char if_peer[TIPC_MAX_IF_NAME];
100 };
101
102 static void link_handle_out_of_seq_msg(struct link *l_ptr,
103                                        struct sk_buff *buf);
104 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
105 static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
106 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
107 static int  link_send_sections_long(struct port *sender,
108                                     struct iovec const *msg_sect,
109                                     u32 num_sect, u32 destnode);
110 static void link_check_defragm_bufs(struct link *l_ptr);
111 static void link_state_event(struct link *l_ptr, u32 event);
112 static void link_reset_statistics(struct link *l_ptr);
113 static void link_print(struct link *l_ptr, struct print_buf *buf,
114                        const char *str);
115 static void link_start(struct link *l_ptr);
116 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf);
117
118
119 /*
120  * Debugging code used by link routines only
121  *
122  * When debugging link problems on a system that has multiple links,
123  * the standard TIPC debugging routines may not be useful since they
124  * allow the output from multiple links to be intermixed.  For this reason
125  * routines of the form "dbg_link_XXX()" have been created that will capture
126  * debug info into a link's personal print buffer, which can then be dumped
127  * into the TIPC system log (TIPC_LOG) upon request.
128  *
129  * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
130  * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
131  * the dbg_link_XXX() routines simply send their output to the standard
132  * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
133  * when there is only a single link in the system being debugged.
134  *
135  * Notes:
136  * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
137  * - "l_ptr" must be valid when using dbg_link_XXX() macros
138  */
139
140 #define LINK_LOG_BUF_SIZE 0
141
142 #define dbg_link(fmt, arg...) \
143         do { \
144                 if (LINK_LOG_BUF_SIZE) \
145                         tipc_printf(&l_ptr->print_buf, fmt, ## arg); \
146         } while (0)
147 #define dbg_link_msg(msg, txt) \
148         do { \
149                 if (LINK_LOG_BUF_SIZE) \
150                         tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
151         } while (0)
152 #define dbg_link_state(txt) \
153         do { \
154                 if (LINK_LOG_BUF_SIZE) \
155                         link_print(l_ptr, &l_ptr->print_buf, txt); \
156         } while (0)
157 #define dbg_link_dump() do { \
158         if (LINK_LOG_BUF_SIZE) { \
159                 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
160                 tipc_printbuf_move(LOG, &l_ptr->print_buf); \
161         } \
162 } while (0)
163
164 static void dbg_print_link(struct link *l_ptr, const char *str)
165 {
166         if (DBG_OUTPUT != TIPC_NULL)
167                 link_print(l_ptr, DBG_OUTPUT, str);
168 }
169
170 static void dbg_print_buf_chain(struct sk_buff *root_buf)
171 {
172         if (DBG_OUTPUT != TIPC_NULL) {
173                 struct sk_buff *buf = root_buf;
174
175                 while (buf) {
176                         msg_dbg(buf_msg(buf), "In chain: ");
177                         buf = buf->next;
178                 }
179         }
180 }
181
182 /*
183  *  Simple link routines
184  */
185
186 static unsigned int align(unsigned int i)
187 {
188         return (i + 3) & ~3u;
189 }
190
191 static void link_init_max_pkt(struct link *l_ptr)
192 {
193         u32 max_pkt;
194
195         max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
196         if (max_pkt > MAX_MSG_SIZE)
197                 max_pkt = MAX_MSG_SIZE;
198
199         l_ptr->max_pkt_target = max_pkt;
200         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
201                 l_ptr->max_pkt = l_ptr->max_pkt_target;
202         else
203                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
204
205         l_ptr->max_pkt_probes = 0;
206 }
207
208 static u32 link_next_sent(struct link *l_ptr)
209 {
210         if (l_ptr->next_out)
211                 return msg_seqno(buf_msg(l_ptr->next_out));
212         return mod(l_ptr->next_out_no);
213 }
214
215 static u32 link_last_sent(struct link *l_ptr)
216 {
217         return mod(link_next_sent(l_ptr) - 1);
218 }
219
220 /*
221  *  Simple non-static link routines (i.e. referenced outside this file)
222  */
223
224 int tipc_link_is_up(struct link *l_ptr)
225 {
226         if (!l_ptr)
227                 return 0;
228         return link_working_working(l_ptr) || link_working_unknown(l_ptr);
229 }
230
231 int tipc_link_is_active(struct link *l_ptr)
232 {
233         return  (l_ptr->owner->active_links[0] == l_ptr) ||
234                 (l_ptr->owner->active_links[1] == l_ptr);
235 }
236
237 /**
238  * link_name_validate - validate & (optionally) deconstruct link name
239  * @name - ptr to link name string
240  * @name_parts - ptr to area for link name components (or NULL if not needed)
241  *
242  * Returns 1 if link name is valid, otherwise 0.
243  */
244
245 static int link_name_validate(const char *name, struct link_name *name_parts)
246 {
247         char name_copy[TIPC_MAX_LINK_NAME];
248         char *addr_local;
249         char *if_local;
250         char *addr_peer;
251         char *if_peer;
252         char dummy;
253         u32 z_local, c_local, n_local;
254         u32 z_peer, c_peer, n_peer;
255         u32 if_local_len;
256         u32 if_peer_len;
257
258         /* copy link name & ensure length is OK */
259
260         name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
261         /* need above in case non-Posix strncpy() doesn't pad with nulls */
262         strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
263         if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
264                 return 0;
265
266         /* ensure all component parts of link name are present */
267
268         addr_local = name_copy;
269         if ((if_local = strchr(addr_local, ':')) == NULL)
270                 return 0;
271         *(if_local++) = 0;
272         if ((addr_peer = strchr(if_local, '-')) == NULL)
273                 return 0;
274         *(addr_peer++) = 0;
275         if_local_len = addr_peer - if_local;
276         if ((if_peer = strchr(addr_peer, ':')) == NULL)
277                 return 0;
278         *(if_peer++) = 0;
279         if_peer_len = strlen(if_peer) + 1;
280
281         /* validate component parts of link name */
282
283         if ((sscanf(addr_local, "%u.%u.%u%c",
284                     &z_local, &c_local, &n_local, &dummy) != 3) ||
285             (sscanf(addr_peer, "%u.%u.%u%c",
286                     &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
287             (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
288             (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
289             (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
290             (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
291             (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
292             (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
293                 return 0;
294
295         /* return link name components, if necessary */
296
297         if (name_parts) {
298                 name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
299                 strcpy(name_parts->if_local, if_local);
300                 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
301                 strcpy(name_parts->if_peer, if_peer);
302         }
303         return 1;
304 }
305
306 /**
307  * link_timeout - handle expiration of link timer
308  * @l_ptr: pointer to link
309  *
310  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
311  * with tipc_link_delete().  (There is no risk that the node will be deleted by
312  * another thread because tipc_link_delete() always cancels the link timer before
313  * tipc_node_delete() is called.)
314  */
315
316 static void link_timeout(struct link *l_ptr)
317 {
318         tipc_node_lock(l_ptr->owner);
319
320         /* update counters used in statistical profiling of send traffic */
321
322         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
323         l_ptr->stats.queue_sz_counts++;
324
325         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
326                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
327
328         if (l_ptr->first_out) {
329                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
330                 u32 length = msg_size(msg);
331
332                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
333                     (msg_type(msg) == FIRST_FRAGMENT)) {
334                         length = msg_size(msg_get_wrapped(msg));
335                 }
336                 if (length) {
337                         l_ptr->stats.msg_lengths_total += length;
338                         l_ptr->stats.msg_length_counts++;
339                         if (length <= 64)
340                                 l_ptr->stats.msg_length_profile[0]++;
341                         else if (length <= 256)
342                                 l_ptr->stats.msg_length_profile[1]++;
343                         else if (length <= 1024)
344                                 l_ptr->stats.msg_length_profile[2]++;
345                         else if (length <= 4096)
346                                 l_ptr->stats.msg_length_profile[3]++;
347                         else if (length <= 16384)
348                                 l_ptr->stats.msg_length_profile[4]++;
349                         else if (length <= 32768)
350                                 l_ptr->stats.msg_length_profile[5]++;
351                         else
352                                 l_ptr->stats.msg_length_profile[6]++;
353                 }
354         }
355
356         /* do all other link processing performed on a periodic basis */
357
358         link_check_defragm_bufs(l_ptr);
359
360         link_state_event(l_ptr, TIMEOUT_EVT);
361
362         if (l_ptr->next_out)
363                 tipc_link_push_queue(l_ptr);
364
365         tipc_node_unlock(l_ptr->owner);
366 }
367
368 static void link_set_timer(struct link *l_ptr, u32 time)
369 {
370         k_start_timer(&l_ptr->timer, time);
371 }
372
373 /**
374  * tipc_link_create - create a new link
375  * @b_ptr: pointer to associated bearer
376  * @peer: network address of node at other end of link
377  * @media_addr: media address to use when sending messages over link
378  *
379  * Returns pointer to link.
380  */
381
382 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
383                               const struct tipc_media_addr *media_addr)
384 {
385         struct link *l_ptr;
386         struct tipc_msg *msg;
387         char *if_name;
388
389         l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
390         if (!l_ptr) {
391                 warn("Link creation failed, no memory\n");
392                 return NULL;
393         }
394
395         if (LINK_LOG_BUF_SIZE) {
396                 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
397
398                 if (!pb) {
399                         kfree(l_ptr);
400                         warn("Link creation failed, no memory for print buffer\n");
401                         return NULL;
402                 }
403                 tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
404         }
405
406         l_ptr->addr = peer;
407         if_name = strchr(b_ptr->publ.name, ':') + 1;
408         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
409                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
410                 tipc_node(tipc_own_addr),
411                 if_name,
412                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
413                 /* note: peer i/f is appended to link name by reset/activate */
414         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
415         l_ptr->checkpoint = 1;
416         l_ptr->b_ptr = b_ptr;
417         link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
418         l_ptr->state = RESET_UNKNOWN;
419
420         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
421         msg = l_ptr->pmsg;
422         tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
423         msg_set_size(msg, sizeof(l_ptr->proto_msg));
424         msg_set_session(msg, (tipc_random & 0xffff));
425         msg_set_bearer_id(msg, b_ptr->identity);
426         strcpy((char *)msg_data(msg), if_name);
427
428         l_ptr->priority = b_ptr->priority;
429         tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
430
431         link_init_max_pkt(l_ptr);
432
433         l_ptr->next_out_no = 1;
434         INIT_LIST_HEAD(&l_ptr->waiting_ports);
435
436         link_reset_statistics(l_ptr);
437
438         l_ptr->owner = tipc_node_attach_link(l_ptr);
439         if (!l_ptr->owner) {
440                 if (LINK_LOG_BUF_SIZE)
441                         kfree(l_ptr->print_buf.buf);
442                 kfree(l_ptr);
443                 return NULL;
444         }
445
446         k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
447         list_add_tail(&l_ptr->link_list, &b_ptr->links);
448         tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
449
450         dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
451             l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
452
453         return l_ptr;
454 }
455
456 /**
457  * tipc_link_delete - delete a link
458  * @l_ptr: pointer to link
459  *
460  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
461  * This routine must not grab the node lock until after link timer cancellation
462  * to avoid a potential deadlock situation.
463  */
464
465 void tipc_link_delete(struct link *l_ptr)
466 {
467         if (!l_ptr) {
468                 err("Attempt to delete non-existent link\n");
469                 return;
470         }
471
472         dbg("tipc_link_delete()\n");
473
474         k_cancel_timer(&l_ptr->timer);
475
476         tipc_node_lock(l_ptr->owner);
477         tipc_link_reset(l_ptr);
478         tipc_node_detach_link(l_ptr->owner, l_ptr);
479         tipc_link_stop(l_ptr);
480         list_del_init(&l_ptr->link_list);
481         if (LINK_LOG_BUF_SIZE)
482                 kfree(l_ptr->print_buf.buf);
483         tipc_node_unlock(l_ptr->owner);
484         k_term_timer(&l_ptr->timer);
485         kfree(l_ptr);
486 }
487
488 static void link_start(struct link *l_ptr)
489 {
490         dbg("link_start %x\n", l_ptr);
491         link_state_event(l_ptr, STARTING_EVT);
492 }
493
494 /**
495  * link_schedule_port - schedule port for deferred sending
496  * @l_ptr: pointer to link
497  * @origport: reference to sending port
498  * @sz: amount of data to be sent
499  *
500  * Schedules port for renewed sending of messages after link congestion
501  * has abated.
502  */
503
504 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
505 {
506         struct port *p_ptr;
507
508         spin_lock_bh(&tipc_port_list_lock);
509         p_ptr = tipc_port_lock(origport);
510         if (p_ptr) {
511                 if (!p_ptr->wakeup)
512                         goto exit;
513                 if (!list_empty(&p_ptr->wait_list))
514                         goto exit;
515                 p_ptr->publ.congested = 1;
516                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
517                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
518                 l_ptr->stats.link_congs++;
519 exit:
520                 tipc_port_unlock(p_ptr);
521         }
522         spin_unlock_bh(&tipc_port_list_lock);
523         return -ELINKCONG;
524 }
525
526 void tipc_link_wakeup_ports(struct link *l_ptr, int all)
527 {
528         struct port *p_ptr;
529         struct port *temp_p_ptr;
530         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
531
532         if (all)
533                 win = 100000;
534         if (win <= 0)
535                 return;
536         if (!spin_trylock_bh(&tipc_port_list_lock))
537                 return;
538         if (link_congested(l_ptr))
539                 goto exit;
540         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
541                                  wait_list) {
542                 if (win <= 0)
543                         break;
544                 list_del_init(&p_ptr->wait_list);
545                 spin_lock_bh(p_ptr->publ.lock);
546                 p_ptr->publ.congested = 0;
547                 p_ptr->wakeup(&p_ptr->publ);
548                 win -= p_ptr->waiting_pkts;
549                 spin_unlock_bh(p_ptr->publ.lock);
550         }
551
552 exit:
553         spin_unlock_bh(&tipc_port_list_lock);
554 }
555
556 /**
557  * link_release_outqueue - purge link's outbound message queue
558  * @l_ptr: pointer to link
559  */
560
561 static void link_release_outqueue(struct link *l_ptr)
562 {
563         struct sk_buff *buf = l_ptr->first_out;
564         struct sk_buff *next;
565
566         while (buf) {
567                 next = buf->next;
568                 buf_discard(buf);
569                 buf = next;
570         }
571         l_ptr->first_out = NULL;
572         l_ptr->out_queue_size = 0;
573 }
574
575 /**
576  * tipc_link_reset_fragments - purge link's inbound message fragments queue
577  * @l_ptr: pointer to link
578  */
579
580 void tipc_link_reset_fragments(struct link *l_ptr)
581 {
582         struct sk_buff *buf = l_ptr->defragm_buf;
583         struct sk_buff *next;
584
585         while (buf) {
586                 next = buf->next;
587                 buf_discard(buf);
588                 buf = next;
589         }
590         l_ptr->defragm_buf = NULL;
591 }
592
593 /**
594  * tipc_link_stop - purge all inbound and outbound messages associated with link
595  * @l_ptr: pointer to link
596  */
597
598 void tipc_link_stop(struct link *l_ptr)
599 {
600         struct sk_buff *buf;
601         struct sk_buff *next;
602
603         buf = l_ptr->oldest_deferred_in;
604         while (buf) {
605                 next = buf->next;
606                 buf_discard(buf);
607                 buf = next;
608         }
609
610         buf = l_ptr->first_out;
611         while (buf) {
612                 next = buf->next;
613                 buf_discard(buf);
614                 buf = next;
615         }
616
617         tipc_link_reset_fragments(l_ptr);
618
619         buf_discard(l_ptr->proto_msg_queue);
620         l_ptr->proto_msg_queue = NULL;
621 }
622
623 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
624 #define link_send_event(fcn, l_ptr, up) do { } while (0)
625
626 void tipc_link_reset(struct link *l_ptr)
627 {
628         struct sk_buff *buf;
629         u32 prev_state = l_ptr->state;
630         u32 checkpoint = l_ptr->next_in_no;
631         int was_active_link = tipc_link_is_active(l_ptr);
632
633         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
634
635         /* Link is down, accept any session */
636         l_ptr->peer_session = INVALID_SESSION;
637
638         /* Prepare for max packet size negotiation */
639         link_init_max_pkt(l_ptr);
640
641         l_ptr->state = RESET_UNKNOWN;
642         dbg_link_state("Resetting Link\n");
643
644         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
645                 return;
646
647         tipc_node_link_down(l_ptr->owner, l_ptr);
648         tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
649
650         if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
651             l_ptr->owner->permit_changeover) {
652                 l_ptr->reset_checkpoint = checkpoint;
653                 l_ptr->exp_msg_count = START_CHANGEOVER;
654         }
655
656         /* Clean up all queues: */
657
658         link_release_outqueue(l_ptr);
659         buf_discard(l_ptr->proto_msg_queue);
660         l_ptr->proto_msg_queue = NULL;
661         buf = l_ptr->oldest_deferred_in;
662         while (buf) {
663                 struct sk_buff *next = buf->next;
664                 buf_discard(buf);
665                 buf = next;
666         }
667         if (!list_empty(&l_ptr->waiting_ports))
668                 tipc_link_wakeup_ports(l_ptr, 1);
669
670         l_ptr->retransm_queue_head = 0;
671         l_ptr->retransm_queue_size = 0;
672         l_ptr->last_out = NULL;
673         l_ptr->first_out = NULL;
674         l_ptr->next_out = NULL;
675         l_ptr->unacked_window = 0;
676         l_ptr->checkpoint = 1;
677         l_ptr->next_out_no = 1;
678         l_ptr->deferred_inqueue_sz = 0;
679         l_ptr->oldest_deferred_in = NULL;
680         l_ptr->newest_deferred_in = NULL;
681         l_ptr->fsm_msg_cnt = 0;
682         l_ptr->stale_count = 0;
683         link_reset_statistics(l_ptr);
684
685         link_send_event(tipc_cfg_link_event, l_ptr, 0);
686         if (!in_own_cluster(l_ptr->addr))
687                 link_send_event(tipc_disc_link_event, l_ptr, 0);
688 }
689
690
691 static void link_activate(struct link *l_ptr)
692 {
693         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
694         tipc_node_link_up(l_ptr->owner, l_ptr);
695         tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
696         link_send_event(tipc_cfg_link_event, l_ptr, 1);
697         if (!in_own_cluster(l_ptr->addr))
698                 link_send_event(tipc_disc_link_event, l_ptr, 1);
699 }
700
701 /**
702  * link_state_event - link finite state machine
703  * @l_ptr: pointer to link
704  * @event: state machine event to process
705  */
706
707 static void link_state_event(struct link *l_ptr, unsigned event)
708 {
709         struct link *other;
710         u32 cont_intv = l_ptr->continuity_interval;
711
712         if (!l_ptr->started && (event != STARTING_EVT))
713                 return;         /* Not yet. */
714
715         if (link_blocked(l_ptr)) {
716                 if (event == TIMEOUT_EVT) {
717                         link_set_timer(l_ptr, cont_intv);
718                 }
719                 return;   /* Changeover going on */
720         }
721         dbg_link("STATE_EV: <%s> ", l_ptr->name);
722
723         switch (l_ptr->state) {
724         case WORKING_WORKING:
725                 dbg_link("WW/");
726                 switch (event) {
727                 case TRAFFIC_MSG_EVT:
728                         dbg_link("TRF-");
729                         /* fall through */
730                 case ACTIVATE_MSG:
731                         dbg_link("ACT\n");
732                         break;
733                 case TIMEOUT_EVT:
734                         dbg_link("TIM ");
735                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
736                                 l_ptr->checkpoint = l_ptr->next_in_no;
737                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
738                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
739                                                                  0, 0, 0, 0, 0);
740                                         l_ptr->fsm_msg_cnt++;
741                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
742                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
743                                                                  1, 0, 0, 0, 0);
744                                         l_ptr->fsm_msg_cnt++;
745                                 }
746                                 link_set_timer(l_ptr, cont_intv);
747                                 break;
748                         }
749                         dbg_link(" -> WU\n");
750                         l_ptr->state = WORKING_UNKNOWN;
751                         l_ptr->fsm_msg_cnt = 0;
752                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
753                         l_ptr->fsm_msg_cnt++;
754                         link_set_timer(l_ptr, cont_intv / 4);
755                         break;
756                 case RESET_MSG:
757                         dbg_link("RES -> RR\n");
758                         info("Resetting link <%s>, requested by peer\n",
759                              l_ptr->name);
760                         tipc_link_reset(l_ptr);
761                         l_ptr->state = RESET_RESET;
762                         l_ptr->fsm_msg_cnt = 0;
763                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
764                         l_ptr->fsm_msg_cnt++;
765                         link_set_timer(l_ptr, cont_intv);
766                         break;
767                 default:
768                         err("Unknown link event %u in WW state\n", event);
769                 }
770                 break;
771         case WORKING_UNKNOWN:
772                 dbg_link("WU/");
773                 switch (event) {
774                 case TRAFFIC_MSG_EVT:
775                         dbg_link("TRF-");
776                 case ACTIVATE_MSG:
777                         dbg_link("ACT -> WW\n");
778                         l_ptr->state = WORKING_WORKING;
779                         l_ptr->fsm_msg_cnt = 0;
780                         link_set_timer(l_ptr, cont_intv);
781                         break;
782                 case RESET_MSG:
783                         dbg_link("RES -> RR\n");
784                         info("Resetting link <%s>, requested by peer "
785                              "while probing\n", l_ptr->name);
786                         tipc_link_reset(l_ptr);
787                         l_ptr->state = RESET_RESET;
788                         l_ptr->fsm_msg_cnt = 0;
789                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
790                         l_ptr->fsm_msg_cnt++;
791                         link_set_timer(l_ptr, cont_intv);
792                         break;
793                 case TIMEOUT_EVT:
794                         dbg_link("TIM ");
795                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
796                                 dbg_link("-> WW\n");
797                                 l_ptr->state = WORKING_WORKING;
798                                 l_ptr->fsm_msg_cnt = 0;
799                                 l_ptr->checkpoint = l_ptr->next_in_no;
800                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
801                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
802                                                                  0, 0, 0, 0, 0);
803                                         l_ptr->fsm_msg_cnt++;
804                                 }
805                                 link_set_timer(l_ptr, cont_intv);
806                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
807                                 dbg_link("Probing %u/%u,timer = %u ms)\n",
808                                          l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
809                                          cont_intv / 4);
810                                 tipc_link_send_proto_msg(l_ptr, STATE_MSG,
811                                                          1, 0, 0, 0, 0);
812                                 l_ptr->fsm_msg_cnt++;
813                                 link_set_timer(l_ptr, cont_intv / 4);
814                         } else {        /* Link has failed */
815                                 dbg_link("-> RU (%u probes unanswered)\n",
816                                          l_ptr->fsm_msg_cnt);
817                                 warn("Resetting link <%s>, peer not responding\n",
818                                      l_ptr->name);
819                                 tipc_link_reset(l_ptr);
820                                 l_ptr->state = RESET_UNKNOWN;
821                                 l_ptr->fsm_msg_cnt = 0;
822                                 tipc_link_send_proto_msg(l_ptr, RESET_MSG,
823                                                          0, 0, 0, 0, 0);
824                                 l_ptr->fsm_msg_cnt++;
825                                 link_set_timer(l_ptr, cont_intv);
826                         }
827                         break;
828                 default:
829                         err("Unknown link event %u in WU state\n", event);
830                 }
831                 break;
832         case RESET_UNKNOWN:
833                 dbg_link("RU/");
834                 switch (event) {
835                 case TRAFFIC_MSG_EVT:
836                         dbg_link("TRF-\n");
837                         break;
838                 case ACTIVATE_MSG:
839                         other = l_ptr->owner->active_links[0];
840                         if (other && link_working_unknown(other)) {
841                                 dbg_link("ACT\n");
842                                 break;
843                         }
844                         dbg_link("ACT -> WW\n");
845                         l_ptr->state = WORKING_WORKING;
846                         l_ptr->fsm_msg_cnt = 0;
847                         link_activate(l_ptr);
848                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
849                         l_ptr->fsm_msg_cnt++;
850                         link_set_timer(l_ptr, cont_intv);
851                         break;
852                 case RESET_MSG:
853                         dbg_link("RES\n");
854                         dbg_link(" -> RR\n");
855                         l_ptr->state = RESET_RESET;
856                         l_ptr->fsm_msg_cnt = 0;
857                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
858                         l_ptr->fsm_msg_cnt++;
859                         link_set_timer(l_ptr, cont_intv);
860                         break;
861                 case STARTING_EVT:
862                         dbg_link("START-");
863                         l_ptr->started = 1;
864                         /* fall through */
865                 case TIMEOUT_EVT:
866                         dbg_link("TIM\n");
867                         tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
868                         l_ptr->fsm_msg_cnt++;
869                         link_set_timer(l_ptr, cont_intv);
870                         break;
871                 default:
872                         err("Unknown link event %u in RU state\n", event);
873                 }
874                 break;
875         case RESET_RESET:
876                 dbg_link("RR/ ");
877                 switch (event) {
878                 case TRAFFIC_MSG_EVT:
879                         dbg_link("TRF-");
880                         /* fall through */
881                 case ACTIVATE_MSG:
882                         other = l_ptr->owner->active_links[0];
883                         if (other && link_working_unknown(other)) {
884                                 dbg_link("ACT\n");
885                                 break;
886                         }
887                         dbg_link("ACT -> WW\n");
888                         l_ptr->state = WORKING_WORKING;
889                         l_ptr->fsm_msg_cnt = 0;
890                         link_activate(l_ptr);
891                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
892                         l_ptr->fsm_msg_cnt++;
893                         link_set_timer(l_ptr, cont_intv);
894                         break;
895                 case RESET_MSG:
896                         dbg_link("RES\n");
897                         break;
898                 case TIMEOUT_EVT:
899                         dbg_link("TIM\n");
900                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
901                         l_ptr->fsm_msg_cnt++;
902                         link_set_timer(l_ptr, cont_intv);
903                         dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
904                         break;
905                 default:
906                         err("Unknown link event %u in RR state\n", event);
907                 }
908                 break;
909         default:
910                 err("Unknown link state %u/%u\n", l_ptr->state, event);
911         }
912 }
913
914 /*
915  * link_bundle_buf(): Append contents of a buffer to
916  * the tail of an existing one.
917  */
918
919 static int link_bundle_buf(struct link *l_ptr,
920                            struct sk_buff *bundler,
921                            struct sk_buff *buf)
922 {
923         struct tipc_msg *bundler_msg = buf_msg(bundler);
924         struct tipc_msg *msg = buf_msg(buf);
925         u32 size = msg_size(msg);
926         u32 bundle_size = msg_size(bundler_msg);
927         u32 to_pos = align(bundle_size);
928         u32 pad = to_pos - bundle_size;
929
930         if (msg_user(bundler_msg) != MSG_BUNDLER)
931                 return 0;
932         if (msg_type(bundler_msg) != OPEN_MSG)
933                 return 0;
934         if (skb_tailroom(bundler) < (pad + size))
935                 return 0;
936         if (l_ptr->max_pkt < (to_pos + size))
937                 return 0;
938
939         skb_put(bundler, pad + size);
940         skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
941         msg_set_size(bundler_msg, to_pos + size);
942         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
943         dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
944             msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
945         msg_dbg(msg, "PACKD:");
946         buf_discard(buf);
947         l_ptr->stats.sent_bundled++;
948         return 1;
949 }
950
951 static void link_add_to_outqueue(struct link *l_ptr,
952                                  struct sk_buff *buf,
953                                  struct tipc_msg *msg)
954 {
955         u32 ack = mod(l_ptr->next_in_no - 1);
956         u32 seqno = mod(l_ptr->next_out_no++);
957
958         msg_set_word(msg, 2, ((ack << 16) | seqno));
959         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
960         buf->next = NULL;
961         if (l_ptr->first_out) {
962                 l_ptr->last_out->next = buf;
963                 l_ptr->last_out = buf;
964         } else
965                 l_ptr->first_out = l_ptr->last_out = buf;
966         l_ptr->out_queue_size++;
967 }
968
969 /*
970  * tipc_link_send_buf() is the 'full path' for messages, called from
971  * inside TIPC when the 'fast path' in tipc_send_buf
972  * has failed, and from link_send()
973  */
974
975 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
976 {
977         struct tipc_msg *msg = buf_msg(buf);
978         u32 size = msg_size(msg);
979         u32 dsz = msg_data_sz(msg);
980         u32 queue_size = l_ptr->out_queue_size;
981         u32 imp = tipc_msg_tot_importance(msg);
982         u32 queue_limit = l_ptr->queue_limit[imp];
983         u32 max_packet = l_ptr->max_pkt;
984
985         msg_set_prevnode(msg, tipc_own_addr);   /* If routed message */
986
987         /* Match msg importance against queue limits: */
988
989         if (unlikely(queue_size >= queue_limit)) {
990                 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
991                         return link_schedule_port(l_ptr, msg_origport(msg),
992                                                   size);
993                 }
994                 msg_dbg(msg, "TIPC: Congestion, throwing away\n");
995                 buf_discard(buf);
996                 if (imp > CONN_MANAGER) {
997                         warn("Resetting link <%s>, send queue full", l_ptr->name);
998                         tipc_link_reset(l_ptr);
999                 }
1000                 return dsz;
1001         }
1002
1003         /* Fragmentation needed ? */
1004
1005         if (size > max_packet)
1006                 return link_send_long_buf(l_ptr, buf);
1007
1008         /* Packet can be queued or sent: */
1009
1010         if (queue_size > l_ptr->stats.max_queue_sz)
1011                 l_ptr->stats.max_queue_sz = queue_size;
1012
1013         if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
1014                    !link_congested(l_ptr))) {
1015                 link_add_to_outqueue(l_ptr, buf, msg);
1016
1017                 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
1018                         l_ptr->unacked_window = 0;
1019                 } else {
1020                         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1021                         l_ptr->stats.bearer_congs++;
1022                         l_ptr->next_out = buf;
1023                 }
1024                 return dsz;
1025         }
1026         /* Congestion: can message be bundled ?: */
1027
1028         if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
1029             (msg_user(msg) != MSG_FRAGMENTER)) {
1030
1031                 /* Try adding message to an existing bundle */
1032
1033                 if (l_ptr->next_out &&
1034                     link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
1035                         tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1036                         return dsz;
1037                 }
1038
1039                 /* Try creating a new bundle */
1040
1041                 if (size <= max_packet * 2 / 3) {
1042                         struct sk_buff *bundler = tipc_buf_acquire(max_packet);
1043                         struct tipc_msg bundler_hdr;
1044
1045                         if (bundler) {
1046                                 tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1047                                          INT_H_SIZE, l_ptr->addr);
1048                                 skb_copy_to_linear_data(bundler, &bundler_hdr,
1049                                                         INT_H_SIZE);
1050                                 skb_trim(bundler, INT_H_SIZE);
1051                                 link_bundle_buf(l_ptr, bundler, buf);
1052                                 buf = bundler;
1053                                 msg = buf_msg(buf);
1054                                 l_ptr->stats.sent_bundles++;
1055                         }
1056                 }
1057         }
1058         if (!l_ptr->next_out)
1059                 l_ptr->next_out = buf;
1060         link_add_to_outqueue(l_ptr, buf, msg);
1061         tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1062         return dsz;
1063 }
1064
1065 /*
1066  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1067  * not been selected yet, and the the owner node is not locked
1068  * Called by TIPC internal users, e.g. the name distributor
1069  */
1070
1071 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1072 {
1073         struct link *l_ptr;
1074         struct tipc_node *n_ptr;
1075         int res = -ELINKCONG;
1076
1077         read_lock_bh(&tipc_net_lock);
1078         n_ptr = tipc_node_select(dest, selector);
1079         if (n_ptr) {
1080                 tipc_node_lock(n_ptr);
1081                 l_ptr = n_ptr->active_links[selector & 1];
1082                 if (l_ptr) {
1083                         dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1084                         res = tipc_link_send_buf(l_ptr, buf);
1085                 } else {
1086                         dbg("Attempt to send msg to unreachable node:\n");
1087                         msg_dbg(buf_msg(buf),">>>");
1088                         buf_discard(buf);
1089                 }
1090                 tipc_node_unlock(n_ptr);
1091         } else {
1092                 dbg("Attempt to send msg to unknown node:\n");
1093                 msg_dbg(buf_msg(buf),">>>");
1094                 buf_discard(buf);
1095         }
1096         read_unlock_bh(&tipc_net_lock);
1097         return res;
1098 }
1099
1100 /*
1101  * link_send_buf_fast: Entry for data messages where the
1102  * destination link is known and the header is complete,
1103  * inclusive total message length. Very time critical.
1104  * Link is locked. Returns user data length.
1105  */
1106
1107 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1108                               u32 *used_max_pkt)
1109 {
1110         struct tipc_msg *msg = buf_msg(buf);
1111         int res = msg_data_sz(msg);
1112
1113         if (likely(!link_congested(l_ptr))) {
1114                 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1115                         if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1116                                 link_add_to_outqueue(l_ptr, buf, msg);
1117                                 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1118                                                             &l_ptr->media_addr))) {
1119                                         l_ptr->unacked_window = 0;
1120                                         msg_dbg(msg,"SENT_FAST:");
1121                                         return res;
1122                                 }
1123                                 dbg("failed sent fast...\n");
1124                                 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1125                                 l_ptr->stats.bearer_congs++;
1126                                 l_ptr->next_out = buf;
1127                                 return res;
1128                         }
1129                 }
1130                 else
1131                         *used_max_pkt = l_ptr->max_pkt;
1132         }
1133         return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1134 }
1135
1136 /*
1137  * tipc_send_buf_fast: Entry for data messages where the
1138  * destination node is known and the header is complete,
1139  * inclusive total message length.
1140  * Returns user data length.
1141  */
1142 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1143 {
1144         struct link *l_ptr;
1145         struct tipc_node *n_ptr;
1146         int res;
1147         u32 selector = msg_origport(buf_msg(buf)) & 1;
1148         u32 dummy;
1149
1150         if (destnode == tipc_own_addr)
1151                 return tipc_port_recv_msg(buf);
1152
1153         read_lock_bh(&tipc_net_lock);
1154         n_ptr = tipc_node_select(destnode, selector);
1155         if (likely(n_ptr)) {
1156                 tipc_node_lock(n_ptr);
1157                 l_ptr = n_ptr->active_links[selector];
1158                 dbg("send_fast: buf %x selected %x, destnode = %x\n",
1159                     buf, l_ptr, destnode);
1160                 if (likely(l_ptr)) {
1161                         res = link_send_buf_fast(l_ptr, buf, &dummy);
1162                         tipc_node_unlock(n_ptr);
1163                         read_unlock_bh(&tipc_net_lock);
1164                         return res;
1165                 }
1166                 tipc_node_unlock(n_ptr);
1167         }
1168         read_unlock_bh(&tipc_net_lock);
1169         res = msg_data_sz(buf_msg(buf));
1170         tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1171         return res;
1172 }
1173
1174
1175 /*
1176  * tipc_link_send_sections_fast: Entry for messages where the
1177  * destination processor is known and the header is complete,
1178  * except for total message length.
1179  * Returns user data length or errno.
1180  */
1181 int tipc_link_send_sections_fast(struct port *sender,
1182                                  struct iovec const *msg_sect,
1183                                  const u32 num_sect,
1184                                  u32 destaddr)
1185 {
1186         struct tipc_msg *hdr = &sender->publ.phdr;
1187         struct link *l_ptr;
1188         struct sk_buff *buf;
1189         struct tipc_node *node;
1190         int res;
1191         u32 selector = msg_origport(hdr) & 1;
1192
1193 again:
1194         /*
1195          * Try building message using port's max_pkt hint.
1196          * (Must not hold any locks while building message.)
1197          */
1198
1199         res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1200                         !sender->user_port, &buf);
1201
1202         read_lock_bh(&tipc_net_lock);
1203         node = tipc_node_select(destaddr, selector);
1204         if (likely(node)) {
1205                 tipc_node_lock(node);
1206                 l_ptr = node->active_links[selector];
1207                 if (likely(l_ptr)) {
1208                         if (likely(buf)) {
1209                                 res = link_send_buf_fast(l_ptr, buf,
1210                                                          &sender->publ.max_pkt);
1211                                 if (unlikely(res < 0))
1212                                         buf_discard(buf);
1213 exit:
1214                                 tipc_node_unlock(node);
1215                                 read_unlock_bh(&tipc_net_lock);
1216                                 return res;
1217                         }
1218
1219                         /* Exit if build request was invalid */
1220
1221                         if (unlikely(res < 0))
1222                                 goto exit;
1223
1224                         /* Exit if link (or bearer) is congested */
1225
1226                         if (link_congested(l_ptr) ||
1227                             !list_empty(&l_ptr->b_ptr->cong_links)) {
1228                                 res = link_schedule_port(l_ptr,
1229                                                          sender->publ.ref, res);
1230                                 goto exit;
1231                         }
1232
1233                         /*
1234                          * Message size exceeds max_pkt hint; update hint,
1235                          * then re-try fast path or fragment the message
1236                          */
1237
1238                         sender->publ.max_pkt = l_ptr->max_pkt;
1239                         tipc_node_unlock(node);
1240                         read_unlock_bh(&tipc_net_lock);
1241
1242
1243                         if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1244                                 goto again;
1245
1246                         return link_send_sections_long(sender, msg_sect,
1247                                                        num_sect, destaddr);
1248                 }
1249                 tipc_node_unlock(node);
1250         }
1251         read_unlock_bh(&tipc_net_lock);
1252
1253         /* Couldn't find a link to the destination node */
1254
1255         if (buf)
1256                 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1257         if (res >= 0)
1258                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1259                                                  TIPC_ERR_NO_NODE);
1260         return res;
1261 }
1262
1263 /*
1264  * link_send_sections_long(): Entry for long messages where the
1265  * destination node is known and the header is complete,
1266  * inclusive total message length.
1267  * Link and bearer congestion status have been checked to be ok,
1268  * and are ignored if they change.
1269  *
1270  * Note that fragments do not use the full link MTU so that they won't have
1271  * to undergo refragmentation if link changeover causes them to be sent
1272  * over another link with an additional tunnel header added as prefix.
1273  * (Refragmentation will still occur if the other link has a smaller MTU.)
1274  *
1275  * Returns user data length or errno.
1276  */
1277 static int link_send_sections_long(struct port *sender,
1278                                    struct iovec const *msg_sect,
1279                                    u32 num_sect,
1280                                    u32 destaddr)
1281 {
1282         struct link *l_ptr;
1283         struct tipc_node *node;
1284         struct tipc_msg *hdr = &sender->publ.phdr;
1285         u32 dsz = msg_data_sz(hdr);
1286         u32 max_pkt,fragm_sz,rest;
1287         struct tipc_msg fragm_hdr;
1288         struct sk_buff *buf,*buf_chain,*prev;
1289         u32 fragm_crs,fragm_rest,hsz,sect_rest;
1290         const unchar *sect_crs;
1291         int curr_sect;
1292         u32 fragm_no;
1293
1294 again:
1295         fragm_no = 1;
1296         max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1297                 /* leave room for tunnel header in case of link changeover */
1298         fragm_sz = max_pkt - INT_H_SIZE;
1299                 /* leave room for fragmentation header in each fragment */
1300         rest = dsz;
1301         fragm_crs = 0;
1302         fragm_rest = 0;
1303         sect_rest = 0;
1304         sect_crs = NULL;
1305         curr_sect = -1;
1306
1307         /* Prepare reusable fragment header: */
1308
1309         msg_dbg(hdr, ">FRAGMENTING>");
1310         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1311                  INT_H_SIZE, msg_destnode(hdr));
1312         msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1313         msg_set_size(&fragm_hdr, max_pkt);
1314         msg_set_fragm_no(&fragm_hdr, 1);
1315
1316         /* Prepare header of first fragment: */
1317
1318         buf_chain = buf = tipc_buf_acquire(max_pkt);
1319         if (!buf)
1320                 return -ENOMEM;
1321         buf->next = NULL;
1322         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1323         hsz = msg_hdr_sz(hdr);
1324         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1325         msg_dbg(buf_msg(buf), ">BUILD>");
1326
1327         /* Chop up message: */
1328
1329         fragm_crs = INT_H_SIZE + hsz;
1330         fragm_rest = fragm_sz - hsz;
1331
1332         do {            /* For all sections */
1333                 u32 sz;
1334
1335                 if (!sect_rest) {
1336                         sect_rest = msg_sect[++curr_sect].iov_len;
1337                         sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1338                 }
1339
1340                 if (sect_rest < fragm_rest)
1341                         sz = sect_rest;
1342                 else
1343                         sz = fragm_rest;
1344
1345                 if (likely(!sender->user_port)) {
1346                         if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1347 error:
1348                                 for (; buf_chain; buf_chain = buf) {
1349                                         buf = buf_chain->next;
1350                                         buf_discard(buf_chain);
1351                                 }
1352                                 return -EFAULT;
1353                         }
1354                 } else
1355                         skb_copy_to_linear_data_offset(buf, fragm_crs,
1356                                                        sect_crs, sz);
1357                 sect_crs += sz;
1358                 sect_rest -= sz;
1359                 fragm_crs += sz;
1360                 fragm_rest -= sz;
1361                 rest -= sz;
1362
1363                 if (!fragm_rest && rest) {
1364
1365                         /* Initiate new fragment: */
1366                         if (rest <= fragm_sz) {
1367                                 fragm_sz = rest;
1368                                 msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1369                         } else {
1370                                 msg_set_type(&fragm_hdr, FRAGMENT);
1371                         }
1372                         msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1373                         msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1374                         prev = buf;
1375                         buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1376                         if (!buf)
1377                                 goto error;
1378
1379                         buf->next = NULL;
1380                         prev->next = buf;
1381                         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1382                         fragm_crs = INT_H_SIZE;
1383                         fragm_rest = fragm_sz;
1384                         msg_dbg(buf_msg(buf),"  >BUILD>");
1385                 }
1386         }
1387         while (rest > 0);
1388
1389         /*
1390          * Now we have a buffer chain. Select a link and check
1391          * that packet size is still OK
1392          */
1393         node = tipc_node_select(destaddr, sender->publ.ref & 1);
1394         if (likely(node)) {
1395                 tipc_node_lock(node);
1396                 l_ptr = node->active_links[sender->publ.ref & 1];
1397                 if (!l_ptr) {
1398                         tipc_node_unlock(node);
1399                         goto reject;
1400                 }
1401                 if (l_ptr->max_pkt < max_pkt) {
1402                         sender->publ.max_pkt = l_ptr->max_pkt;
1403                         tipc_node_unlock(node);
1404                         for (; buf_chain; buf_chain = buf) {
1405                                 buf = buf_chain->next;
1406                                 buf_discard(buf_chain);
1407                         }
1408                         goto again;
1409                 }
1410         } else {
1411 reject:
1412                 for (; buf_chain; buf_chain = buf) {
1413                         buf = buf_chain->next;
1414                         buf_discard(buf_chain);
1415                 }
1416                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1417                                                  TIPC_ERR_NO_NODE);
1418         }
1419
1420         /* Append whole chain to send queue: */
1421
1422         buf = buf_chain;
1423         l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1424         if (!l_ptr->next_out)
1425                 l_ptr->next_out = buf_chain;
1426         l_ptr->stats.sent_fragmented++;
1427         while (buf) {
1428                 struct sk_buff *next = buf->next;
1429                 struct tipc_msg *msg = buf_msg(buf);
1430
1431                 l_ptr->stats.sent_fragments++;
1432                 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1433                 link_add_to_outqueue(l_ptr, buf, msg);
1434                 msg_dbg(msg, ">ADD>");
1435                 buf = next;
1436         }
1437
1438         /* Send it, if possible: */
1439
1440         tipc_link_push_queue(l_ptr);
1441         tipc_node_unlock(node);
1442         return dsz;
1443 }
1444
1445 /*
1446  * tipc_link_push_packet: Push one unsent packet to the media
1447  */
1448 u32 tipc_link_push_packet(struct link *l_ptr)
1449 {
1450         struct sk_buff *buf = l_ptr->first_out;
1451         u32 r_q_size = l_ptr->retransm_queue_size;
1452         u32 r_q_head = l_ptr->retransm_queue_head;
1453
1454         /* Step to position where retransmission failed, if any,    */
1455         /* consider that buffers may have been released in meantime */
1456
1457         if (r_q_size && buf) {
1458                 u32 last = lesser(mod(r_q_head + r_q_size),
1459                                   link_last_sent(l_ptr));
1460                 u32 first = msg_seqno(buf_msg(buf));
1461
1462                 while (buf && less(first, r_q_head)) {
1463                         first = mod(first + 1);
1464                         buf = buf->next;
1465                 }
1466                 l_ptr->retransm_queue_head = r_q_head = first;
1467                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1468         }
1469
1470         /* Continue retransmission now, if there is anything: */
1471
1472         if (r_q_size && buf) {
1473                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1474                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1475                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1476                         msg_dbg(buf_msg(buf), ">DEF-RETR>");
1477                         l_ptr->retransm_queue_head = mod(++r_q_head);
1478                         l_ptr->retransm_queue_size = --r_q_size;
1479                         l_ptr->stats.retransmitted++;
1480                         return 0;
1481                 } else {
1482                         l_ptr->stats.bearer_congs++;
1483                         msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1484                         return PUSH_FAILED;
1485                 }
1486         }
1487
1488         /* Send deferred protocol message, if any: */
1489
1490         buf = l_ptr->proto_msg_queue;
1491         if (buf) {
1492                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1493                 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1494                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1495                         msg_dbg(buf_msg(buf), ">DEF-PROT>");
1496                         l_ptr->unacked_window = 0;
1497                         buf_discard(buf);
1498                         l_ptr->proto_msg_queue = NULL;
1499                         return 0;
1500                 } else {
1501                         msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1502                         l_ptr->stats.bearer_congs++;
1503                         return PUSH_FAILED;
1504                 }
1505         }
1506
1507         /* Send one deferred data message, if send window not full: */
1508
1509         buf = l_ptr->next_out;
1510         if (buf) {
1511                 struct tipc_msg *msg = buf_msg(buf);
1512                 u32 next = msg_seqno(msg);
1513                 u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1514
1515                 if (mod(next - first) < l_ptr->queue_limit[0]) {
1516                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1517                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1518                         if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1519                                 if (msg_user(msg) == MSG_BUNDLER)
1520                                         msg_set_type(msg, CLOSED_MSG);
1521                                 msg_dbg(msg, ">PUSH-DATA>");
1522                                 l_ptr->next_out = buf->next;
1523                                 return 0;
1524                         } else {
1525                                 msg_dbg(msg, "|PUSH-DATA|");
1526                                 l_ptr->stats.bearer_congs++;
1527                                 return PUSH_FAILED;
1528                         }
1529                 }
1530         }
1531         return PUSH_FINISHED;
1532 }
1533
1534 /*
1535  * push_queue(): push out the unsent messages of a link where
1536  *               congestion has abated. Node is locked
1537  */
1538 void tipc_link_push_queue(struct link *l_ptr)
1539 {
1540         u32 res;
1541
1542         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1543                 return;
1544
1545         do {
1546                 res = tipc_link_push_packet(l_ptr);
1547         } while (!res);
1548
1549         if (res == PUSH_FAILED)
1550                 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1551 }
1552
1553 static void link_reset_all(unsigned long addr)
1554 {
1555         struct tipc_node *n_ptr;
1556         char addr_string[16];
1557         u32 i;
1558
1559         read_lock_bh(&tipc_net_lock);
1560         n_ptr = tipc_node_find((u32)addr);
1561         if (!n_ptr) {
1562                 read_unlock_bh(&tipc_net_lock);
1563                 return; /* node no longer exists */
1564         }
1565
1566         tipc_node_lock(n_ptr);
1567
1568         warn("Resetting all links to %s\n",
1569              tipc_addr_string_fill(addr_string, n_ptr->addr));
1570
1571         for (i = 0; i < MAX_BEARERS; i++) {
1572                 if (n_ptr->links[i]) {
1573                         link_print(n_ptr->links[i], TIPC_OUTPUT,
1574                                    "Resetting link\n");
1575                         tipc_link_reset(n_ptr->links[i]);
1576                 }
1577         }
1578
1579         tipc_node_unlock(n_ptr);
1580         read_unlock_bh(&tipc_net_lock);
1581 }
1582
1583 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1584 {
1585         struct tipc_msg *msg = buf_msg(buf);
1586
1587         warn("Retransmission failure on link <%s>\n", l_ptr->name);
1588         tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");
1589
1590         if (l_ptr->addr) {
1591
1592                 /* Handle failure on standard link */
1593
1594                 link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
1595                 tipc_link_reset(l_ptr);
1596
1597         } else {
1598
1599                 /* Handle failure on broadcast link */
1600
1601                 struct tipc_node *n_ptr;
1602                 char addr_string[16];
1603
1604                 tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
1605                 tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
1606                                      (unsigned long) TIPC_SKB_CB(buf)->handle);
1607
1608                 n_ptr = l_ptr->owner->next;
1609                 tipc_node_lock(n_ptr);
1610
1611                 tipc_addr_string_fill(addr_string, n_ptr->addr);
1612                 tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
1613                 tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
1614                 tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
1615                 tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
1616                 tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
1617                 tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
1618                 tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1619
1620                 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1621
1622                 tipc_node_unlock(n_ptr);
1623
1624                 l_ptr->stale_count = 0;
1625         }
1626 }
1627
1628 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1629                           u32 retransmits)
1630 {
1631         struct tipc_msg *msg;
1632
1633         if (!buf)
1634                 return;
1635
1636         msg = buf_msg(buf);
1637
1638         dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
1639
1640         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1641                 if (l_ptr->retransm_queue_size == 0) {
1642                         msg_dbg(msg, ">NO_RETR->BCONG>");
1643                         dbg_print_link(l_ptr, "   ");
1644                         l_ptr->retransm_queue_head = msg_seqno(msg);
1645                         l_ptr->retransm_queue_size = retransmits;
1646                 } else {
1647                         err("Unexpected retransmit on link %s (qsize=%d)\n",
1648                             l_ptr->name, l_ptr->retransm_queue_size);
1649                 }
1650                 return;
1651         } else {
1652                 /* Detect repeated retransmit failures on uncongested bearer */
1653
1654                 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1655                         if (++l_ptr->stale_count > 100) {
1656                                 link_retransmit_failure(l_ptr, buf);
1657                                 return;
1658                         }
1659                 } else {
1660                         l_ptr->last_retransmitted = msg_seqno(msg);
1661                         l_ptr->stale_count = 1;
1662                 }
1663         }
1664
1665         while (retransmits && (buf != l_ptr->next_out) && buf) {
1666                 msg = buf_msg(buf);
1667                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1668                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1669                 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1670                         msg_dbg(buf_msg(buf), ">RETR>");
1671                         buf = buf->next;
1672                         retransmits--;
1673                         l_ptr->stats.retransmitted++;
1674                 } else {
1675                         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1676                         l_ptr->stats.bearer_congs++;
1677                         l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1678                         l_ptr->retransm_queue_size = retransmits;
1679                         return;
1680                 }
1681         }
1682
1683         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1684 }
1685
1686 /**
1687  * link_insert_deferred_queue - insert deferred messages back into receive chain
1688  */
1689
1690 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1691                                                   struct sk_buff *buf)
1692 {
1693         u32 seq_no;
1694
1695         if (l_ptr->oldest_deferred_in == NULL)
1696                 return buf;
1697
1698         seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1699         if (seq_no == mod(l_ptr->next_in_no)) {
1700                 l_ptr->newest_deferred_in->next = buf;
1701                 buf = l_ptr->oldest_deferred_in;
1702                 l_ptr->oldest_deferred_in = NULL;
1703                 l_ptr->deferred_inqueue_sz = 0;
1704         }
1705         return buf;
1706 }
1707
1708 /**
1709  * link_recv_buf_validate - validate basic format of received message
1710  *
1711  * This routine ensures a TIPC message has an acceptable header, and at least
1712  * as much data as the header indicates it should.  The routine also ensures
1713  * that the entire message header is stored in the main fragment of the message
1714  * buffer, to simplify future access to message header fields.
1715  *
1716  * Note: Having extra info present in the message header or data areas is OK.
1717  * TIPC will ignore the excess, under the assumption that it is optional info
1718  * introduced by a later release of the protocol.
1719  */
1720
1721 static int link_recv_buf_validate(struct sk_buff *buf)
1722 {
1723         static u32 min_data_hdr_size[8] = {
1724                 SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1725                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1726                 };
1727
1728         struct tipc_msg *msg;
1729         u32 tipc_hdr[2];
1730         u32 size;
1731         u32 hdr_size;
1732         u32 min_hdr_size;
1733
1734         if (unlikely(buf->len < MIN_H_SIZE))
1735                 return 0;
1736
1737         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1738         if (msg == NULL)
1739                 return 0;
1740
1741         if (unlikely(msg_version(msg) != TIPC_VERSION))
1742                 return 0;
1743
1744         size = msg_size(msg);
1745         hdr_size = msg_hdr_sz(msg);
1746         min_hdr_size = msg_isdata(msg) ?
1747                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1748
1749         if (unlikely((hdr_size < min_hdr_size) ||
1750                      (size < hdr_size) ||
1751                      (buf->len < size) ||
1752                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1753                 return 0;
1754
1755         return pskb_may_pull(buf, hdr_size);
1756 }
1757
1758 /**
1759  * tipc_recv_msg - process TIPC messages arriving from off-node
1760  * @head: pointer to message buffer chain
1761  * @tb_ptr: pointer to bearer message arrived on
1762  *
1763  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1764  * structure (i.e. cannot be NULL), but bearer can be inactive.
1765  */
1766
1767 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1768 {
1769         read_lock_bh(&tipc_net_lock);
1770         while (head) {
1771                 struct bearer *b_ptr = (struct bearer *)tb_ptr;
1772                 struct tipc_node *n_ptr;
1773                 struct link *l_ptr;
1774                 struct sk_buff *crs;
1775                 struct sk_buff *buf = head;
1776                 struct tipc_msg *msg;
1777                 u32 seq_no;
1778                 u32 ackd;
1779                 u32 released = 0;
1780                 int type;
1781
1782                 head = head->next;
1783
1784                 /* Ensure bearer is still enabled */
1785
1786                 if (unlikely(!b_ptr->active))
1787                         goto cont;
1788
1789                 /* Ensure message is well-formed */
1790
1791                 if (unlikely(!link_recv_buf_validate(buf)))
1792                         goto cont;
1793
1794                 /* Ensure message data is a single contiguous unit */
1795
1796                 if (unlikely(buf_linearize(buf))) {
1797                         goto cont;
1798                 }
1799
1800                 /* Handle arrival of a non-unicast link message */
1801
1802                 msg = buf_msg(buf);
1803
1804                 if (unlikely(msg_non_seq(msg))) {
1805                         if (msg_user(msg) ==  LINK_CONFIG)
1806                                 tipc_disc_recv_msg(buf, b_ptr);
1807                         else
1808                                 tipc_bclink_recv_pkt(buf);
1809                         continue;
1810                 }
1811
1812                 if (unlikely(!msg_short(msg) &&
1813                              (msg_destnode(msg) != tipc_own_addr)))
1814                         goto cont;
1815
1816                 /* Discard non-routeable messages destined for another node */
1817
1818                 if (unlikely(!msg_isdata(msg) &&
1819                              (msg_destnode(msg) != tipc_own_addr))) {
1820                         if ((msg_user(msg) != CONN_MANAGER) &&
1821                             (msg_user(msg) != MSG_FRAGMENTER))
1822                                 goto cont;
1823                 }
1824
1825                 /* Locate neighboring node that sent message */
1826
1827                 n_ptr = tipc_node_find(msg_prevnode(msg));
1828                 if (unlikely(!n_ptr))
1829                         goto cont;
1830                 tipc_node_lock(n_ptr);
1831
1832                 /* Don't talk to neighbor during cleanup after last session */
1833
1834                 if (n_ptr->cleanup_required) {
1835                         tipc_node_unlock(n_ptr);
1836                         goto cont;
1837                 }
1838
1839                 /* Locate unicast link endpoint that should handle message */
1840
1841                 l_ptr = n_ptr->links[b_ptr->identity];
1842                 if (unlikely(!l_ptr)) {
1843                         tipc_node_unlock(n_ptr);
1844                         goto cont;
1845                 }
1846
1847                 /* Validate message sequence number info */
1848
1849                 seq_no = msg_seqno(msg);
1850                 ackd = msg_ack(msg);
1851
1852                 /* Release acked messages */
1853
1854                 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1855                         if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1856                                 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1857                 }
1858
1859                 crs = l_ptr->first_out;
1860                 while ((crs != l_ptr->next_out) &&
1861                        less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1862                         struct sk_buff *next = crs->next;
1863
1864                         buf_discard(crs);
1865                         crs = next;
1866                         released++;
1867                 }
1868                 if (released) {
1869                         l_ptr->first_out = crs;
1870                         l_ptr->out_queue_size -= released;
1871                 }
1872
1873                 /* Try sending any messages link endpoint has pending */
1874
1875                 if (unlikely(l_ptr->next_out))
1876                         tipc_link_push_queue(l_ptr);
1877                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1878                         tipc_link_wakeup_ports(l_ptr, 0);
1879                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1880                         l_ptr->stats.sent_acks++;
1881                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1882                 }
1883
1884                 /* Now (finally!) process the incoming message */
1885
1886 protocol_check:
1887                 if (likely(link_working_working(l_ptr))) {
1888                         if (likely(seq_no == mod(l_ptr->next_in_no))) {
1889                                 l_ptr->next_in_no++;
1890                                 if (unlikely(l_ptr->oldest_deferred_in))
1891                                         head = link_insert_deferred_queue(l_ptr,
1892                                                                           head);
1893                                 if (likely(msg_is_dest(msg, tipc_own_addr))) {
1894 deliver:
1895                                         if (likely(msg_isdata(msg))) {
1896                                                 tipc_node_unlock(n_ptr);
1897                                                 tipc_port_recv_msg(buf);
1898                                                 continue;
1899                                         }
1900                                         switch (msg_user(msg)) {
1901                                         case MSG_BUNDLER:
1902                                                 l_ptr->stats.recv_bundles++;
1903                                                 l_ptr->stats.recv_bundled +=
1904                                                         msg_msgcnt(msg);
1905                                                 tipc_node_unlock(n_ptr);
1906                                                 tipc_link_recv_bundle(buf);
1907                                                 continue;
1908                                         case ROUTE_DISTRIBUTOR:
1909                                                 tipc_node_unlock(n_ptr);
1910                                                 tipc_cltr_recv_routing_table(buf);
1911                                                 continue;
1912                                         case NAME_DISTRIBUTOR:
1913                                                 tipc_node_unlock(n_ptr);
1914                                                 tipc_named_recv(buf);
1915                                                 continue;
1916                                         case CONN_MANAGER:
1917                                                 tipc_node_unlock(n_ptr);
1918                                                 tipc_port_recv_proto_msg(buf);
1919                                                 continue;
1920                                         case MSG_FRAGMENTER:
1921                                                 l_ptr->stats.recv_fragments++;
1922                                                 if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1923                                                                             &buf, &msg)) {
1924                                                         l_ptr->stats.recv_fragmented++;
1925                                                         goto deliver;
1926                                                 }
1927                                                 break;
1928                                         case CHANGEOVER_PROTOCOL:
1929                                                 type = msg_type(msg);
1930                                                 if (link_recv_changeover_msg(&l_ptr, &buf)) {
1931                                                         msg = buf_msg(buf);
1932                                                         seq_no = msg_seqno(msg);
1933                                                         if (type == ORIGINAL_MSG)
1934                                                                 goto deliver;
1935                                                         goto protocol_check;
1936                                                 }
1937                                                 break;
1938                                         }
1939                                 }
1940                                 tipc_node_unlock(n_ptr);
1941                                 tipc_net_route_msg(buf);
1942                                 continue;
1943                         }
1944                         link_handle_out_of_seq_msg(l_ptr, buf);
1945                         head = link_insert_deferred_queue(l_ptr, head);
1946                         tipc_node_unlock(n_ptr);
1947                         continue;
1948                 }
1949
1950                 if (msg_user(msg) == LINK_PROTOCOL) {
1951                         link_recv_proto_msg(l_ptr, buf);
1952                         head = link_insert_deferred_queue(l_ptr, head);
1953                         tipc_node_unlock(n_ptr);
1954                         continue;
1955                 }
1956                 msg_dbg(msg,"NSEQ<REC<");
1957                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1958
1959                 if (link_working_working(l_ptr)) {
1960                         /* Re-insert in front of queue */
1961                         msg_dbg(msg,"RECV-REINS:");
1962                         buf->next = head;
1963                         head = buf;
1964                         tipc_node_unlock(n_ptr);
1965                         continue;
1966                 }
1967                 tipc_node_unlock(n_ptr);
1968 cont:
1969                 buf_discard(buf);
1970         }
1971         read_unlock_bh(&tipc_net_lock);
1972 }
1973
1974 /*
1975  * link_defer_buf(): Sort a received out-of-sequence packet
1976  *                   into the deferred reception queue.
1977  * Returns the increase of the queue length,i.e. 0 or 1
1978  */
1979
1980 u32 tipc_link_defer_pkt(struct sk_buff **head,
1981                         struct sk_buff **tail,
1982                         struct sk_buff *buf)
1983 {
1984         struct sk_buff *prev = NULL;
1985         struct sk_buff *crs = *head;
1986         u32 seq_no = msg_seqno(buf_msg(buf));
1987
1988         buf->next = NULL;
1989
1990         /* Empty queue ? */
1991         if (*head == NULL) {
1992                 *head = *tail = buf;
1993                 return 1;
1994         }
1995
1996         /* Last ? */
1997         if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1998                 (*tail)->next = buf;
1999                 *tail = buf;
2000                 return 1;
2001         }
2002
2003         /* Scan through queue and sort it in */
2004         do {
2005                 struct tipc_msg *msg = buf_msg(crs);
2006
2007                 if (less(seq_no, msg_seqno(msg))) {
2008                         buf->next = crs;
2009                         if (prev)
2010                                 prev->next = buf;
2011                         else
2012                                 *head = buf;
2013                         return 1;
2014                 }
2015                 if (seq_no == msg_seqno(msg)) {
2016                         break;
2017                 }
2018                 prev = crs;
2019                 crs = crs->next;
2020         }
2021         while (crs);
2022
2023         /* Message is a duplicate of an existing message */
2024
2025         buf_discard(buf);
2026         return 0;
2027 }
2028
2029 /**
2030  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
2031  */
2032
2033 static void link_handle_out_of_seq_msg(struct link *l_ptr,
2034                                        struct sk_buff *buf)
2035 {
2036         u32 seq_no = msg_seqno(buf_msg(buf));
2037
2038         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
2039                 link_recv_proto_msg(l_ptr, buf);
2040                 return;
2041         }
2042
2043         dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
2044             seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
2045
2046         /* Record OOS packet arrival (force mismatch on next timeout) */
2047
2048         l_ptr->checkpoint--;
2049
2050         /*
2051          * Discard packet if a duplicate; otherwise add it to deferred queue
2052          * and notify peer of gap as per protocol specification
2053          */
2054
2055         if (less(seq_no, mod(l_ptr->next_in_no))) {
2056                 l_ptr->stats.duplicates++;
2057                 buf_discard(buf);
2058                 return;
2059         }
2060
2061         if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
2062                                 &l_ptr->newest_deferred_in, buf)) {
2063                 l_ptr->deferred_inqueue_sz++;
2064                 l_ptr->stats.deferred_recv++;
2065                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2066                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2067         } else
2068                 l_ptr->stats.duplicates++;
2069 }
2070
2071 /*
2072  * Send protocol message to the other endpoint.
2073  */
2074 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2075                               u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2076 {
2077         struct sk_buff *buf = NULL;
2078         struct tipc_msg *msg = l_ptr->pmsg;
2079         u32 msg_size = sizeof(l_ptr->proto_msg);
2080
2081         if (link_blocked(l_ptr))
2082                 return;
2083         msg_set_type(msg, msg_typ);
2084         msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2085         msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2086         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2087
2088         if (msg_typ == STATE_MSG) {
2089                 u32 next_sent = mod(l_ptr->next_out_no);
2090
2091                 if (!tipc_link_is_up(l_ptr))
2092                         return;
2093                 if (l_ptr->next_out)
2094                         next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2095                 msg_set_next_sent(msg, next_sent);
2096                 if (l_ptr->oldest_deferred_in) {
2097                         u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2098                         gap = mod(rec - mod(l_ptr->next_in_no));
2099                 }
2100                 msg_set_seq_gap(msg, gap);
2101                 if (gap)
2102                         l_ptr->stats.sent_nacks++;
2103                 msg_set_link_tolerance(msg, tolerance);
2104                 msg_set_linkprio(msg, priority);
2105                 msg_set_max_pkt(msg, ack_mtu);
2106                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2107                 msg_set_probe(msg, probe_msg != 0);
2108                 if (probe_msg) {
2109                         u32 mtu = l_ptr->max_pkt;
2110
2111                         if ((mtu < l_ptr->max_pkt_target) &&
2112                             link_working_working(l_ptr) &&
2113                             l_ptr->fsm_msg_cnt) {
2114                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2115                                 if (l_ptr->max_pkt_probes == 10) {
2116                                         l_ptr->max_pkt_target = (msg_size - 4);
2117                                         l_ptr->max_pkt_probes = 0;
2118                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2119                                 }
2120                                 l_ptr->max_pkt_probes++;
2121                         }
2122
2123                         l_ptr->stats.sent_probes++;
2124                 }
2125                 l_ptr->stats.sent_states++;
2126         } else {                /* RESET_MSG or ACTIVATE_MSG */
2127                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2128                 msg_set_seq_gap(msg, 0);
2129                 msg_set_next_sent(msg, 1);
2130                 msg_set_link_tolerance(msg, l_ptr->tolerance);
2131                 msg_set_linkprio(msg, l_ptr->priority);
2132                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2133         }
2134
2135         if (tipc_node_has_redundant_links(l_ptr->owner)) {
2136                 msg_set_redundant_link(msg);
2137         } else {
2138                 msg_clear_redundant_link(msg);
2139         }
2140         msg_set_linkprio(msg, l_ptr->priority);
2141
2142         /* Ensure sequence number will not fit : */
2143
2144         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2145
2146         /* Congestion? */
2147
2148         if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2149                 if (!l_ptr->proto_msg_queue) {
2150                         l_ptr->proto_msg_queue =
2151                                 tipc_buf_acquire(sizeof(l_ptr->proto_msg));
2152                 }
2153                 buf = l_ptr->proto_msg_queue;
2154                 if (!buf)
2155                         return;
2156                 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2157                 return;
2158         }
2159         msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2160
2161         /* Message can be sent */
2162
2163         msg_dbg(msg, ">>");
2164
2165         buf = tipc_buf_acquire(msg_size);
2166         if (!buf)
2167                 return;
2168
2169         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2170         msg_set_size(buf_msg(buf), msg_size);
2171
2172         if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2173                 l_ptr->unacked_window = 0;
2174                 buf_discard(buf);
2175                 return;
2176         }
2177
2178         /* New congestion */
2179         tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2180         l_ptr->proto_msg_queue = buf;
2181         l_ptr->stats.bearer_congs++;
2182 }
2183
2184 /*
2185  * Receive protocol message :
2186  * Note that network plane id propagates through the network, and may
2187  * change at any time. The node with lowest address rules
2188  */
2189
2190 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2191 {
2192         u32 rec_gap = 0;
2193         u32 max_pkt_info;
2194         u32 max_pkt_ack;
2195         u32 msg_tol;
2196         struct tipc_msg *msg = buf_msg(buf);
2197
2198         dbg("AT(%u):", jiffies_to_msecs(jiffies));
2199         msg_dbg(msg, "<<");
2200         if (link_blocked(l_ptr))
2201                 goto exit;
2202
2203         /* record unnumbered packet arrival (force mismatch on next timeout) */
2204
2205         l_ptr->checkpoint--;
2206
2207         if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2208                 if (tipc_own_addr > msg_prevnode(msg))
2209                         l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2210
2211         l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2212
2213         switch (msg_type(msg)) {
2214
2215         case RESET_MSG:
2216                 if (!link_working_unknown(l_ptr) &&
2217                     (l_ptr->peer_session != INVALID_SESSION)) {
2218                         if (msg_session(msg) == l_ptr->peer_session) {
2219                                 dbg("Duplicate RESET: %u<->%u\n",
2220                                     msg_session(msg), l_ptr->peer_session);
2221                                 break; /* duplicate: ignore */
2222                         }
2223                 }
2224                 /* fall thru' */
2225         case ACTIVATE_MSG:
2226                 /* Update link settings according other endpoint's values */
2227
2228                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2229
2230                 if ((msg_tol = msg_link_tolerance(msg)) &&
2231                     (msg_tol > l_ptr->tolerance))
2232                         link_set_supervision_props(l_ptr, msg_tol);
2233
2234                 if (msg_linkprio(msg) > l_ptr->priority)
2235                         l_ptr->priority = msg_linkprio(msg);
2236
2237                 max_pkt_info = msg_max_pkt(msg);
2238                 if (max_pkt_info) {
2239                         if (max_pkt_info < l_ptr->max_pkt_target)
2240                                 l_ptr->max_pkt_target = max_pkt_info;
2241                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2242                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
2243                 } else {
2244                         l_ptr->max_pkt = l_ptr->max_pkt_target;
2245                 }
2246                 l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2247
2248                 link_state_event(l_ptr, msg_type(msg));
2249
2250                 l_ptr->peer_session = msg_session(msg);
2251                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2252
2253                 /* Synchronize broadcast sequence numbers */
2254                 if (!tipc_node_has_redundant_links(l_ptr->owner)) {
2255                         l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2256                 }
2257                 break;
2258         case STATE_MSG:
2259
2260                 if ((msg_tol = msg_link_tolerance(msg)))
2261                         link_set_supervision_props(l_ptr, msg_tol);
2262
2263                 if (msg_linkprio(msg) &&
2264                     (msg_linkprio(msg) != l_ptr->priority)) {
2265                         warn("Resetting link <%s>, priority change %u->%u\n",
2266                              l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2267                         l_ptr->priority = msg_linkprio(msg);
2268                         tipc_link_reset(l_ptr); /* Enforce change to take effect */
2269                         break;
2270                 }
2271                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2272                 l_ptr->stats.recv_states++;
2273                 if (link_reset_unknown(l_ptr))
2274                         break;
2275
2276                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2277                         rec_gap = mod(msg_next_sent(msg) -
2278                                       mod(l_ptr->next_in_no));
2279                 }
2280
2281                 max_pkt_ack = msg_max_pkt(msg);
2282                 if (max_pkt_ack > l_ptr->max_pkt) {
2283                         dbg("Link <%s> updated MTU %u -> %u\n",
2284                             l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
2285                         l_ptr->max_pkt = max_pkt_ack;
2286                         l_ptr->max_pkt_probes = 0;
2287                 }
2288
2289                 max_pkt_ack = 0;
2290                 if (msg_probe(msg)) {
2291                         l_ptr->stats.recv_probes++;
2292                         if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2293                                 max_pkt_ack = msg_size(msg);
2294                         }
2295                 }
2296
2297                 /* Protocol message before retransmits, reduce loss risk */
2298
2299                 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2300
2301                 if (rec_gap || (msg_probe(msg))) {
2302                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2303                                                  0, rec_gap, 0, 0, max_pkt_ack);
2304                 }
2305                 if (msg_seq_gap(msg)) {
2306                         msg_dbg(msg, "With Gap:");
2307                         l_ptr->stats.recv_nacks++;
2308                         tipc_link_retransmit(l_ptr, l_ptr->first_out,
2309                                              msg_seq_gap(msg));
2310                 }
2311                 break;
2312         default:
2313                 msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
2314         }
2315 exit:
2316         buf_discard(buf);
2317 }
2318
2319
2320 /*
2321  * tipc_link_tunnel(): Send one message via a link belonging to
2322  * another bearer. Owner node is locked.
2323  */
2324 static void tipc_link_tunnel(struct link *l_ptr,
2325                              struct tipc_msg *tunnel_hdr,
2326                              struct tipc_msg  *msg,
2327                              u32 selector)
2328 {
2329         struct link *tunnel;
2330         struct sk_buff *buf;
2331         u32 length = msg_size(msg);
2332
2333         tunnel = l_ptr->owner->active_links[selector & 1];
2334         if (!tipc_link_is_up(tunnel)) {
2335                 warn("Link changeover error, "
2336                      "tunnel link no longer available\n");
2337                 return;
2338         }
2339         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2340         buf = tipc_buf_acquire(length + INT_H_SIZE);
2341         if (!buf) {
2342                 warn("Link changeover error, "
2343                      "unable to send tunnel msg\n");
2344                 return;
2345         }
2346         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2347         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2348         dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2349         msg_dbg(buf_msg(buf), ">SEND>");
2350         tipc_link_send_buf(tunnel, buf);
2351 }
2352
2353
2354
2355 /*
2356  * changeover(): Send whole message queue via the remaining link
2357  *               Owner node is locked.
2358  */
2359
2360 void tipc_link_changeover(struct link *l_ptr)
2361 {
2362         u32 msgcount = l_ptr->out_queue_size;
2363         struct sk_buff *crs = l_ptr->first_out;
2364         struct link *tunnel = l_ptr->owner->active_links[0];
2365         struct tipc_msg tunnel_hdr;
2366         int split_bundles;
2367
2368         if (!tunnel)
2369                 return;
2370
2371         if (!l_ptr->owner->permit_changeover) {
2372                 warn("Link changeover error, "
2373                      "peer did not permit changeover\n");
2374                 return;
2375         }
2376
2377         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2378                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2379         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2380         msg_set_msgcnt(&tunnel_hdr, msgcount);
2381         dbg("Link changeover requires %u tunnel messages\n", msgcount);
2382
2383         if (!l_ptr->first_out) {
2384                 struct sk_buff *buf;
2385
2386                 buf = tipc_buf_acquire(INT_H_SIZE);
2387                 if (buf) {
2388                         skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2389                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
2390                         dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2391                             tunnel->b_ptr->net_plane);
2392                         msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2393                         tipc_link_send_buf(tunnel, buf);
2394                 } else {
2395                         warn("Link changeover error, "
2396                              "unable to send changeover msg\n");
2397                 }
2398                 return;
2399         }
2400
2401         split_bundles = (l_ptr->owner->active_links[0] !=
2402                          l_ptr->owner->active_links[1]);
2403
2404         while (crs) {
2405                 struct tipc_msg *msg = buf_msg(crs);
2406
2407                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2408                         struct tipc_msg *m = msg_get_wrapped(msg);
2409                         unchar* pos = (unchar*)m;
2410
2411                         msgcount = msg_msgcnt(msg);
2412                         while (msgcount--) {
2413                                 msg_set_seqno(m,msg_seqno(msg));
2414                                 tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2415                                                  msg_link_selector(m));
2416                                 pos += align(msg_size(m));
2417                                 m = (struct tipc_msg *)pos;
2418                         }
2419                 } else {
2420                         tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2421                                          msg_link_selector(msg));
2422                 }
2423                 crs = crs->next;
2424         }
2425 }
2426
2427 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2428 {
2429         struct sk_buff *iter;
2430         struct tipc_msg tunnel_hdr;
2431
2432         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2433                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2434         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2435         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2436         iter = l_ptr->first_out;
2437         while (iter) {
2438                 struct sk_buff *outbuf;
2439                 struct tipc_msg *msg = buf_msg(iter);
2440                 u32 length = msg_size(msg);
2441
2442                 if (msg_user(msg) == MSG_BUNDLER)
2443                         msg_set_type(msg, CLOSED_MSG);
2444                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2445                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2446                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2447                 outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2448                 if (outbuf == NULL) {
2449                         warn("Link changeover error, "
2450                              "unable to send duplicate msg\n");
2451                         return;
2452                 }
2453                 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2454                 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2455                                                length);
2456                 dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2457                     tunnel->b_ptr->net_plane);
2458                 msg_dbg(buf_msg(outbuf), ">SEND>");
2459                 tipc_link_send_buf(tunnel, outbuf);
2460                 if (!tipc_link_is_up(l_ptr))
2461                         return;
2462                 iter = iter->next;
2463         }
2464 }
2465
2466
2467
2468 /**
2469  * buf_extract - extracts embedded TIPC message from another message
2470  * @skb: encapsulating message buffer
2471  * @from_pos: offset to extract from
2472  *
2473  * Returns a new message buffer containing an embedded message.  The
2474  * encapsulating message itself is left unchanged.
2475  */
2476
2477 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2478 {
2479         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2480         u32 size = msg_size(msg);
2481         struct sk_buff *eb;
2482
2483         eb = tipc_buf_acquire(size);
2484         if (eb)
2485                 skb_copy_to_linear_data(eb, msg, size);
2486         return eb;
2487 }
2488
2489 /*
2490  *  link_recv_changeover_msg(): Receive tunneled packet sent
2491  *  via other link. Node is locked. Return extracted buffer.
2492  */
2493
2494 static int link_recv_changeover_msg(struct link **l_ptr,
2495                                     struct sk_buff **buf)
2496 {
2497         struct sk_buff *tunnel_buf = *buf;
2498         struct link *dest_link;
2499         struct tipc_msg *msg;
2500         struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2501         u32 msg_typ = msg_type(tunnel_msg);
2502         u32 msg_count = msg_msgcnt(tunnel_msg);
2503
2504         dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2505         if (!dest_link) {
2506                 msg_dbg(tunnel_msg, "NOLINK/<REC<");
2507                 goto exit;
2508         }
2509         if (dest_link == *l_ptr) {
2510                 err("Unexpected changeover message on link <%s>\n",
2511                     (*l_ptr)->name);
2512                 goto exit;
2513         }
2514         dbg("%c<-%c:", dest_link->b_ptr->net_plane,
2515             (*l_ptr)->b_ptr->net_plane);
2516         *l_ptr = dest_link;
2517         msg = msg_get_wrapped(tunnel_msg);
2518
2519         if (msg_typ == DUPLICATE_MSG) {
2520                 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
2521                         msg_dbg(tunnel_msg, "DROP/<REC<");
2522                         goto exit;
2523                 }
2524                 *buf = buf_extract(tunnel_buf,INT_H_SIZE);
2525                 if (*buf == NULL) {
2526                         warn("Link changeover error, duplicate msg dropped\n");
2527                         goto exit;
2528                 }
2529                 msg_dbg(tunnel_msg, "TNL<REC<");
2530                 buf_discard(tunnel_buf);
2531                 return 1;
2532         }
2533
2534         /* First original message ?: */
2535
2536         if (tipc_link_is_up(dest_link)) {
2537                 msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
2538                 info("Resetting link <%s>, changeover initiated by peer\n",
2539                      dest_link->name);
2540                 tipc_link_reset(dest_link);
2541                 dest_link->exp_msg_count = msg_count;
2542                 dbg("Expecting %u tunnelled messages\n", msg_count);
2543                 if (!msg_count)
2544                         goto exit;
2545         } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2546                 msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
2547                 dest_link->exp_msg_count = msg_count;
2548                 dbg("Expecting %u tunnelled messages\n", msg_count);
2549                 if (!msg_count)
2550                         goto exit;
2551         }
2552
2553         /* Receive original message */
2554
2555         if (dest_link->exp_msg_count == 0) {
2556                 warn("Link switchover error, "
2557                      "got too many tunnelled messages\n");
2558                 msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
2559                 dbg_print_link(dest_link, "LINK:");
2560                 goto exit;
2561         }
2562         dest_link->exp_msg_count--;
2563         if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2564                 msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
2565                 goto exit;
2566         } else {
2567                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2568                 if (*buf != NULL) {
2569                         msg_dbg(tunnel_msg, "TNL<REC<");
2570                         buf_discard(tunnel_buf);
2571                         return 1;
2572                 } else {
2573                         warn("Link changeover error, original msg dropped\n");
2574                 }
2575         }
2576 exit:
2577         *buf = NULL;
2578         buf_discard(tunnel_buf);
2579         return 0;
2580 }
2581
2582 /*
2583  *  Bundler functionality:
2584  */
2585 void tipc_link_recv_bundle(struct sk_buff *buf)
2586 {
2587         u32 msgcount = msg_msgcnt(buf_msg(buf));
2588         u32 pos = INT_H_SIZE;
2589         struct sk_buff *obuf;
2590
2591         msg_dbg(buf_msg(buf), "<BNDL<: ");
2592         while (msgcount--) {
2593                 obuf = buf_extract(buf, pos);
2594                 if (obuf == NULL) {
2595                         warn("Link unable to unbundle message(s)\n");
2596                         break;
2597                 }
2598                 pos += align(msg_size(buf_msg(obuf)));
2599                 msg_dbg(buf_msg(obuf), "     /");
2600                 tipc_net_route_msg(obuf);
2601         }
2602         buf_discard(buf);
2603 }
2604
2605 /*
2606  *  Fragmentation/defragmentation:
2607  */
2608
2609
2610 /*
2611  * link_send_long_buf: Entry for buffers needing fragmentation.
2612  * The buffer is complete, inclusive total message length.
2613  * Returns user data length.
2614  */
2615 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2616 {
2617         struct tipc_msg *inmsg = buf_msg(buf);
2618         struct tipc_msg fragm_hdr;
2619         u32 insize = msg_size(inmsg);
2620         u32 dsz = msg_data_sz(inmsg);
2621         unchar *crs = buf->data;
2622         u32 rest = insize;
2623         u32 pack_sz = l_ptr->max_pkt;
2624         u32 fragm_sz = pack_sz - INT_H_SIZE;
2625         u32 fragm_no = 1;
2626         u32 destaddr;
2627
2628         if (msg_short(inmsg))
2629                 destaddr = l_ptr->addr;
2630         else
2631                 destaddr = msg_destnode(inmsg);
2632
2633         if (msg_routed(inmsg))
2634                 msg_set_prevnode(inmsg, tipc_own_addr);
2635
2636         /* Prepare reusable fragment header: */
2637
2638         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2639                  INT_H_SIZE, destaddr);
2640         msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2641         msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2642         msg_set_fragm_no(&fragm_hdr, fragm_no);
2643         l_ptr->stats.sent_fragmented++;
2644
2645         /* Chop up message: */
2646
2647         while (rest > 0) {
2648                 struct sk_buff *fragm;
2649
2650                 if (rest <= fragm_sz) {
2651                         fragm_sz = rest;
2652                         msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2653                 }
2654                 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2655                 if (fragm == NULL) {
2656                         warn("Link unable to fragment message\n");
2657                         dsz = -ENOMEM;
2658                         goto exit;
2659                 }
2660                 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2661                 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2662                 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2663                                                fragm_sz);
2664                 /*  Send queued messages first, if any: */
2665
2666                 l_ptr->stats.sent_fragments++;
2667                 tipc_link_send_buf(l_ptr, fragm);
2668                 if (!tipc_link_is_up(l_ptr))
2669                         return dsz;
2670                 msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2671                 rest -= fragm_sz;
2672                 crs += fragm_sz;
2673                 msg_set_type(&fragm_hdr, FRAGMENT);
2674         }
2675 exit:
2676         buf_discard(buf);
2677         return dsz;
2678 }
2679
2680 /*
2681  * A pending message being re-assembled must store certain values
2682  * to handle subsequent fragments correctly. The following functions
2683  * help storing these values in unused, available fields in the
2684  * pending message. This makes dynamic memory allocation unecessary.
2685  */
2686
2687 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2688 {
2689         msg_set_seqno(buf_msg(buf), seqno);
2690 }
2691
2692 static u32 get_fragm_size(struct sk_buff *buf)
2693 {
2694         return msg_ack(buf_msg(buf));
2695 }
2696
2697 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2698 {
2699         msg_set_ack(buf_msg(buf), sz);
2700 }
2701
2702 static u32 get_expected_frags(struct sk_buff *buf)
2703 {
2704         return msg_bcast_ack(buf_msg(buf));
2705 }
2706
2707 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2708 {
2709         msg_set_bcast_ack(buf_msg(buf), exp);
2710 }
2711
2712 static u32 get_timer_cnt(struct sk_buff *buf)
2713 {
2714         return msg_reroute_cnt(buf_msg(buf));
2715 }
2716
2717 static void incr_timer_cnt(struct sk_buff *buf)
2718 {
2719         msg_incr_reroute_cnt(buf_msg(buf));
2720 }
2721
2722 /*
2723  * tipc_link_recv_fragment(): Called with node lock on. Returns
2724  * the reassembled buffer if message is complete.
2725  */
2726 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2727                             struct tipc_msg **m)
2728 {
2729         struct sk_buff *prev = NULL;
2730         struct sk_buff *fbuf = *fb;
2731         struct tipc_msg *fragm = buf_msg(fbuf);
2732         struct sk_buff *pbuf = *pending;
2733         u32 long_msg_seq_no = msg_long_msgno(fragm);
2734
2735         *fb = NULL;
2736         msg_dbg(fragm,"FRG<REC<");
2737
2738         /* Is there an incomplete message waiting for this fragment? */
2739
2740         while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2741                         (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2742                 prev = pbuf;
2743                 pbuf = pbuf->next;
2744         }
2745
2746         if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2747                 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2748                 u32 msg_sz = msg_size(imsg);
2749                 u32 fragm_sz = msg_data_sz(fragm);
2750                 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2751                 u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2752                 if (msg_type(imsg) == TIPC_MCAST_MSG)
2753                         max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2754                 if (msg_size(imsg) > max) {
2755                         msg_dbg(fragm,"<REC<Oversized: ");
2756                         buf_discard(fbuf);
2757                         return 0;
2758                 }
2759                 pbuf = tipc_buf_acquire(msg_size(imsg));
2760                 if (pbuf != NULL) {
2761                         pbuf->next = *pending;
2762                         *pending = pbuf;
2763                         skb_copy_to_linear_data(pbuf, imsg,
2764                                                 msg_data_sz(fragm));
2765                         /*  Prepare buffer for subsequent fragments. */
2766
2767                         set_long_msg_seqno(pbuf, long_msg_seq_no);
2768                         set_fragm_size(pbuf,fragm_sz);
2769                         set_expected_frags(pbuf,exp_fragm_cnt - 1);
2770                 } else {
2771                         warn("Link unable to reassemble fragmented message\n");
2772                 }
2773                 buf_discard(fbuf);
2774                 return 0;
2775         } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2776                 u32 dsz = msg_data_sz(fragm);
2777                 u32 fsz = get_fragm_size(pbuf);
2778                 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2779                 u32 exp_frags = get_expected_frags(pbuf) - 1;
2780                 skb_copy_to_linear_data_offset(pbuf, crs,
2781                                                msg_data(fragm), dsz);
2782                 buf_discard(fbuf);
2783
2784                 /* Is message complete? */
2785
2786                 if (exp_frags == 0) {
2787                         if (prev)
2788                                 prev->next = pbuf->next;
2789                         else
2790                                 *pending = pbuf->next;
2791                         msg_reset_reroute_cnt(buf_msg(pbuf));
2792                         *fb = pbuf;
2793                         *m = buf_msg(pbuf);
2794                         return 1;
2795                 }
2796                 set_expected_frags(pbuf,exp_frags);
2797                 return 0;
2798         }
2799         dbg(" Discarding orphan fragment %x\n",fbuf);
2800         msg_dbg(fragm,"ORPHAN:");
2801         dbg("Pending long buffers:\n");
2802         dbg_print_buf_chain(*pending);
2803         buf_discard(fbuf);
2804         return 0;
2805 }
2806
2807 /**
2808  * link_check_defragm_bufs - flush stale incoming message fragments
2809  * @l_ptr: pointer to link
2810  */
2811
2812 static void link_check_defragm_bufs(struct link *l_ptr)
2813 {
2814         struct sk_buff *prev = NULL;
2815         struct sk_buff *next = NULL;
2816         struct sk_buff *buf = l_ptr->defragm_buf;
2817
2818         if (!buf)
2819                 return;
2820         if (!link_working_working(l_ptr))
2821                 return;
2822         while (buf) {
2823                 u32 cnt = get_timer_cnt(buf);
2824
2825                 next = buf->next;
2826                 if (cnt < 4) {
2827                         incr_timer_cnt(buf);
2828                         prev = buf;
2829                 } else {
2830                         dbg(" Discarding incomplete long buffer\n");
2831                         msg_dbg(buf_msg(buf), "LONG:");
2832                         dbg_print_link(l_ptr, "curr:");
2833                         dbg("Pending long buffers:\n");
2834                         dbg_print_buf_chain(l_ptr->defragm_buf);
2835                         if (prev)
2836                                 prev->next = buf->next;
2837                         else
2838                                 l_ptr->defragm_buf = buf->next;
2839                         buf_discard(buf);
2840                 }
2841                 buf = next;
2842         }
2843 }
2844
2845
2846
2847 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2848 {
2849         l_ptr->tolerance = tolerance;
2850         l_ptr->continuity_interval =
2851                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2852         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2853 }
2854
2855
2856 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2857 {
2858         /* Data messages from this node, inclusive FIRST_FRAGM */
2859         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2860         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2861         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2862         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2863         /* Transiting data messages,inclusive FIRST_FRAGM */
2864         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2865         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2866         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2867         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2868         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2869         l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2870         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2871         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2872         /* FRAGMENT and LAST_FRAGMENT packets */
2873         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2874 }
2875
2876 /**
2877  * link_find_link - locate link by name
2878  * @name - ptr to link name string
2879  * @node - ptr to area to be filled with ptr to associated node
2880  *
2881  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2882  * this also prevents link deletion.
2883  *
2884  * Returns pointer to link (or 0 if invalid link name).
2885  */
2886
2887 static struct link *link_find_link(const char *name, struct tipc_node **node)
2888 {
2889         struct link_name link_name_parts;
2890         struct bearer *b_ptr;
2891         struct link *l_ptr;
2892
2893         if (!link_name_validate(name, &link_name_parts))
2894                 return NULL;
2895
2896         b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2897         if (!b_ptr)
2898                 return NULL;
2899
2900         *node = tipc_node_find(link_name_parts.addr_peer);
2901         if (!*node)
2902                 return NULL;
2903
2904         l_ptr = (*node)->links[b_ptr->identity];
2905         if (!l_ptr || strcmp(l_ptr->name, name))
2906                 return NULL;
2907
2908         return l_ptr;
2909 }
2910
2911 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2912                                      u16 cmd)
2913 {
2914         struct tipc_link_config *args;
2915         u32 new_value;
2916         struct link *l_ptr;
2917         struct tipc_node *node;
2918         int res;
2919
2920         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2921                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2922
2923         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2924         new_value = ntohl(args->value);
2925
2926         if (!strcmp(args->name, tipc_bclink_name)) {
2927                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2928                     (tipc_bclink_set_queue_limits(new_value) == 0))
2929                         return tipc_cfg_reply_none();
2930                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2931                                                    " (cannot change setting on broadcast link)");
2932         }
2933
2934         read_lock_bh(&tipc_net_lock);
2935         l_ptr = link_find_link(args->name, &node);
2936         if (!l_ptr) {
2937                 read_unlock_bh(&tipc_net_lock);
2938                 return tipc_cfg_reply_error_string("link not found");
2939         }
2940
2941         tipc_node_lock(node);
2942         res = -EINVAL;
2943         switch (cmd) {
2944         case TIPC_CMD_SET_LINK_TOL:
2945                 if ((new_value >= TIPC_MIN_LINK_TOL) &&
2946                     (new_value <= TIPC_MAX_LINK_TOL)) {
2947                         link_set_supervision_props(l_ptr, new_value);
2948                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2949                                                  0, 0, new_value, 0, 0);
2950                         res = 0;
2951                 }
2952                 break;
2953         case TIPC_CMD_SET_LINK_PRI:
2954                 if ((new_value >= TIPC_MIN_LINK_PRI) &&
2955                     (new_value <= TIPC_MAX_LINK_PRI)) {
2956                         l_ptr->priority = new_value;
2957                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2958                                                  0, 0, 0, new_value, 0);
2959                         res = 0;
2960                 }
2961                 break;
2962         case TIPC_CMD_SET_LINK_WINDOW:
2963                 if ((new_value >= TIPC_MIN_LINK_WIN) &&
2964                     (new_value <= TIPC_MAX_LINK_WIN)) {
2965                         tipc_link_set_queue_limits(l_ptr, new_value);
2966                         res = 0;
2967                 }
2968                 break;
2969         }
2970         tipc_node_unlock(node);
2971
2972         read_unlock_bh(&tipc_net_lock);
2973         if (res)
2974                 return tipc_cfg_reply_error_string("cannot change link setting");
2975
2976         return tipc_cfg_reply_none();
2977 }
2978
2979 /**
2980  * link_reset_statistics - reset link statistics
2981  * @l_ptr: pointer to link
2982  */
2983
2984 static void link_reset_statistics(struct link *l_ptr)
2985 {
2986         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2987         l_ptr->stats.sent_info = l_ptr->next_out_no;
2988         l_ptr->stats.recv_info = l_ptr->next_in_no;
2989 }
2990
2991 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2992 {
2993         char *link_name;
2994         struct link *l_ptr;
2995         struct tipc_node *node;
2996
2997         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2998                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2999
3000         link_name = (char *)TLV_DATA(req_tlv_area);
3001         if (!strcmp(link_name, tipc_bclink_name)) {
3002                 if (tipc_bclink_reset_stats())
3003                         return tipc_cfg_reply_error_string("link not found");
3004                 return tipc_cfg_reply_none();
3005         }
3006
3007         read_lock_bh(&tipc_net_lock);
3008         l_ptr = link_find_link(link_name, &node);
3009         if (!l_ptr) {
3010                 read_unlock_bh(&tipc_net_lock);
3011                 return tipc_cfg_reply_error_string("link not found");
3012         }
3013
3014         tipc_node_lock(node);
3015         link_reset_statistics(l_ptr);
3016         tipc_node_unlock(node);
3017         read_unlock_bh(&tipc_net_lock);
3018         return tipc_cfg_reply_none();
3019 }
3020
3021 /**
3022  * percent - convert count to a percentage of total (rounding up or down)
3023  */
3024
3025 static u32 percent(u32 count, u32 total)
3026 {
3027         return (count * 100 + (total / 2)) / total;
3028 }
3029
3030 /**
3031  * tipc_link_stats - print link statistics
3032  * @name: link name
3033  * @buf: print buffer area
3034  * @buf_size: size of print buffer area
3035  *
3036  * Returns length of print buffer data string (or 0 if error)
3037  */
3038
3039 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
3040 {
3041         struct print_buf pb;
3042         struct link *l_ptr;
3043         struct tipc_node *node;
3044         char *status;
3045         u32 profile_total = 0;
3046
3047         if (!strcmp(name, tipc_bclink_name))
3048                 return tipc_bclink_stats(buf, buf_size);
3049
3050         tipc_printbuf_init(&pb, buf, buf_size);
3051
3052         read_lock_bh(&tipc_net_lock);
3053         l_ptr = link_find_link(name, &node);
3054         if (!l_ptr) {
3055                 read_unlock_bh(&tipc_net_lock);
3056                 return 0;
3057         }
3058         tipc_node_lock(node);
3059
3060         if (tipc_link_is_active(l_ptr))
3061                 status = "ACTIVE";
3062         else if (tipc_link_is_up(l_ptr))
3063                 status = "STANDBY";
3064         else
3065                 status = "DEFUNCT";
3066         tipc_printf(&pb, "Link <%s>\n"
3067                          "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
3068                          "  Window:%u packets\n",
3069                     l_ptr->name, status, l_ptr->max_pkt,
3070                     l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
3071         tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
3072                     l_ptr->next_in_no - l_ptr->stats.recv_info,
3073                     l_ptr->stats.recv_fragments,
3074                     l_ptr->stats.recv_fragmented,
3075                     l_ptr->stats.recv_bundles,
3076                     l_ptr->stats.recv_bundled);
3077         tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
3078                     l_ptr->next_out_no - l_ptr->stats.sent_info,
3079                     l_ptr->stats.sent_fragments,
3080                     l_ptr->stats.sent_fragmented,
3081                     l_ptr->stats.sent_bundles,
3082                     l_ptr->stats.sent_bundled);
3083         profile_total = l_ptr->stats.msg_length_counts;
3084         if (!profile_total)
3085                 profile_total = 1;
3086         tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
3087                          "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
3088                          "-16354:%u%% -32768:%u%% -66000:%u%%\n",
3089                     l_ptr->stats.msg_length_counts,
3090                     l_ptr->stats.msg_lengths_total / profile_total,
3091                     percent(l_ptr->stats.msg_length_profile[0], profile_total),
3092                     percent(l_ptr->stats.msg_length_profile[1], profile_total),
3093                     percent(l_ptr->stats.msg_length_profile[2], profile_total),
3094                     percent(l_ptr->stats.msg_length_profile[3], profile_total),
3095                     percent(l_ptr->stats.msg_length_profile[4], profile_total),
3096                     percent(l_ptr->stats.msg_length_profile[5], profile_total),
3097                     percent(l_ptr->stats.msg_length_profile[6], profile_total));
3098         tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
3099                     l_ptr->stats.recv_states,
3100                     l_ptr->stats.recv_probes,
3101                     l_ptr->stats.recv_nacks,
3102                     l_ptr->stats.deferred_recv,
3103                     l_ptr->stats.duplicates);
3104         tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
3105                     l_ptr->stats.sent_states,
3106                     l_ptr->stats.sent_probes,
3107                     l_ptr->stats.sent_nacks,
3108                     l_ptr->stats.sent_acks,
3109                     l_ptr->stats.retransmitted);
3110         tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
3111                     l_ptr->stats.bearer_congs,
3112                     l_ptr->stats.link_congs,
3113                     l_ptr->stats.max_queue_sz,
3114                     l_ptr->stats.queue_sz_counts
3115                     ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3116                     : 0);
3117
3118         tipc_node_unlock(node);
3119         read_unlock_bh(&tipc_net_lock);
3120         return tipc_printbuf_validate(&pb);
3121 }
3122
3123 #define MAX_LINK_STATS_INFO 2000
3124
3125 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3126 {
3127         struct sk_buff *buf;
3128         struct tlv_desc *rep_tlv;
3129         int str_len;
3130
3131         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3132                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3133
3134         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3135         if (!buf)
3136                 return NULL;
3137
3138         rep_tlv = (struct tlv_desc *)buf->data;
3139
3140         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3141                                   (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3142         if (!str_len) {
3143                 buf_discard(buf);
3144                 return tipc_cfg_reply_error_string("link not found");
3145         }
3146
3147         skb_put(buf, TLV_SPACE(str_len));
3148         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3149
3150         return buf;
3151 }
3152
3153 /**
3154  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3155  * @dest: network address of destination node
3156  * @selector: used to select from set of active links
3157  *
3158  * If no active link can be found, uses default maximum packet size.
3159  */
3160
3161 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3162 {
3163         struct tipc_node *n_ptr;
3164         struct link *l_ptr;
3165         u32 res = MAX_PKT_DEFAULT;
3166
3167         if (dest == tipc_own_addr)
3168                 return MAX_MSG_SIZE;
3169
3170         read_lock_bh(&tipc_net_lock);
3171         n_ptr = tipc_node_select(dest, selector);
3172         if (n_ptr) {
3173                 tipc_node_lock(n_ptr);
3174                 l_ptr = n_ptr->active_links[selector & 1];
3175                 if (l_ptr)
3176                         res = l_ptr->max_pkt;
3177                 tipc_node_unlock(n_ptr);
3178         }
3179         read_unlock_bh(&tipc_net_lock);
3180         return res;
3181 }
3182
3183 static void link_dump_send_queue(struct link *l_ptr)
3184 {
3185         if (l_ptr->next_out) {
3186                 info("\nContents of unsent queue:\n");
3187                 dbg_print_buf_chain(l_ptr->next_out);
3188         }
3189         info("\nContents of send queue:\n");
3190         if (l_ptr->first_out) {
3191                 dbg_print_buf_chain(l_ptr->first_out);
3192         }
3193         info("Empty send queue\n");
3194 }
3195
3196 static void link_print(struct link *l_ptr, struct print_buf *buf,
3197                        const char *str)
3198 {
3199         tipc_printf(buf, str);
3200         if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3201                 return;
3202         tipc_printf(buf, "Link %x<%s>:",
3203                     l_ptr->addr, l_ptr->b_ptr->publ.name);
3204         tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3205         tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3206         tipc_printf(buf, "SQUE");
3207         if (l_ptr->first_out) {
3208                 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3209                 if (l_ptr->next_out)
3210                         tipc_printf(buf, "%u..",
3211                                     msg_seqno(buf_msg(l_ptr->next_out)));
3212                 tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
3213                 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3214                          msg_seqno(buf_msg(l_ptr->first_out)))
3215                      != (l_ptr->out_queue_size - 1)) ||
3216                     (l_ptr->last_out->next != NULL)) {
3217                         tipc_printf(buf, "\nSend queue inconsistency\n");
3218                         tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3219                         tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3220                         tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3221                         link_dump_send_queue(l_ptr);
3222                 }
3223         } else
3224                 tipc_printf(buf, "[]");
3225         tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3226         if (l_ptr->oldest_deferred_in) {
3227                 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3228                 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3229                 tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3230                 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3231                         tipc_printf(buf, ":RQSIZ(%u)",
3232                                     l_ptr->deferred_inqueue_sz);
3233                 }
3234         }
3235         if (link_working_unknown(l_ptr))
3236                 tipc_printf(buf, ":WU");
3237         if (link_reset_reset(l_ptr))
3238                 tipc_printf(buf, ":RR");
3239         if (link_reset_unknown(l_ptr))
3240                 tipc_printf(buf, ":RU");
3241         if (link_working_working(l_ptr))
3242                 tipc_printf(buf, ":WW");
3243         tipc_printf(buf, "\n");
3244 }
3245