Skip to content

Commit 730c5fd

Browse files
committed
rxrpc: Fix local endpoint refcounting
The object lifetime management on the rxrpc_local struct is broken in that the rxrpc_local_processor() function is expected to clean up and remove an object - but it may get requeued by packets coming in on the backing UDP socket once it starts running. This may result in the assertion in rxrpc_local_rcu() firing because the memory has been scheduled for RCU destruction whilst still queued: rxrpc: Assertion failed ------------[ cut here ]------------ kernel BUG at net/rxrpc/local_object.c:468! Note that if the processor comes around before the RCU free function, it will just do nothing because ->dead is true. Fix this by adding a separate refcount to count active users of the endpoint that causes the endpoint to be destroyed when it reaches 0. The original refcount can then be used to refcount objects through the work processor and cause the memory to be rcu freed when that reaches 0. Fixes: 4f95dd7 ("rxrpc: Rework local endpoint management") Reported-by: [email protected] Signed-off-by: David Howells <[email protected]>
1 parent 8c25d08 commit 730c5fd

File tree

4 files changed

+72
-39
lines changed

4 files changed

+72
-39
lines changed

net/rxrpc/af_rxrpc.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
193193

194194
service_in_use:
195195
write_unlock(&local->services_lock);
196-
rxrpc_put_local(local);
196+
rxrpc_unuse_local(local);
197197
ret = -EADDRINUSE;
198198
error_unlock:
199199
release_sock(&rx->sk);
@@ -901,7 +901,7 @@ static int rxrpc_release_sock(struct sock *sk)
901901
rxrpc_queue_work(&rxnet->service_conn_reaper);
902902
rxrpc_queue_work(&rxnet->client_conn_reaper);
903903

904-
rxrpc_put_local(rx->local);
904+
rxrpc_unuse_local(rx->local);
905905
rx->local = NULL;
906906
key_put(rx->key);
907907
rx->key = NULL;

net/rxrpc/ar-internal.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ struct rxrpc_security {
254254
*/
255255
struct rxrpc_local {
256256
struct rcu_head rcu;
257-
atomic_t usage;
257+
atomic_t active_users; /* Number of users of the local endpoint */
258+
atomic_t usage; /* Number of references to the structure */
258259
struct rxrpc_net *rxnet; /* The network ns in which this resides */
259260
struct list_head link;
260261
struct socket *socket; /* my UDP socket */
@@ -1002,6 +1003,8 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *, const struct sockaddr_rxrpc
10021003
struct rxrpc_local *rxrpc_get_local(struct rxrpc_local *);
10031004
struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *);
10041005
void rxrpc_put_local(struct rxrpc_local *);
1006+
struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *);
1007+
void rxrpc_unuse_local(struct rxrpc_local *);
10051008
void rxrpc_queue_local(struct rxrpc_local *);
10061009
void rxrpc_destroy_all_locals(struct rxrpc_net *);
10071010

net/rxrpc/input.c

+12-4
Original file line numberDiff line numberDiff line change
@@ -1108,8 +1108,12 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
11081108
{
11091109
_enter("%p,%p", local, skb);
11101110

1111-
skb_queue_tail(&local->event_queue, skb);
1112-
rxrpc_queue_local(local);
1111+
if (rxrpc_get_local_maybe(local)) {
1112+
skb_queue_tail(&local->event_queue, skb);
1113+
rxrpc_queue_local(local);
1114+
} else {
1115+
rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
1116+
}
11131117
}
11141118

11151119
/*
@@ -1119,8 +1123,12 @@ static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
11191123
{
11201124
CHECK_SLAB_OKAY(&local->usage);
11211125

1122-
skb_queue_tail(&local->reject_queue, skb);
1123-
rxrpc_queue_local(local);
1126+
if (rxrpc_get_local_maybe(local)) {
1127+
skb_queue_tail(&local->reject_queue, skb);
1128+
rxrpc_queue_local(local);
1129+
} else {
1130+
rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
1131+
}
11241132
}
11251133

11261134
/*

net/rxrpc/local_object.c

+54-32
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
7979
local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
8080
if (local) {
8181
atomic_set(&local->usage, 1);
82+
atomic_set(&local->active_users, 1);
8283
local->rxnet = rxnet;
8384
INIT_LIST_HEAD(&local->link);
8485
INIT_WORK(&local->processor, rxrpc_local_processor);
@@ -266,11 +267,8 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
266267
* bind the transport socket may still fail if we're attempting
267268
* to use a local address that the dying object is still using.
268269
*/
269-
if (!rxrpc_get_local_maybe(local)) {
270-
cursor = cursor->next;
271-
list_del_init(&local->link);
270+
if (!rxrpc_use_local(local))
272271
break;
273-
}
274272

275273
age = "old";
276274
goto found;
@@ -284,7 +282,10 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
284282
if (ret < 0)
285283
goto sock_error;
286284

287-
list_add_tail(&local->link, cursor);
285+
if (cursor != &rxnet->local_endpoints)
286+
list_replace(cursor, &local->link);
287+
else
288+
list_add_tail(&local->link, cursor);
288289
age = "new";
289290

290291
found:
@@ -342,7 +343,8 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
342343
}
343344

344345
/*
345-
* Queue a local endpoint.
346+
* Queue a local endpoint unless it has become unreferenced and pass the
347+
* caller's reference to the work item.
346348
*/
347349
void rxrpc_queue_local(struct rxrpc_local *local)
348350
{
@@ -351,15 +353,8 @@ void rxrpc_queue_local(struct rxrpc_local *local)
351353
if (rxrpc_queue_work(&local->processor))
352354
trace_rxrpc_local(local, rxrpc_local_queued,
353355
atomic_read(&local->usage), here);
354-
}
355-
356-
/*
357-
* A local endpoint reached its end of life.
358-
*/
359-
static void __rxrpc_put_local(struct rxrpc_local *local)
360-
{
361-
_enter("%d", local->debug_id);
362-
rxrpc_queue_work(&local->processor);
356+
else
357+
rxrpc_put_local(local);
363358
}
364359

365360
/*
@@ -375,10 +370,45 @@ void rxrpc_put_local(struct rxrpc_local *local)
375370
trace_rxrpc_local(local, rxrpc_local_put, n, here);
376371

377372
if (n == 0)
378-
__rxrpc_put_local(local);
373+
call_rcu(&local->rcu, rxrpc_local_rcu);
379374
}
380375
}
381376

377+
/*
378+
* Start using a local endpoint.
379+
*/
380+
struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local)
381+
{
382+
unsigned int au;
383+
384+
local = rxrpc_get_local_maybe(local);
385+
if (!local)
386+
return NULL;
387+
388+
au = atomic_fetch_add_unless(&local->active_users, 1, 0);
389+
if (au == 0) {
390+
rxrpc_put_local(local);
391+
return NULL;
392+
}
393+
394+
return local;
395+
}
396+
397+
/*
398+
* Cease using a local endpoint. Once the number of active users reaches 0, we
399+
* start the closure of the transport in the work processor.
400+
*/
401+
void rxrpc_unuse_local(struct rxrpc_local *local)
402+
{
403+
unsigned int au;
404+
405+
au = atomic_dec_return(&local->active_users);
406+
if (au == 0)
407+
rxrpc_queue_local(local);
408+
else
409+
rxrpc_put_local(local);
410+
}
411+
382412
/*
383413
* Destroy a local endpoint's socket and then hand the record to RCU to dispose
384414
* of.
@@ -393,16 +423,6 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
393423

394424
_enter("%d", local->debug_id);
395425

396-
/* We can get a race between an incoming call packet queueing the
397-
* processor again and the work processor starting the destruction
398-
* process which will shut down the UDP socket.
399-
*/
400-
if (local->dead) {
401-
_leave(" [already dead]");
402-
return;
403-
}
404-
local->dead = true;
405-
406426
mutex_lock(&rxnet->local_mutex);
407427
list_del_init(&local->link);
408428
mutex_unlock(&rxnet->local_mutex);
@@ -422,13 +442,11 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
422442
*/
423443
rxrpc_purge_queue(&local->reject_queue);
424444
rxrpc_purge_queue(&local->event_queue);
425-
426-
_debug("rcu local %d", local->debug_id);
427-
call_rcu(&local->rcu, rxrpc_local_rcu);
428445
}
429446

430447
/*
431-
* Process events on an endpoint
448+
* Process events on an endpoint. The work item carries a ref which
449+
* we must release.
432450
*/
433451
static void rxrpc_local_processor(struct work_struct *work)
434452
{
@@ -441,8 +459,10 @@ static void rxrpc_local_processor(struct work_struct *work)
441459

442460
do {
443461
again = false;
444-
if (atomic_read(&local->usage) == 0)
445-
return rxrpc_local_destroyer(local);
462+
if (atomic_read(&local->active_users) == 0) {
463+
rxrpc_local_destroyer(local);
464+
break;
465+
}
446466

447467
if (!skb_queue_empty(&local->reject_queue)) {
448468
rxrpc_reject_packets(local);
@@ -454,6 +474,8 @@ static void rxrpc_local_processor(struct work_struct *work)
454474
again = true;
455475
}
456476
} while (again);
477+
478+
rxrpc_put_local(local);
457479
}
458480

459481
/*

0 commit comments

Comments
 (0)