Skip to content

Commit

Permalink
implementing dynamic av
Browse files Browse the repository at this point in the history
Added data structures
- added peer_pool fi_rxdaddr_hash and
ep_rxdaddr_hash to struct rxd_ep.
- removed fi_addr_table, rxd_addr_table
rbmap from struct rxd_av

functions modified in
rxd_av.c:rxd_av_insert, rxd_av_insert_dg_addr
rxd_av_insert_fi_addr,rxd_av_remove, rxd_av_close.

rxd_ep.c: rxd_ep_close, rxd_ep_bind, rxd_ep_free_res
rxd_endpoint. Other functions which use peer_pool instead
of ep->peers array.

newly added functions:
rxd_update_av_peers
rxd_fiaddr_hash_insert
rxd_epaddr_hash_insert
rxd_get_peer_entry

multiple functions in rxd_msg.c, rxd_atomic.c
rxd_rma.c

header declrations in rxd.h
  • Loading branch information
nikhilnanal committed May 6, 2020
1 parent be535bd commit f75d3c1
Show file tree
Hide file tree
Showing 7 changed files with 549 additions and 338 deletions.
59 changes: 43 additions & 16 deletions prov/rxd/src/rxd.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#define RXD_BUF_POOL_ALIGNMENT 16
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_PEER_POOL_CHUNK_CNT 1024
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

Expand Down Expand Up @@ -120,6 +121,8 @@ struct rxd_domain {
struct rxd_peer {
struct dlist_entry entry;
fi_addr_t peer_addr;
fi_addr_t fi_addr;
fi_addr_t dg_addr;
uint64_t tx_seq_no;
uint64_t rx_seq_no;
uint64_t last_rx_ack;
Expand All @@ -142,23 +145,18 @@ struct rxd_peer {
struct dlist_entry buf_pkts;
};

struct rxd_addr {
fi_addr_t fi_addr;
struct rxd_dgaddr_entry {
UT_hash_handle hh;
fi_addr_t dg_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
struct ofi_rbmap rbmap;
int fi_addr_idx;
int rxd_addr_idx;

int dg_av_used;
size_t dg_addrlen;

fi_addr_t *fi_addr_table;
struct rxd_addr *rxd_addr_table;
struct rxd_dgaddr_entry *ep_dgaddr_hash;
};

struct rxd_cq;
Expand All @@ -172,6 +170,7 @@ struct rxd_cq {
enum rxd_pool_type {
RXD_BUF_POOL_RX,
RXD_BUF_POOL_TX,
RXD_BUF_POOL_PEERS
};

struct rxd_buf_pool {
Expand All @@ -180,6 +179,17 @@ struct rxd_buf_pool {
struct rxd_ep *rxd_ep;
};

struct rxd_fiaddr_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
fi_addr_t fi_addr;
};
struct rxd_epname_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_ep {
struct util_ep util_ep;
struct fid_ep *dg_ep;
Expand Down Expand Up @@ -216,7 +226,9 @@ struct rxd_ep {
struct dlist_entry rts_sent_list;
struct dlist_entry ctrl_pkts;

struct rxd_peer peers[];
struct rxd_fiaddr_entry *fi_rxdaddr_hash;
struct rxd_epname_entry *ep_rxdaddr_hash;
struct rxd_buf_pool peer_pool;
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand All @@ -239,6 +251,13 @@ static inline struct rxd_cq *rxd_ep_rx_cq(struct rxd_ep *ep)
return container_of(ep->util_ep.rx_cq, struct rxd_cq, util_cq);
}

static inline fi_addr_t rxd_fiaddr_hash_lookup(struct rxd_ep *ep, fi_addr_t fi_addr)
{
struct rxd_fiaddr_entry *entry = NULL;
HASH_FIND(hh, ep->fi_rxdaddr_hash, (void*)&fi_addr, sizeof(fi_addr), entry);
return entry ? entry->rxd_addr : FI_ADDR_UNSPEC;
}

struct rxd_x_entry {
fi_addr_t peer;
uint16_t tx_id;
Expand Down Expand Up @@ -329,7 +348,6 @@ static inline uint64_t rxd_set_pkt_seq(struct rxd_peer *peer,
struct rxd_pkt_entry *pkt_entry)
{
rxd_get_base_hdr(pkt_entry)->seq_no = peer->tx_seq_no++;

return rxd_get_base_hdr(pkt_entry)->seq_no;
}

Expand Down Expand Up @@ -418,8 +436,18 @@ int rxd_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,

/* AV sub-functions */
int rxd_av_insert_dg_addr(struct rxd_av *av, const void *addr,
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context);
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context, size_t addrlen);

/* AV/Peer subfunctions */
struct rxd_peer *rxd_get_peer_by_epaddr(struct rxd_ep *ep, const void *addr,
size_t addrlen);
struct rxd_peer *rxd_get_peer_by_fiaddr(struct rxd_ep *ep, fi_addr_t fi_addr,
fi_addr_t *rxd_addr);
int rxd_map_av_to_ep(struct util_av *util_av, void *dg_addr, fi_addr_t fi_addr,
void *arg);
int rxd_fiaddr_hash_insert(struct rxd_ep *ep, fi_addr_t *fi_addr,
fi_addr_t *rxd_addr);

/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
Expand All @@ -431,7 +459,7 @@ int rxd_ep_send_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_ep_post_data_pkts(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_insert_unacked(struct rxd_ep *ep, fi_addr_t peer,
struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, fi_addr_t rxd_addr);
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, struct rxd_peer *peer_entry);
int rxd_start_xfer(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_init_data_pkt(struct rxd_ep *ep, struct rxd_x_entry *tx_entry,
struct rxd_pkt_entry *pkt_entry);
Expand All @@ -447,8 +475,7 @@ void rxd_init_atom_hdr(void **ptr, enum fi_datatype datatype,
enum fi_op atomic_op);
size_t rxd_init_msg(void **ptr, const struct iovec *iov, size_t iov_count,
size_t total_len, size_t avail_len);
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry,
size_t *max_inline)
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry, size_t *max_inline)
{
if (tx_entry->flags & RXD_REMOTE_CQ_DATA) {
rxd_init_data_hdr(ptr, tx_entry);
Expand Down
21 changes: 14 additions & 7 deletions prov/rxd/src/rxd_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
struct fi_rma_iov rma_iov[RXD_IOV_LIMIT];
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

assert(count <= RXD_IOV_LIMIT);
assert(rma_count <= RXD_IOV_LIMIT);
Expand All @@ -134,8 +135,11 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
peer_entry = rxd_get_peer_by_fiaddr(rxd_ep, addr, &rxd_addr);
if(!peer_entry)
goto out;

ret = rxd_send_rts_if_needed(rxd_ep, peer_entry);
if (ret)
goto out;

Expand All @@ -145,7 +149,7 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr != FI_ADDR_UNSPEC)
if (peer_entry->peer_addr != FI_ADDR_UNSPEC)
(void) rxd_start_xfer(rxd_ep, tx_entry);

out:
Expand Down Expand Up @@ -221,6 +225,7 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
struct fi_rma_iov rma_iov;
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

iov.iov_base = (void *) buf;
iov.iov_len = count * ofi_datatype_size(datatype);
Expand All @@ -235,8 +240,11 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[dest_addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
peer_entry = rxd_get_peer_by_fiaddr(rxd_ep, addr, &rxd_addr);
if(!peer_entry)
goto out;

ret = rxd_send_rts_if_needed(rxd_ep, peer_entry);
if (ret)
goto out;

Expand All @@ -245,8 +253,7 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
0, NULL, 0, datatype, op);
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr == FI_ADDR_UNSPEC)
if (peer_entry->peer_addr == FI_ADDR_UNSPEC)
goto out;

(void) rxd_start_xfer(rxd_ep, tx_entry);
Expand Down
Loading

0 comments on commit f75d3c1

Please sign in to comment.