Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat rxd av #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions prov/rxd/src/rxd.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#define RXD_BUF_POOL_ALIGNMENT 16
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_PEER_POOL_CHUNK_CNT 1024
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

Expand Down Expand Up @@ -120,6 +121,8 @@ struct rxd_domain {
struct rxd_peer {
struct dlist_entry entry;
fi_addr_t peer_addr;
fi_addr_t fi_addr;
fi_addr_t dg_addr;
uint64_t tx_seq_no;
uint64_t rx_seq_no;
uint64_t last_rx_ack;
Expand All @@ -142,23 +145,18 @@ struct rxd_peer {
struct dlist_entry buf_pkts;
};

struct rxd_addr {
fi_addr_t fi_addr;
struct rxd_dgaddr_entry {
UT_hash_handle hh;
fi_addr_t dg_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
struct ofi_rbmap rbmap;
int fi_addr_idx;
int rxd_addr_idx;

int dg_av_used;
size_t dg_addrlen;

fi_addr_t *fi_addr_table;
struct rxd_addr *rxd_addr_table;
struct rxd_dgaddr_entry *ep_dgaddr_hash;
};

struct rxd_cq;
Expand All @@ -172,6 +170,7 @@ struct rxd_cq {
enum rxd_pool_type {
RXD_BUF_POOL_RX,
RXD_BUF_POOL_TX,
RXD_BUF_POOL_PEERS
};

struct rxd_buf_pool {
Expand All @@ -180,6 +179,17 @@ struct rxd_buf_pool {
struct rxd_ep *rxd_ep;
};

struct rxd_fiaddr_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
fi_addr_t fi_addr;
};
struct rxd_epname_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_ep {
struct util_ep util_ep;
struct fid_ep *dg_ep;
Expand Down Expand Up @@ -216,7 +226,9 @@ struct rxd_ep {
struct dlist_entry rts_sent_list;
struct dlist_entry ctrl_pkts;

struct rxd_peer peers[];
struct rxd_fiaddr_entry *fi_rxdaddr_hash;
struct rxd_epname_entry *ep_rxdaddr_hash;
struct rxd_buf_pool peer_pool;
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand All @@ -239,6 +251,13 @@ static inline struct rxd_cq *rxd_ep_rx_cq(struct rxd_ep *ep)
return container_of(ep->util_ep.rx_cq, struct rxd_cq, util_cq);
}

static inline fi_addr_t rxd_fiaddr_hash_lookup(struct rxd_ep *ep, fi_addr_t fi_addr)
{
struct rxd_fiaddr_entry *entry = NULL;
HASH_FIND(hh, ep->fi_rxdaddr_hash, (void*)&fi_addr, sizeof(fi_addr), entry);
return entry ? entry->rxd_addr : FI_ADDR_UNSPEC;
}

struct rxd_x_entry {
fi_addr_t peer;
uint16_t tx_id;
Expand Down Expand Up @@ -329,7 +348,6 @@ static inline uint64_t rxd_set_pkt_seq(struct rxd_peer *peer,
struct rxd_pkt_entry *pkt_entry)
{
rxd_get_base_hdr(pkt_entry)->seq_no = peer->tx_seq_no++;

return rxd_get_base_hdr(pkt_entry)->seq_no;
}

Expand Down Expand Up @@ -418,8 +436,18 @@ int rxd_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,

/* AV sub-functions */
int rxd_av_insert_dg_addr(struct rxd_av *av, const void *addr,
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context);
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context, size_t addrlen);

/* AV/Peer subfunctions */
struct rxd_peer *rxd_get_peer_by_epaddr(struct rxd_ep *ep, const void *addr,
size_t addrlen);
struct rxd_peer *rxd_get_peer_by_fiaddr(struct rxd_ep *ep, fi_addr_t fi_addr,
fi_addr_t *rxd_addr);
int rxd_map_av_to_ep(struct util_av *util_av, void *dg_addr, fi_addr_t fi_addr,
void *arg);
int rxd_fiaddr_hash_insert(struct rxd_ep *ep, fi_addr_t *fi_addr,
fi_addr_t *rxd_addr);

/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
Expand All @@ -431,7 +459,7 @@ int rxd_ep_send_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_ep_post_data_pkts(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_insert_unacked(struct rxd_ep *ep, fi_addr_t peer,
struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, fi_addr_t rxd_addr);
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, struct rxd_peer *peer_entry);
int rxd_start_xfer(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_init_data_pkt(struct rxd_ep *ep, struct rxd_x_entry *tx_entry,
struct rxd_pkt_entry *pkt_entry);
Expand All @@ -447,8 +475,7 @@ void rxd_init_atom_hdr(void **ptr, enum fi_datatype datatype,
enum fi_op atomic_op);
size_t rxd_init_msg(void **ptr, const struct iovec *iov, size_t iov_count,
size_t total_len, size_t avail_len);
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry,
size_t *max_inline)
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry, size_t *max_inline)
{
if (tx_entry->flags & RXD_REMOTE_CQ_DATA) {
rxd_init_data_hdr(ptr, tx_entry);
Expand Down
21 changes: 14 additions & 7 deletions prov/rxd/src/rxd_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
struct fi_rma_iov rma_iov[RXD_IOV_LIMIT];
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

assert(count <= RXD_IOV_LIMIT);
assert(rma_count <= RXD_IOV_LIMIT);
Expand All @@ -134,8 +135,11 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
peer_entry = rxd_get_peer_by_fiaddr(rxd_ep, addr, &rxd_addr);
if(!peer_entry)
goto out;

ret = rxd_send_rts_if_needed(rxd_ep, peer_entry);
if (ret)
goto out;

Expand All @@ -145,7 +149,7 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr != FI_ADDR_UNSPEC)
if (peer_entry->peer_addr != FI_ADDR_UNSPEC)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, would it be possible to merge this with the rxd_fiaddr_has_lookup call? Ie

peer_entry = rxd_fiaddr_get_peer(rxd_ep, addr);
if (!peer_entry)
    goto out;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently rxd_fiaddr_hash_lookup returns rxd_addr. In some cases we call other functions like rxd_send_rts_if_needed or rxd_rx_entry_init(rxd_msg.c) or rxd_tx_entry_init_rma( rxd_rma.c) which does not use the peer_entry but just the rxd_addr as input. so we will still need to call the rxd_fiaddr_hash_lookup separately.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rxd_fiaddr_hash_lookup should only need to be called in calls where the app should have already inserted the address into the AV and therefore initialized the peer so we should be able to merge these calls. If the lookup fails and we can't return a peer, we will never get to rxd_send_rts_if_needed or rxd_tx_entry_init_rma.
Same thing with the rx path - if the address is not ADDR_UNSPEC, then it should have been inserted into the AV so it should be able to return the peer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this comment is for all those places where ofi_bufpool_get_ibuf is being called after a rxd_fiaddr_hash_lookup right. not for every call for ofi_bufpool_get_ibuf when we are not doing a fi_addr-> rxd_addr lookup.

(void) rxd_start_xfer(rxd_ep, tx_entry);

out:
Expand Down Expand Up @@ -221,6 +225,7 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
struct fi_rma_iov rma_iov;
fi_addr_t rxd_addr;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two function arguments fi_addr_t dest_addr and uint64_t addr, the dest_addr argument is not being used. the man pages give addr as destination for remote memory while dest_addr as dest address for connectionless operations. what is the difference between the two. are they being used correctly in this function.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dest_addr refers to the peer. It's the same as the fi_addr.
The addr refers to the memory address where the data is going. This can either be an offset in bytes (starting at 0) or the virtual address if the app/provider is using FI_MR_VIRT_ADDR.
You're right - this is incorrect. The lookup should be using the dest_addr, not the addr. The reason this is probably working is because most apps will be using offsets and typically start at 0 which will always exist.
Please fix this and all other errors in a separate patch. Good catch!

ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

iov.iov_base = (void *) buf;
iov.iov_len = count * ofi_datatype_size(datatype);
Expand All @@ -235,8 +240,11 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[dest_addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
peer_entry = rxd_get_peer_by_fiaddr(rxd_ep, addr, &rxd_addr);
if(!peer_entry)
goto out;

ret = rxd_send_rts_if_needed(rxd_ep, peer_entry);
if (ret)
goto out;

Expand All @@ -245,8 +253,7 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
0, NULL, 0, datatype, op);
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr == FI_ADDR_UNSPEC)
if (peer_entry->peer_addr == FI_ADDR_UNSPEC)
goto out;

(void) rxd_start_xfer(rxd_ep, tx_entry);
Expand Down
Loading