From dbdf6d24ad37d63938f29a2d134a1a9f6e9e673c Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:40:58 -0400 Subject: [PATCH 1/7] tipc: make name table distributor use new send function In a previous commit series ("tipc: new unicast transmission code") we introduced a new message sending function, tipc_link_xmit2(), and moved the unicast data users over to use that function. We now let the internal name table distributor do the same. The interaction between the name distributor and the node/link layer also becomes significantly simpler, so we can eliminate the function tipc_link_names_xmit(). Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 41 ----------------------- net/tipc/link.h | 1 - net/tipc/name_distr.c | 76 +++++++++++++++++++++++++------------------ net/tipc/name_distr.h | 2 +- net/tipc/node.c | 13 ++------ 5 files changed, 48 insertions(+), 85 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index a235b245f682bb..367b0f5886f8d2 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1032,47 +1032,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) kfree_skb(buf); } -/* - * tipc_link_names_xmit - send name table entries to new neighbor - * - * Send routine for bulk delivery of name table messages when contact - * with a new neighbor occurs. No link congestion checking is performed - * because name table messages *must* be delivered. The messages must be - * small enough not to require fragmentation. - * Called without any locks held. - */ -void tipc_link_names_xmit(struct list_head *message_list, u32 dest) -{ - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct sk_buff *buf; - struct sk_buff *temp_buf; - - if (list_empty(message_list)) - return; - - n_ptr = tipc_node_find(dest); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[0]; - if (l_ptr) { - /* convert circular list to linear list */ - ((struct sk_buff *)message_list->prev)->next = NULL; - link_add_chain_to_outqueue(l_ptr, - (struct sk_buff *)message_list->next, 0); - tipc_link_push_queue(l_ptr); - INIT_LIST_HEAD(message_list); - } - tipc_node_unlock(n_ptr); - } - - /* discard the messages if they couldn't be sent */ - list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) { - list_del((struct list_head *)buf); - kfree_skb(buf); - } -} - /* * tipc_link_push_packet: Push one unsent packet to the media */ diff --git a/net/tipc/link.h b/net/tipc/link.h index 227ff812089739..04a59c58d38514 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -228,7 +228,6 @@ void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector); -void tipc_link_names_xmit(struct list_head *message_list, u32 dest); int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf); int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 8ce730984aa1f0..d16f9475fa765e 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) void named_cluster_distribute(struct sk_buff *buf) { - struct sk_buff *buf_copy; - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; + struct sk_buff *obuf; + struct tipc_node *node; + u32 dnode; rcu_read_lock(); - list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[n_ptr->addr & 1]; - if (l_ptr) { - buf_copy = skb_copy(buf, GFP_ATOMIC); - if (!buf_copy) { - tipc_node_unlock(n_ptr); - break; - } - msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); - __tipc_link_xmit(l_ptr, buf_copy); - } - tipc_node_unlock(n_ptr); + list_for_each_entry_rcu(node, &tipc_node_list, list) { + dnode = node->addr; + if (in_own_node(dnode)) + continue; + if (!tipc_node_active_links(node)) + continue; + obuf = skb_copy(buf, GFP_ATOMIC); + if (!obuf) + break; + msg_set_destnode(buf_msg(obuf), dnode); + tipc_link_xmit2(obuf, dnode, dnode); } rcu_read_unlock(); @@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ) return buf; } -/* +/** * named_distribute - prepare name info for bulk distribution to another node + * @msg_list: list of messages (buffers) to be returned from this function + * @dnode: node to be updated + * @pls: linked list of publication items to be packed into buffer chain */ -static void named_distribute(struct list_head *message_list, u32 node, - struct publ_list *pls, u32 max_item_buf) +static void named_distribute(struct list_head *msg_list, u32 dnode, + struct publ_list *pls) { struct publication *publ; struct sk_buff *buf = NULL; struct distr_item *item = NULL; - u32 left = 0; - u32 rest = pls->size * ITEM_SIZE; + uint dsz = pls->size * ITEM_SIZE; + uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; + uint rem = dsz; + uint msg_rem = 0; list_for_each_entry(publ, &pls->list, local_list) { + /* Prepare next buffer: */ if (!buf) { - left = (rest <= max_item_buf) ? rest : max_item_buf; - rest -= left; - buf = named_prepare_buf(PUBLICATION, left, node); + msg_rem = min_t(uint, rem, msg_dsz); + rem -= msg_rem; + buf = named_prepare_buf(PUBLICATION, msg_rem, dnode); if (!buf) { pr_warn("Bulk publication failure\n"); return; } item = (struct distr_item *)msg_data(buf_msg(buf)); } + + /* Pack publication into message: */ publ_to_item(item, publ); item++; - left -= ITEM_SIZE; - if (!left) { - list_add_tail((struct list_head *)buf, message_list); + msg_rem -= ITEM_SIZE; + + /* Append full buffer to list: */ + if (!msg_rem) { + list_add_tail((struct list_head *)buf, msg_list); buf = NULL; } } @@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node, /** * tipc_named_node_up - tell specified node about all publications by this node */ -void tipc_named_node_up(u32 max_item_buf, u32 node) +void tipc_named_node_up(u32 dnode) { - LIST_HEAD(message_list); + LIST_HEAD(msg_list); + struct sk_buff *buf_chain; read_lock_bh(&tipc_nametbl_lock); - named_distribute(&message_list, node, &publ_cluster, max_item_buf); - named_distribute(&message_list, node, &publ_zone, max_item_buf); + named_distribute(&msg_list, dnode, &publ_cluster); + named_distribute(&msg_list, dnode, &publ_zone); read_unlock_bh(&tipc_nametbl_lock); - tipc_link_names_xmit(&message_list, node); + /* Convert circular list to linear list and send: */ + buf_chain = (struct sk_buff *)msg_list.next; + ((struct sk_buff *)msg_list.prev)->next = NULL; + tipc_link_xmit2(buf_chain, dnode, dnode); } /** diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index b2eed4ec1526a3..8afe32b7fc9a0b 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -70,7 +70,7 @@ struct distr_item { struct sk_buff *tipc_named_publish(struct publication *publ); struct sk_buff *tipc_named_withdraw(struct publication *publ); void named_cluster_distribute(struct sk_buff *buf); -void tipc_named_node_up(u32 max_item_buf, u32 node); +void tipc_named_node_up(u32 dnode); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); diff --git a/net/tipc/node.c b/net/tipc/node.c index d959343043fd02..f7069299943f84 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) void tipc_node_unlock(struct tipc_node *node) { LIST_HEAD(nsub_list); - struct tipc_link *link; - int pkt_sz = 0; u32 addr = 0; if (likely(!node->action_flags)) { @@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node) node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; } if (node->action_flags & TIPC_NOTIFY_NODE_UP) { - link = node->active_links[0]; node->action_flags &= ~TIPC_NOTIFY_NODE_UP; - if (link) { - pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) * - ITEM_SIZE; - addr = node->addr; - } + addr = node->addr; } spin_unlock_bh(&node->lock); if (!list_empty(&nsub_list)) tipc_nodesub_notify(&nsub_list); - if (pkt_sz) - tipc_named_node_up(pkt_sz, addr); + if (addr) + tipc_named_node_up(addr); } From 25b660c7e202d533e4985380b24729fd12de2b5e Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:40:59 -0400 Subject: [PATCH 2/7] tipc: let internal link users call the new link send function We convert the link internal users (changeover protocol, broadcast synchronization) to use the new packet send function. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 367b0f5886f8d2..d1255ba5121647 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -999,7 +999,7 @@ int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector) * * Called with node locked */ -static void tipc_link_sync_xmit(struct tipc_link *l) +static void tipc_link_sync_xmit(struct tipc_link *link) { struct sk_buff *buf; struct tipc_msg *msg; @@ -1009,10 +1009,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l) return; msg = buf_msg(buf); - tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); - msg_set_last_bcast(msg, l->owner->bclink.acked); - link_add_chain_to_outqueue(l, buf, 0); - tipc_link_push_queue(l); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); + msg_set_last_bcast(msg, link->owner->bclink.acked); + __tipc_link_xmit2(link, buf); } /* @@ -1859,7 +1858,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, } skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); - __tipc_link_xmit(tunnel, buf); + __tipc_link_xmit2(tunnel, buf); } @@ -1892,7 +1891,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) if (buf) { skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit(tunnel, buf); + __tipc_link_xmit2(tunnel, buf); } else { pr_warn("%sunable to send changeover msg\n", link_co_err); @@ -1965,7 +1964,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, length); - __tipc_link_xmit(tunnel, outbuf); + __tipc_link_xmit2(tunnel, outbuf); if (!tipc_link_is_up(l_ptr)) return; iter = iter->next; From 078bec826f7b73cf2a2397680537bcb7e075b492 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:41:00 -0400 Subject: [PATCH 3/7] tipc: add new functions for multicast and broadcast distribution We add a new broadcast link transmit function in bclink.c and a new receive function in socket.c. The purpose is to move the branching between external and internal destination down to the link layer, just as we have done with unicast in earlier commits. We also make use of the new link-independent fragmentation support that was introduced in an earlier commit series. This gives a shorter and simpler code path, and makes it possible to obtain copy-free buffer delivery to all node local destination sockets. The new transmission code is added in parallel with the existing one, and will be used by the socket multicast send function in the next commit in this series. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 55 ++++++++++++++++++++++++++++++++++++++++++++++- net/tipc/bcast.h | 4 +++- net/tipc/msg.c | 35 ++++++++++++++++++++++++++++++ net/tipc/msg.h | 2 ++ net/tipc/socket.c | 40 ++++++++++++++++++++++++++++++++++ net/tipc/socket.h | 2 ++ 6 files changed, 136 insertions(+), 2 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 55c6c9d3e1ceee..ac947251dd3760 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@ /* * net/tipc/bcast.c: TIPC broadcast code * - * Copyright (c) 2004-2006, Ericsson AB + * Copyright (c) 2004-2006, 2014, Ericsson AB * Copyright (c) 2004, Intel Corporation. * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. @@ -38,6 +38,8 @@ #include "core.h" #include "link.h" #include "port.h" +#include "socket.h" +#include "msg.h" #include "bcast.h" #include "name_distr.h" @@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void) tipc_link_reset_all(node); } +uint tipc_bclink_get_mtu(void) +{ + return MAX_PKT_DEFAULT_MCAST; +} + void tipc_bclink_set_flags(unsigned int flags) { bclink->flags |= flags; @@ -408,6 +415,52 @@ int tipc_bclink_xmit(struct sk_buff *buf) return res; } +/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster + * and to identified node local sockets + * @buf: chain of buffers containing message + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + */ +int tipc_bclink_xmit2(struct sk_buff *buf) +{ + int rc = 0; + int bc = 0; + struct sk_buff *clbuf; + + /* Prepare clone of message for local node */ + clbuf = tipc_msg_reassemble(buf); + if (unlikely(!clbuf)) { + kfree_skb_list(buf); + return -EHOSTUNREACH; + } + + /* Broadcast to all other nodes */ + if (likely(bclink)) { + tipc_bclink_lock(); + if (likely(bclink->bcast_nodes.count)) { + rc = __tipc_link_xmit(bcl, buf); + if (likely(!rc)) { + bclink_set_last_sent(); + bcl->stats.queue_sz_counts++; + bcl->stats.accu_queue_sz += bcl->out_queue_size; + } + bc = 1; + } + tipc_bclink_unlock(); + } + + if (unlikely(!bc)) + kfree_skb_list(buf); + + /* Deliver message clone */ + if (likely(!rc)) + tipc_sk_mcast_rcv(clbuf); + else + kfree_skb(clbuf); + + return rc; +} + /** * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet * diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 00330c45df3e04..d90645f408aa9e 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -1,7 +1,7 @@ /* * net/tipc/bcast.h: Include file for TIPC broadcast code * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -98,5 +98,7 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); +uint tipc_bclink_get_mtu(void); +int tipc_bclink_xmit2(struct sk_buff *buf); #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index ce6d929d66d26c..9682296f5e7cea 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -417,3 +417,38 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode) msg_set_destport(msg, dport); return TIPC_OK; } + +/* tipc_msg_reassemble() - clone a buffer chain of fragments and + * reassemble the clones into one message + */ +struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) +{ + struct sk_buff *buf = chain; + struct sk_buff *frag = buf; + struct sk_buff *head = NULL; + int hdr_sz; + + /* Copy header if single buffer */ + if (!buf->next) { + hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); + return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); + } + + /* Clone all fragments and reassemble */ + while (buf) { + frag = skb_clone(buf, GFP_ATOMIC); + if (!frag) + goto error; + frag->next = NULL; + if (tipc_buf_append(&head, &frag)) + break; + if (!head) + goto error; + buf = buf->next; + } + return frag; +error: + pr_warn("Failed do clone local mcast rcv buffer\n"); + kfree_skb(head); + return NULL; +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 7d574346e75e23..a15d59601bf9d3 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -744,4 +744,6 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov, int offset, int dsz, int mtu , struct sk_buff **chain); +struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); + #endif diff --git a/net/tipc/socket.c b/net/tipc/socket.c index de01622672b2bd..8d30995682b190 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -534,6 +534,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, return mask; } +/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets + */ +void tipc_sk_mcast_rcv(struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + struct tipc_port_list dports = {0, NULL, }; + struct tipc_port_list *item; + struct sk_buff *b; + uint i, last, dst = 0; + u32 scope = TIPC_CLUSTER_SCOPE; + + if (in_own_node(msg_orignode(msg))) + scope = TIPC_NODE_SCOPE; + + /* Create destination port list: */ + tipc_nametbl_mc_translate(msg_nametype(msg), + msg_namelower(msg), + msg_nameupper(msg), + scope, + &dports); + last = dports.count; + if (!last) { + kfree_skb(buf); + return; + } + + for (item = &dports; item; item = item->next) { + for (i = 0; i < PLSIZE && ++dst <= last; i++) { + b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; + if (!b) { + pr_warn("Failed do clone mcast rcv buffer\n"); + continue; + } + msg_set_destport(msg, item->ports[i]); + tipc_sk_rcv(b); + } + } + tipc_port_list_free(&dports); +} + /** * tipc_sk_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 2cdede9eda1ba4..43b75b3cecedb9 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk) int tipc_sk_rcv(struct sk_buff *buf); +void tipc_sk_mcast_rcv(struct sk_buff *buf); + #endif From 0abd8ff21f19adddc465538354e9baaca63df073 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:41:01 -0400 Subject: [PATCH 4/7] tipc: start using the new multicast functions In this commit, we convert the socket multicast send function to directly call the new multicast/broadcast function (tipc_bclink_xmit2()) introduced in the previous commit. We do this instead of letting the call go via the now obsolete tipc_port_mcast_xmit(), hence saving a call level and some code complexity. We also remove the initial destination lookup at the message sending side, and replace that with an unconditional lookup at the receiving side, including on the sending node itself. This makes the destination lookup and message transfer more uniform than before. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 7 ++-- net/tipc/socket.c | 90 ++++++++++++++++++++++++++++------------------- 2 files changed, 56 insertions(+), 41 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index ac947251dd3760..4a1c9afc9d748d 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -496,7 +496,7 @@ void tipc_bclink_rcv(struct sk_buff *buf) struct tipc_node *node; u32 next_in; u32 seqno; - int deferred; + int deferred = 0; /* Screen out unwanted broadcast messages */ @@ -547,7 +547,7 @@ void tipc_bclink_rcv(struct sk_buff *buf) tipc_bclink_unlock(); tipc_node_unlock(node); if (likely(msg_mcast(msg))) - tipc_port_mcast_rcv(buf, NULL); + tipc_sk_mcast_rcv(buf); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { @@ -626,8 +626,7 @@ void tipc_bclink_rcv(struct sk_buff *buf) node->bclink.deferred_size += deferred; bclink_update_last_sent(node, seqno); buf = NULL; - } else - deferred = 0; + } tipc_bclink_lock(); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 8d30995682b190..b28eea50c7fcc9 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -53,6 +53,7 @@ static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); +static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -534,6 +535,58 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, return mask; } +/** + * tipc_sendmcast - send multicast message + * @sock: socket structure + * @seq: destination address + * @iov: message data to send + * @dsz: total length of message data + * @timeo: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, + struct iovec *iov, size_t dsz, long timeo) +{ + struct sock *sk = sock->sk; + struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr; + struct sk_buff *buf; + uint mtu; + int rc; + + msg_set_type(mhdr, TIPC_MCAST_MSG); + msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(mhdr, 0); + msg_set_destnode(mhdr, 0); + msg_set_nametype(mhdr, seq->type); + msg_set_namelower(mhdr, seq->lower); + msg_set_nameupper(mhdr, seq->upper); + msg_set_hdr_sz(mhdr, MCAST_H_SIZE); + +new_mtu: + mtu = tipc_bclink_get_mtu(); + rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf); + if (unlikely(rc < 0)) + return rc; + + do { + rc = tipc_bclink_xmit(buf); + if (likely(rc >= 0)) { + rc = dsz; + break; + } + if (rc == -EMSGSIZE) + goto new_mtu; + if (rc != -ELINKCONG) + break; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); + return rc; +} + /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets */ void tipc_sk_mcast_rcv(struct sk_buff *buf) @@ -669,43 +722,6 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) return 0; } -/** - * tipc_sendmcast - send multicast message - * @sock: socket structure - * @seq: destination address - * @iov: message data to send - * @dsz: total length of message data - * @timeo: timeout to wait for wakeup - * - * Called from function tipc_sendmsg(), which has done all sanity checks - * Returns the number of bytes sent on success, or errno - */ -static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, - struct iovec *iov, size_t dsz, long timeo) -{ - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - int rc; - - do { - if (sock->state != SS_READY) { - rc = -EOPNOTSUPP; - break; - } - rc = tipc_port_mcast_xmit(&tsk->port, seq, iov, dsz); - if (likely(rc >= 0)) { - if (sock->state != SS_READY) - sock->state = SS_CONNECTING; - break; - } - if (rc != -ELINKCONG) - break; - rc = tipc_wait_for_sndmsg(sock, &timeo); - } while (!rc); - - return rc; -} - /** * tipc_sendmsg - send message in connectionless manner * @iocb: if NULL, indicates that socket lock is already held From c4116e10579c5bbbfc3cd2ad0324ee0d8691e531 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:41:02 -0400 Subject: [PATCH 5/7] tipc: remove unreferenced functions We can now remove a number of functions which have become obsolete and unreferenced through this commit series. There are no functional changes in this commit. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 26 ----- net/tipc/bcast.h | 1 - net/tipc/link.c | 247 ----------------------------------------------- net/tipc/link.h | 6 -- net/tipc/msg.c | 35 ------- net/tipc/msg.h | 3 - net/tipc/port.c | 112 --------------------- net/tipc/port.h | 10 -- 8 files changed, 440 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 4a1c9afc9d748d..2f3256da6f8819 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -389,32 +389,6 @@ static void bclink_peek_nack(struct tipc_msg *msg) tipc_node_unlock(n_ptr); } -/* - * tipc_bclink_xmit - broadcast a packet to all nodes in cluster - */ -int tipc_bclink_xmit(struct sk_buff *buf) -{ - int res; - - tipc_bclink_lock(); - - if (!bclink->bcast_nodes.count) { - res = msg_data_sz(buf_msg(buf)); - kfree_skb(buf); - goto exit; - } - - res = __tipc_link_xmit(bcl, buf); - if (likely(res >= 0)) { - bclink_set_last_sent(); - bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += bcl->out_queue_size; - } -exit: - tipc_bclink_unlock(); - return res; -} - /* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster * and to identified node local sockets * @buf: chain of buffers containing message diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index d90645f408aa9e..af6b6579f5c76a 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -89,7 +89,6 @@ void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); struct tipc_node *tipc_bclink_retransmit_to(void); void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); -int tipc_bclink_xmit(struct sk_buff *buf); void tipc_bclink_rcv(struct sk_buff *buf); u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); diff --git a/net/tipc/link.c b/net/tipc/link.c index d1255ba5121647..28730ddf4b787c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -85,7 +85,6 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf); @@ -679,180 +678,6 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) } } -/* - * link_bundle_buf(): Append contents of a buffer to - * the tail of an existing one. - */ -static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler, - struct sk_buff *buf) -{ - struct tipc_msg *bundler_msg = buf_msg(bundler); - struct tipc_msg *msg = buf_msg(buf); - u32 size = msg_size(msg); - u32 bundle_size = msg_size(bundler_msg); - u32 to_pos = align(bundle_size); - u32 pad = to_pos - bundle_size; - - if (msg_user(bundler_msg) != MSG_BUNDLER) - return 0; - if (msg_type(bundler_msg) != OPEN_MSG) - return 0; - if (skb_tailroom(bundler) < (pad + size)) - return 0; - if (l_ptr->max_pkt < (to_pos + size)) - return 0; - - skb_put(bundler, pad + size); - skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size); - msg_set_size(bundler_msg, to_pos + size); - msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); - kfree_skb(buf); - l_ptr->stats.sent_bundled++; - return 1; -} - -static void link_add_to_outqueue(struct tipc_link *l_ptr, - struct sk_buff *buf, - struct tipc_msg *msg) -{ - u32 ack = mod(l_ptr->next_in_no - 1); - u32 seqno = mod(l_ptr->next_out_no++); - - msg_set_word(msg, 2, ((ack << 16) | seqno)); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - buf->next = NULL; - if (l_ptr->first_out) { - l_ptr->last_out->next = buf; - l_ptr->last_out = buf; - } else - l_ptr->first_out = l_ptr->last_out = buf; - - l_ptr->out_queue_size++; - if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) - l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; -} - -static void link_add_chain_to_outqueue(struct tipc_link *l_ptr, - struct sk_buff *buf_chain, - u32 long_msgno) -{ - struct sk_buff *buf; - struct tipc_msg *msg; - - if (!l_ptr->next_out) - l_ptr->next_out = buf_chain; - while (buf_chain) { - buf = buf_chain; - buf_chain = buf_chain->next; - - msg = buf_msg(buf); - msg_set_long_msgno(msg, long_msgno); - link_add_to_outqueue(l_ptr, buf, msg); - } -} - -/* - * tipc_link_xmit() is the 'full path' for messages, called from - * inside TIPC when the 'fast path' in tipc_send_xmit - * has failed, and from link_send() - */ -int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - u32 size = msg_size(msg); - u32 dsz = msg_data_sz(msg); - u32 queue_size = l_ptr->out_queue_size; - u32 imp = tipc_msg_tot_importance(msg); - u32 queue_limit = l_ptr->queue_limit[imp]; - u32 max_packet = l_ptr->max_pkt; - - /* Match msg importance against queue limits: */ - if (unlikely(queue_size >= queue_limit)) { - if (imp <= TIPC_CRITICAL_IMPORTANCE) { - link_schedule_port(l_ptr, msg_origport(msg), size); - kfree_skb(buf); - return -ELINKCONG; - } - kfree_skb(buf); - if (imp > CONN_MANAGER) { - pr_warn("%s<%s>, send queue full", link_rst_msg, - l_ptr->name); - tipc_link_reset(l_ptr); - } - return dsz; - } - - /* Fragmentation needed ? */ - if (size > max_packet) - return tipc_link_frag_xmit(l_ptr, buf); - - /* Packet can be queued or sent. */ - if (likely(!link_congested(l_ptr))) { - link_add_to_outqueue(l_ptr, buf, msg); - - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - l_ptr->unacked_window = 0; - return dsz; - } - /* Congestion: can message be bundled ? */ - if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && - (msg_user(msg) != MSG_FRAGMENTER)) { - - /* Try adding message to an existing bundle */ - if (l_ptr->next_out && - link_bundle_buf(l_ptr, l_ptr->last_out, buf)) - return dsz; - - /* Try creating a new bundle */ - if (size <= max_packet * 2 / 3) { - struct sk_buff *bundler = tipc_buf_acquire(max_packet); - struct tipc_msg bundler_hdr; - - if (bundler) { - tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, - INT_H_SIZE, l_ptr->addr); - skb_copy_to_linear_data(bundler, &bundler_hdr, - INT_H_SIZE); - skb_trim(bundler, INT_H_SIZE); - link_bundle_buf(l_ptr, bundler, buf); - buf = bundler; - msg = buf_msg(buf); - l_ptr->stats.sent_bundles++; - } - } - } - if (!l_ptr->next_out) - l_ptr->next_out = buf; - link_add_to_outqueue(l_ptr, buf, msg); - return dsz; -} - -/* - * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use - * has not been selected yet, and the the owner node is not locked - * Called by TIPC internal users, e.g. the name distributor - */ -int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) -{ - struct tipc_link *l_ptr; - struct tipc_node *n_ptr; - int res = -ELINKCONG; - - n_ptr = tipc_node_find(dest); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[selector & 1]; - if (l_ptr) - res = __tipc_link_xmit(l_ptr, buf); - else - kfree_skb(buf); - tipc_node_unlock(n_ptr); - } else { - kfree_skb(buf); - } - return res; -} - /* tipc_link_cong: determine return value and how to treat the * sent buffer during link congestion. * - For plain, errorless user data messages we keep the buffer and @@ -2123,78 +1948,6 @@ void tipc_link_bundle_rcv(struct sk_buff *buf) kfree_skb(buf); } -/* - * Fragmentation/defragmentation: - */ - -/* - * tipc_link_frag_xmit: Entry for buffers needing fragmentation. - * The buffer is complete, inclusive total message length. - * Returns user data length. - */ -static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) -{ - struct sk_buff *buf_chain = NULL; - struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; - struct tipc_msg *inmsg = buf_msg(buf); - struct tipc_msg fragm_hdr; - u32 insize = msg_size(inmsg); - u32 dsz = msg_data_sz(inmsg); - unchar *crs = buf->data; - u32 rest = insize; - u32 pack_sz = l_ptr->max_pkt; - u32 fragm_sz = pack_sz - INT_H_SIZE; - u32 fragm_no = 0; - u32 destaddr; - - if (msg_short(inmsg)) - destaddr = l_ptr->addr; - else - destaddr = msg_destnode(inmsg); - - /* Prepare reusable fragment header: */ - tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, - INT_H_SIZE, destaddr); - - /* Chop up message: */ - while (rest > 0) { - struct sk_buff *fragm; - - if (rest <= fragm_sz) { - fragm_sz = rest; - msg_set_type(&fragm_hdr, LAST_FRAGMENT); - } - fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); - if (fragm == NULL) { - kfree_skb(buf); - kfree_skb_list(buf_chain); - return -ENOMEM; - } - msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); - fragm_no++; - msg_set_fragm_no(&fragm_hdr, fragm_no); - skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, - fragm_sz); - buf_chain_tail->next = fragm; - buf_chain_tail = fragm; - - rest -= fragm_sz; - crs += fragm_sz; - msg_set_type(&fragm_hdr, FRAGMENT); - } - kfree_skb(buf); - - /* Append chain of fragments to send queue & send them */ - l_ptr->long_msg_seq_no++; - link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); - l_ptr->stats.sent_fragments += fragm_no; - l_ptr->stats.sent_fragmented++; - tipc_link_push_queue(l_ptr); - - return dsz; -} - static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) { if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) diff --git a/net/tipc/link.h b/net/tipc/link.h index 04a59c58d38514..f0ebeb677a8b14 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -226,15 +226,9 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); -int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector); -int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf); -int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); u32 tipc_link_get_max_pkt(u32 dest, u32 selector); -int tipc_link_iovec_xmit_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 9682296f5e7cea..e3102ea494d448 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -60,41 +60,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, msg_set_destnode(m, destnode); } -/** - * tipc_msg_build - create message using specified header and data - * - * Note: Caller must not hold any locks in case copy_from_user() is interrupted! - * - * Returns message data size or errno - */ -int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, - unsigned int len, int max_size, struct sk_buff **buf) -{ - int dsz, sz, hsz; - unsigned char *to; - - dsz = len; - hsz = msg_hdr_sz(hdr); - sz = hsz + dsz; - msg_set_size(hdr, sz); - if (unlikely(sz > max_size)) { - *buf = NULL; - return dsz; - } - - *buf = tipc_buf_acquire(sz); - if (!(*buf)) - return -ENOMEM; - skb_copy_to_linear_data(*buf, hdr, hsz); - to = (*buf)->data + hsz; - if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) { - kfree_skb(*buf); - *buf = NULL; - return -EFAULT; - } - return dsz; -} - /* tipc_buf_append(): Append a buffer to the fragment list of another buffer * @*headbuf: in: NULL for first frag, otherwise value returned from prev call * out: set when successful non-complete reassembly, otherwise NULL diff --git a/net/tipc/msg.h b/net/tipc/msg.h index a15d59601bf9d3..868fe22e34528b 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -732,9 +732,6 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode); void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); -int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, - unsigned int len, int max_size, struct sk_buff **buf); - int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); diff --git a/net/tipc/port.c b/net/tipc/port.c index 0d09dcb6da182f..835dca1fd7ad98 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -74,118 +74,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg) (!peernode && (orignode == tipc_own_addr)); } -/** - * tipc_port_mcast_xmit - send a multicast message to local and remote - * destinations - */ -int tipc_port_mcast_xmit(struct tipc_port *oport, - struct tipc_name_seq const *seq, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *hdr; - struct sk_buff *buf; - struct sk_buff *ibuf = NULL; - struct tipc_port_list dports = {0, NULL, }; - int ext_targets; - int res; - - /* Create multicast message */ - hdr = &oport->phdr; - msg_set_type(hdr, TIPC_MCAST_MSG); - msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); - msg_set_destport(hdr, 0); - msg_set_destnode(hdr, 0); - msg_set_nametype(hdr, seq->type); - msg_set_namelower(hdr, seq->lower); - msg_set_nameupper(hdr, seq->upper); - msg_set_hdr_sz(hdr, MCAST_H_SIZE); - res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (unlikely(!buf)) - return res; - - /* Figure out where to send multicast message */ - ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, - TIPC_NODE_SCOPE, &dports); - - /* Send message to destinations (duplicate it only if necessary) */ - if (ext_targets) { - if (dports.count != 0) { - ibuf = skb_copy(buf, GFP_ATOMIC); - if (ibuf == NULL) { - tipc_port_list_free(&dports); - kfree_skb(buf); - return -ENOMEM; - } - } - res = tipc_bclink_xmit(buf); - if ((res < 0) && (dports.count != 0)) - kfree_skb(ibuf); - } else { - ibuf = buf; - } - - if (res >= 0) { - if (ibuf) - tipc_port_mcast_rcv(ibuf, &dports); - } else { - tipc_port_list_free(&dports); - } - return res; -} - -/** - * tipc_port_mcast_rcv - deliver multicast message to all destination ports - * - * If there is no port list, perform a lookup to create one - */ -void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) -{ - struct tipc_msg *msg; - struct tipc_port_list dports = {0, NULL, }; - struct tipc_port_list *item = dp; - int cnt = 0; - - msg = buf_msg(buf); - - /* Create destination port list, if one wasn't supplied */ - if (dp == NULL) { - tipc_nametbl_mc_translate(msg_nametype(msg), - msg_namelower(msg), - msg_nameupper(msg), - TIPC_CLUSTER_SCOPE, - &dports); - item = dp = &dports; - } - - /* Deliver a copy of message to each destination port */ - if (dp->count != 0) { - msg_set_destnode(msg, tipc_own_addr); - if (dp->count == 1) { - msg_set_destport(msg, dp->ports[0]); - tipc_sk_rcv(buf); - tipc_port_list_free(dp); - return; - } - for (; cnt < dp->count; cnt++) { - int index = cnt % PLSIZE; - struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); - - if (b == NULL) { - pr_warn("Unable to deliver multicast message(s)\n"); - goto exit; - } - if ((index == 0) && (cnt != 0)) - item = item->next; - msg_set_destport(buf_msg(b), item->ports[index]); - tipc_sk_rcv(b); - } - } -exit: - kfree_skb(buf); - tipc_port_list_free(dp); -} - /* tipc_port_init - intiate TIPC port and lock it * * Returns obtained reference if initialization is successful, zero otherwise diff --git a/net/tipc/port.h b/net/tipc/port.h index 0e47052daa29ea..3f93454592b6df 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -120,17 +120,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, struct tipc_portid const *peer); int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); -/* - * TIPC messaging routines - */ - -int tipc_port_mcast_xmit(struct tipc_port *port, - struct tipc_name_seq const *seq, - struct iovec const *msg, - unsigned int len); - struct sk_buff *tipc_port_get_ports(void); -void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp); void tipc_port_reinit(void); /** From 9fbfb8b120bd4fe89cd70d6c8841e6e1cfab2609 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:41:03 -0400 Subject: [PATCH 6/7] tipc: rename temporarily named functions After the previous commit, we can now give the functions with temporary names, such as tipc_link_xmit2(), tipc_msg_build2() etc., their proper names. There are no functional changes in this commit. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 6 +++--- net/tipc/bcast.h | 2 +- net/tipc/link.c | 18 +++++++++--------- net/tipc/link.h | 4 ++-- net/tipc/msg.c | 6 +++--- net/tipc/msg.h | 4 ++-- net/tipc/name_distr.c | 4 ++-- net/tipc/port.c | 10 +++++----- net/tipc/socket.c | 20 ++++++++++---------- 9 files changed, 37 insertions(+), 37 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 2f3256da6f8819..d890d480ae3bc3 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -389,13 +389,13 @@ static void bclink_peek_nack(struct tipc_msg *msg) tipc_node_unlock(n_ptr); } -/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster - * and to identified node local sockets +/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster + * and to identified node local sockets * @buf: chain of buffers containing message * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_bclink_xmit2(struct sk_buff *buf) +int tipc_bclink_xmit(struct sk_buff *buf) { int rc = 0; int bc = 0; diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index af6b6579f5c76a..4875d9536aee7a 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -98,6 +98,6 @@ int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); uint tipc_bclink_get_mtu(void); -int tipc_bclink_xmit2(struct sk_buff *buf); +int tipc_bclink_xmit(struct sk_buff *buf); #endif diff --git a/net/tipc/link.c b/net/tipc/link.c index 28730ddf4b787c..fb1485dc6736ec 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -706,7 +706,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) } /** - * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked + * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked * @link: link to use * @buf: chain of buffers containing message * Consumes the buffer chain, except when returning -ELINKCONG @@ -715,7 +715,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) * Only the socket functions tipc_send_stream() and tipc_send_packet() need * to act on the return value, since they may need to do more send attempts. */ -int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); uint psz = msg_size(msg); @@ -783,7 +783,7 @@ int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf) } /** - * tipc_link_xmit2() is the general link level function for message sending + * tipc_link_xmit() is the general link level function for message sending * @buf: chain of buffers containing message * @dsz: amount of user data to be sent * @dnode: address of destination node @@ -791,7 +791,7 @@ int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf) * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector) +int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) { struct tipc_link *link = NULL; struct tipc_node *node; @@ -802,7 +802,7 @@ int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector) tipc_node_lock(node); link = node->active_links[selector & 1]; if (link) - rc = __tipc_link_xmit2(link, buf); + rc = __tipc_link_xmit(link, buf); tipc_node_unlock(node); } @@ -836,7 +836,7 @@ static void tipc_link_sync_xmit(struct tipc_link *link) msg = buf_msg(buf); tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); msg_set_last_bcast(msg, link->owner->bclink.acked); - __tipc_link_xmit2(link, buf); + __tipc_link_xmit(link, buf); } /* @@ -1683,7 +1683,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, } skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); - __tipc_link_xmit2(tunnel, buf); + __tipc_link_xmit(tunnel, buf); } @@ -1716,7 +1716,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) if (buf) { skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit2(tunnel, buf); + __tipc_link_xmit(tunnel, buf); } else { pr_warn("%sunable to send changeover msg\n", link_co_err); @@ -1789,7 +1789,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, length); - __tipc_link_xmit2(tunnel, outbuf); + __tipc_link_xmit(tunnel, outbuf); if (!tipc_link_is_up(l_ptr)) return; iter = iter->next; diff --git a/net/tipc/link.h b/net/tipc/link.h index f0ebeb677a8b14..782983ccd323a8 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -226,8 +226,8 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); -int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector); -int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf); +int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); u32 tipc_link_get_max_pkt(u32 dest, u32 selector); void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, diff --git a/net/tipc/msg.c b/net/tipc/msg.c index e3102ea494d448..b6f45d029933cb 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -120,7 +120,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) /** - * tipc_msg_build2 - create buffer chain containing specified header and data + * tipc_msg_build - create buffer chain containing specified header and data * @mhdr: Message header, to be prepended to data * @iov: User data * @offset: Posision in iov to start copying from @@ -129,8 +129,8 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) * @chain: Buffer or chain of buffers to be returned to caller * Returns message data size or errno: -ENOMEM, -EFAULT */ -int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov, - int offset, int dsz, int pktmax , struct sk_buff **chain) +int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, + int offset, int dsz, int pktmax , struct sk_buff **chain) { int mhsz = msg_hdr_sz(mhdr); int msz = mhsz + dsz; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 868fe22e34528b..462fa194a6afe5 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -738,8 +738,8 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); -int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov, - int offset, int dsz, int mtu , struct sk_buff **chain); +int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, + int offset, int dsz, int mtu , struct sk_buff **chain); struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index d16f9475fa765e..dcc15bcd569279 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -116,7 +116,7 @@ void named_cluster_distribute(struct sk_buff *buf) if (!obuf) break; msg_set_destnode(buf_msg(obuf), dnode); - tipc_link_xmit2(obuf, dnode, dnode); + tipc_link_xmit(obuf, dnode, dnode); } rcu_read_unlock(); @@ -232,7 +232,7 @@ void tipc_named_node_up(u32 dnode) /* Convert circular list to linear list and send: */ buf_chain = (struct sk_buff *)msg_list.next; ((struct sk_buff *)msg_list.prev)->next = NULL; - tipc_link_xmit2(buf_chain, dnode, dnode); + tipc_link_xmit(buf_chain, dnode, dnode); } /** diff --git a/net/tipc/port.c b/net/tipc/port.c index 835dca1fd7ad98..7e096a5e770156 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -130,7 +130,7 @@ void tipc_port_destroy(struct tipc_port *p_ptr) tipc_nodesub_unsubscribe(&p_ptr->subscription); msg = buf_msg(buf); peer = msg_destnode(msg); - tipc_link_xmit2(buf, peer, msg_link_selector(msg)); + tipc_link_xmit(buf, peer, msg_link_selector(msg)); } spin_lock_bh(&tipc_port_list_lock); list_del(&p_ptr->port_list); @@ -187,7 +187,7 @@ static void port_timeout(unsigned long ref) } tipc_port_unlock(p_ptr); msg = buf_msg(buf); - tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg)); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); } @@ -202,7 +202,7 @@ static void port_handle_node_down(unsigned long ref) buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); tipc_port_unlock(p_ptr); msg = buf_msg(buf); - tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg)); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); } @@ -347,7 +347,7 @@ void tipc_acknowledge(u32 ref, u32 ack) if (!buf) return; msg = buf_msg(buf); - tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg)); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); } int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, @@ -509,6 +509,6 @@ int tipc_port_shutdown(u32 ref) buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); tipc_port_unlock(p_ptr); msg = buf_msg(buf); - tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg)); + tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); return tipc_port_disconnect(ref); } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b28eea50c7fcc9..5ad42f6137d852 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -131,7 +131,7 @@ static void reject_rx_queue(struct sock *sk) while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit2(buf, dnode, 0); + tipc_link_xmit(buf, dnode, 0); } } @@ -341,7 +341,7 @@ static int tipc_release(struct socket *sock) tipc_port_disconnect(port->ref); } if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit2(buf, dnode, 0); + tipc_link_xmit(buf, dnode, 0); } } @@ -566,7 +566,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, new_mtu: mtu = tipc_bclink_get_mtu(); - rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); if (unlikely(rc < 0)) return rc; @@ -821,12 +821,12 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, new_mtu: mtu = tipc_node_get_mtu(dnode, tsk->port.ref); - rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); if (rc < 0) goto exit; do { - rc = tipc_link_xmit2(buf, dnode, tsk->port.ref); + rc = tipc_link_xmit(buf, dnode, tsk->port.ref); if (likely(rc >= 0)) { if (sock->state != SS_READY) sock->state = SS_CONNECTING; @@ -934,12 +934,12 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, next: mtu = port->max_pkt; send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); - rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf); + rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf); if (unlikely(rc < 0)) goto exit; do { if (likely(!tipc_sk_conn_cong(tsk))) { - rc = tipc_link_xmit2(buf, dnode, ref); + rc = tipc_link_xmit(buf, dnode, ref); if (likely(!rc)) { tsk->sent_unacked++; sent += send; @@ -1571,7 +1571,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) return 0; - tipc_link_xmit2(buf, onode, 0); + tipc_link_xmit(buf, onode, 0); return 0; } @@ -1623,7 +1623,7 @@ int tipc_sk_rcv(struct sk_buff *buf) if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) return -EHOSTUNREACH; - tipc_link_xmit2(buf, dnode, 0); + tipc_link_xmit(buf, dnode, 0); return (rc < 0) ? -EHOSTUNREACH : 0; } @@ -1910,7 +1910,7 @@ static int tipc_shutdown(struct socket *sock, int how) } tipc_port_disconnect(port->ref); if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN)) - tipc_link_xmit2(buf, peer, 0); + tipc_link_xmit(buf, peer, 0); } else { tipc_port_shutdown(port->ref); } From 6f92ee54b316c116e125a6bb268abe308e4c14e6 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 16 Jul 2014 20:41:04 -0400 Subject: [PATCH 7/7] tipc: ensure sequential message delivery across dual bearers When we run broadcast packets over dual bearers/interfaces, the current transmission code is flipping bearers between each sent packet, with the purpose of leveraging the double bandwidth available. The receiving bclink is resequencing the packets if needed, so all messages are delivered upwards from the broadcast link in the correct order, even if they may arrive in concurrent interrupts. However, at the moment of delivery upwards to the socket, we release all spinlocks (bclink_lock, node_lock), so it is still possible that arriving messages bypass each other before they reach the socket queue. We fix this by applying the same technique we are using for unicast traffic. We use a link selector (i.e., the last bit of sending port number) to ensure that messages from the same sender socket always are sent over the same bearer. This guarantees sequential delivery between socket pairs, which is sufficient to satisfy the protocol spec, as well as all known user requirements. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index d890d480ae3bc3..dd13bfa0933324 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -637,6 +637,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, struct tipc_media_addr *unused2) { int bp_index; + struct tipc_msg *msg = buf_msg(buf); /* Prepare broadcast link message for reliable transmission, * if first time trying to send it; @@ -644,10 +645,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, * since they are sent in an unreliable manner and don't need it */ if (likely(!msg_non_seq(buf_msg(buf)))) { - struct tipc_msg *msg; - bcbuf_set_acks(buf, bclink->bcast_nodes.count); - msg = buf_msg(buf); msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tipc_net_id); bcl->stats.sent_info++; @@ -664,12 +662,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; - struct tipc_bearer *b = p; + struct tipc_bearer *bp[2] = {p, s}; + struct tipc_bearer *b = bp[msg_link_selector(msg)]; struct sk_buff *tbuf; if (!p) break; /* No more bearers to try */ - + if (!b) + b = p; tipc_nmap_diff(&bcbearer->remains, &b->nodes, &bcbearer->remains_new); if (bcbearer->remains_new.count == bcbearer->remains.count) @@ -686,13 +686,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); kfree_skb(tbuf); /* Bearer keeps a clone */ } - - /* Swap bearers for next packet */ - if (s) { - bcbearer->bpairs[bp_index].primary = s; - bcbearer->bpairs[bp_index].secondary = p; - } - if (bcbearer->remains_new.count == 0) break; /* All targets reached */