Skip to content

Commit

Permalink
xsk: add support for AF_XDP multi-buffer on Tx path
Browse files Browse the repository at this point in the history
For transmitting an AF_XDP packet, allocate skb while processing the
first desc and copy data to it. The 'XDP_PKT_CONTD' flag in 'options'
field of the desc indicates the EOP status of the packet. If the current
desc is not EOP, store the skb, release the current desc and go
on to read the next descs.

Allocate a page for each subsequent desc, copy data to it and add it as
a frag in the skb stored in xsk. On processing EOP, transmit the skb
with frags. Addresses contained in descs have been already queued in
consumer queue and skb destructor updated the completion count.

On transmit failure cancel the releases, clear the descs from the
completion queue and consume the skb for retrying packet transmission.

For any invalid descriptor (invalid length/address/options) in the middle
of a packet, all pending descriptors will be dropped by xsk core along
with the invalid one and the next descriptor is treated as the start of
a new packet.

Maximum supported frames for a packet is MAX_SKB_FRAGS + 1. If it is
exceeded, all descriptors accumulated so far are dropped.

Signed-off-by: Tirthendu Sarkar <[email protected]>
Link: https://lore.kernel.org/r/[email protected]
Signed-off-by: Alexei Starovoitov <[email protected]>
  • Loading branch information
tirthendu-intel authored and Alexei Starovoitov committed Jul 19, 2023
1 parent 1b725b0 commit cf24f5a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 33 deletions.
120 changes: 92 additions & 28 deletions net/xdp/xsk.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
rcu_read_lock();
list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
if (!xskq_cons_peek_desc(xs->tx, desc, pool)) {
xs->tx->queue_empty_descs++;
if (xskq_has_descs(xs->tx))
xskq_cons_release(xs->tx);
continue;
}

Expand Down Expand Up @@ -539,24 +540,32 @@ static void xsk_consume_skb(struct sk_buff *skb)
xs->skb = NULL;
}

static void xsk_drop_skb(struct sk_buff *skb)
{
xdp_sk(skb->sk)->tx->invalid_descs += xsk_get_num_desc(skb);
xsk_consume_skb(skb);
}

static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
struct xdp_desc *desc)
{
struct xsk_buff_pool *pool = xs->pool;
u32 hr, len, ts, offset, copy, copied;
struct sk_buff *skb;
struct sk_buff *skb = xs->skb;
struct page *page;
void *buffer;
int err, i;
u64 addr;

hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
if (!skb) {
hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));

skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
if (unlikely(!skb))
return ERR_PTR(err);
skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
if (unlikely(!skb))
return ERR_PTR(err);

skb_reserve(skb, hr);
skb_reserve(skb, hr);
}

addr = desc->addr;
len = desc->len;
Expand All @@ -566,7 +575,10 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
offset = offset_in_page(buffer);
addr = buffer - pool->addrs;

for (copied = 0, i = 0; copied < len; i++) {
for (copied = 0, i = skb_shinfo(skb)->nr_frags; copied < len; i++) {
if (unlikely(i >= MAX_SKB_FRAGS))
return ERR_PTR(-EFAULT);

page = pool->umem->pgs[addr >> PAGE_SHIFT];
get_page(page);

Expand All @@ -591,33 +603,56 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
struct xdp_desc *desc)
{
struct net_device *dev = xs->dev;
struct sk_buff *skb;
struct sk_buff *skb = xs->skb;
int err;

if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
skb = xsk_build_skb_zerocopy(xs, desc);
if (IS_ERR(skb))
return skb;
if (IS_ERR(skb)) {
err = PTR_ERR(skb);
goto free_err;
}
} else {
u32 hr, tr, len;
void *buffer;
int err;

hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
tr = dev->needed_tailroom;
buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
len = desc->len;

skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
if (unlikely(!skb))
return ERR_PTR(err);
if (!skb) {
hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
tr = dev->needed_tailroom;
skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
if (unlikely(!skb))
goto free_err;

skb_reserve(skb, hr);
skb_put(skb, len);
skb_reserve(skb, hr);
skb_put(skb, len);

buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err)) {
kfree_skb(skb);
return ERR_PTR(err);
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err))
goto free_err;
} else {
int nr_frags = skb_shinfo(skb)->nr_frags;
struct page *page;
u8 *vaddr;

if (unlikely(nr_frags == (MAX_SKB_FRAGS - 1) && xp_mb_desc(desc))) {
err = -EFAULT;
goto free_err;
}

page = alloc_page(xs->sk.sk_allocation);
if (unlikely(!page)) {
err = -EAGAIN;
goto free_err;
}

vaddr = kmap_local_page(page);
memcpy(vaddr, buffer, len);
kunmap_local(vaddr);

skb_add_rx_frag(skb, nr_frags, page, 0, len, 0);
}
}

Expand All @@ -628,6 +663,17 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
xsk_set_destructor_arg(skb);

return skb;

free_err:
if (err == -EAGAIN) {
xsk_cq_cancel_locked(xs, 1);
} else {
xsk_set_destructor_arg(skb);
xsk_drop_skb(skb);
xskq_cons_release(xs->tx);
}

return ERR_PTR(err);
}

static int __xsk_generic_xmit(struct sock *sk)
Expand Down Expand Up @@ -667,30 +713,45 @@ static int __xsk_generic_xmit(struct sock *sk)
skb = xsk_build_skb(xs, &desc);
if (IS_ERR(skb)) {
err = PTR_ERR(skb);
xsk_cq_cancel_locked(xs, 1);
goto out;
if (err == -EAGAIN)
goto out;
err = 0;
continue;
}

xskq_cons_release(xs->tx);

if (xp_mb_desc(&desc)) {
xs->skb = skb;
continue;
}

err = __dev_direct_xmit(skb, xs->queue_id);
if (err == NETDEV_TX_BUSY) {
/* Tell user-space to retry the send */
xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb));
xsk_consume_skb(skb);
err = -EAGAIN;
goto out;
}

xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP) {
/* SKB completed but not sent */
err = -EBUSY;
xs->skb = NULL;
goto out;
}

sent_frame = true;
xs->skb = NULL;
}

xs->tx->queue_empty_descs++;
if (xskq_has_descs(xs->tx)) {
if (xs->skb)
xsk_drop_skb(xs->skb);
xskq_cons_release(xs->tx);
}

out:
if (sent_frame)
Expand Down Expand Up @@ -940,6 +1001,9 @@ static int xsk_release(struct socket *sock)

net = sock_net(sk);

if (xs->skb)
xsk_drop_skb(xs->skb);

mutex_lock(&net->xdp.lock);
sk_del_node_init_rcu(sk);
mutex_unlock(&net->xdp.lock);
Expand Down
13 changes: 8 additions & 5 deletions net/xdp/xsk_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
xp_aligned_validate_desc(pool, desc);
}

static inline bool xskq_has_descs(struct xsk_queue *q)
{
return q->cached_cons != q->cached_prod;
}

static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
struct xdp_desc *d,
struct xsk_buff_pool *pool)
Expand All @@ -190,17 +195,15 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xsk_buff_pool *pool)
{
while (q->cached_cons != q->cached_prod) {
if (q->cached_cons != q->cached_prod) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
u32 idx = q->cached_cons & q->ring_mask;

*desc = ring->desc[idx];
if (xskq_cons_is_valid_desc(q, desc, pool))
return true;

q->cached_cons++;
return xskq_cons_is_valid_desc(q, desc, pool);
}

q->queue_empty_descs++;
return false;
}

Expand Down

0 comments on commit cf24f5a

Please sign in to comment.