Skip to content

Commit

Permalink
rxrpc: Create a per-local endpoint receive queue and I/O thread
Browse files Browse the repository at this point in the history
Create a per-local receive queue to which, in a future patch, all incoming
packets will be directed and an I/O thread that will process those packets
and perform all transmission of packets.

Destruction of the local endpoint is also moved from the local processor
work item (which will be absorbed) to the thread.

Signed-off-by: David Howells <[email protected]>
cc: Marc Dionne <[email protected]>
cc: [email protected]
  • Loading branch information
dhowells committed Dec 1, 2022
1 parent 96b2d69 commit a275da6
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 21 deletions.
10 changes: 10 additions & 0 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ struct rxrpc_net {
atomic_t stat_rx_acks[256];

atomic_t stat_why_req_ack[8];

atomic_t stat_io_loop;
};

/*
Expand Down Expand Up @@ -280,12 +282,14 @@ struct rxrpc_local {
struct hlist_node link;
struct socket *socket; /* my UDP socket */
struct work_struct processor;
struct task_struct *io_thread;
struct list_head ack_tx_queue; /* List of ACKs that need sending */
spinlock_t ack_tx_lock; /* ACK list lock */
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
struct sk_buff_head reject_queue; /* packets awaiting rejection */
struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
struct sk_buff_head rx_queue; /* Received packets */
struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */
spinlock_t lock; /* access lock */
Expand Down Expand Up @@ -954,6 +958,11 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection
* io_thread.c
*/
int rxrpc_input_packet(struct sock *, struct sk_buff *);
int rxrpc_io_thread(void *data);
static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
{
wake_up_process(local->io_thread);
}

/*
* insecure.c
Expand Down Expand Up @@ -984,6 +993,7 @@ void rxrpc_put_local(struct rxrpc_local *, enum rxrpc_local_trace);
struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *, enum rxrpc_local_trace);
void rxrpc_unuse_local(struct rxrpc_local *, enum rxrpc_local_trace);
void rxrpc_queue_local(struct rxrpc_local *);
void rxrpc_destroy_local(struct rxrpc_local *local);
void rxrpc_destroy_all_locals(struct rxrpc_net *);

static inline bool __rxrpc_unuse_local(struct rxrpc_local *local,
Expand Down
51 changes: 50 additions & 1 deletion net/rxrpc/io_thread.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC packet reception
*
* Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
* Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
* Written by David Howells ([email protected])
*/

Expand Down Expand Up @@ -368,3 +368,52 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
_leave(" [badmsg]");
return 0;
}

/*
* I/O and event handling thread.
*/
int rxrpc_io_thread(void *data)
{
struct sk_buff_head rx_queue;
struct rxrpc_local *local = data;
struct sk_buff *skb;

skb_queue_head_init(&rx_queue);

set_user_nice(current, MIN_NICE);

for (;;) {
rxrpc_inc_stat(local->rxnet, stat_io_loop);

/* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) {
// TODO: Input packet
rxrpc_free_skb(skb, rxrpc_skb_put_input);
continue;
}

if (!skb_queue_empty(&local->rx_queue)) {
spin_lock_irq(&local->rx_queue.lock);
skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
spin_unlock_irq(&local->rx_queue.lock);
continue;
}

set_current_state(TASK_INTERRUPTIBLE);
if (!skb_queue_empty(&local->rx_queue)) {
__set_current_state(TASK_RUNNING);
continue;
}

if (kthread_should_stop())
break;
schedule();
}

__set_current_state(TASK_RUNNING);
rxrpc_see_local(local, rxrpc_local_stop);
rxrpc_destroy_local(local);
local->io_thread = NULL;
rxrpc_see_local(local, rxrpc_local_stopped);
return 0;
}
39 changes: 22 additions & 17 deletions net/rxrpc/local_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
init_rwsem(&local->defrag_sem);
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
skb_queue_head_init(&local->rx_queue);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
spin_lock_init(&local->lock);
Expand All @@ -126,6 +127,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
struct udp_tunnel_sock_cfg tuncfg = {NULL};
struct sockaddr_rxrpc *srx = &local->srx;
struct udp_port_cfg udp_conf = {0};
struct task_struct *io_thread;
struct sock *usk;
int ret;

Expand Down Expand Up @@ -185,8 +187,23 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
BUG();
}

io_thread = kthread_run(rxrpc_io_thread, local,
"krxrpcio/%u", ntohs(udp_conf.local_udp_port));
if (IS_ERR(io_thread)) {
ret = PTR_ERR(io_thread);
goto error_sock;
}

local->io_thread = io_thread;
_leave(" = 0");
return 0;

error_sock:
kernel_sock_shutdown(local->socket, SHUT_RDWR);
local->socket->sk->sk_user_data = NULL;
sock_release(local->socket);
local->socket = NULL;
return ret;
}

/*
Expand Down Expand Up @@ -360,19 +377,8 @@ struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local,
*/
void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
{
unsigned int debug_id;
int r, u;

if (local) {
debug_id = local->debug_id;
r = refcount_read(&local->ref);
u = atomic_dec_return(&local->active_users);
trace_rxrpc_local(debug_id, why, r, u);
if (u == 0) {
rxrpc_get_local(local, rxrpc_local_get_queue);
rxrpc_queue_local(local);
}
}
if (local && __rxrpc_unuse_local(local, why))
kthread_stop(local->io_thread);
}

/*
Expand All @@ -382,7 +388,7 @@ void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
* Closing the socket cannot be done from bottom half context or RCU callback
* context because it might sleep.
*/
static void rxrpc_local_destroyer(struct rxrpc_local *local)
void rxrpc_destroy_local(struct rxrpc_local *local)
{
struct socket *socket = local->socket;
struct rxrpc_net *rxnet = local->rxnet;
Expand Down Expand Up @@ -411,6 +417,7 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
*/
rxrpc_purge_queue(&local->reject_queue);
rxrpc_purge_queue(&local->event_queue);
rxrpc_purge_queue(&local->rx_queue);
}

/*
Expand All @@ -430,10 +437,8 @@ static void rxrpc_local_processor(struct work_struct *work)

do {
again = false;
if (!__rxrpc_use_local(local, rxrpc_local_use_work)) {
rxrpc_local_destroyer(local);
if (!__rxrpc_use_local(local, rxrpc_local_use_work))
break;
}

if (!list_empty(&local->ack_tx_queue)) {
rxrpc_transmit_ack_packets(local);
Expand Down
12 changes: 9 additions & 3 deletions net/rxrpc/proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
if (v == SEQ_START_TOKEN) {
seq_puts(seq,
"Proto Local "
" Use Act\n");
" Use Act RxQ\n");
return 0;
}

Expand All @@ -351,10 +351,11 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
sprintf(lbuff, "%pISpc", &local->srx.transport);

seq_printf(seq,
"UDP %-47.47s %3u %3u\n",
"UDP %-47.47s %3u %3u %3u\n",
lbuff,
refcount_read(&local->ref),
atomic_read(&local->active_users));
atomic_read(&local->active_users),
local->rx_queue.qlen);

return 0;
}
Expand Down Expand Up @@ -463,6 +464,9 @@ int rxrpc_stats_show(struct seq_file *seq, void *v)
"Buffers : txb=%u rxb=%u\n",
atomic_read(&rxrpc_nr_txbuf),
atomic_read(&rxrpc_n_rx_skbs));
seq_printf(seq,
"IO-thread: loops=%u\n",
atomic_read(&rxnet->stat_io_loop));
return 0;
}

Expand Down Expand Up @@ -492,5 +496,7 @@ int rxrpc_stats_clear(struct file *file, char *buf, size_t size)
memset(&rxnet->stat_rx_acks, 0, sizeof(rxnet->stat_rx_acks));

memset(&rxnet->stat_why_req_ack, 0, sizeof(rxnet->stat_why_req_ack));

atomic_set(&rxnet->stat_io_loop, 0);
return size;
}

0 comments on commit a275da6

Please sign in to comment.