Skip to content

Commit

Permalink
implementing dynamic av
Browse files Browse the repository at this point in the history
Added data structures
- added peer_pool fi_rxdaddr_hash and
ep_rxdaddr_hash to struct rxd_ep.
- removed fi_addr_table, rxd_addr_table
rbmap from struct rxd_av

functions modified in
rxd_av.c:rxd_av_insert, rxd_av_insert_dg_addr
rxd_av_insert_fi_addr,rxd_av_remove, rxd_av_close.

rxd_ep.c: rxd_ep_close, rxd_ep_bind, rxd_ep_free_res
rxd_endpoint. Other functions which use peer_pool instead
of ep->peers array.

newly added functions:
rxd_update_av_peers
rxd_fiaddr_hash_insert
rxd_epaddr_hash_insert
rxd_get_peer_entry

multiple functions in rxd_msg.c, rxd_atomic.c
rxd_rma.c

header declrations in rxd.h
  • Loading branch information
nikhilnanal committed Apr 20, 2020
1 parent ce87272 commit 30d6cb0
Show file tree
Hide file tree
Showing 7 changed files with 622 additions and 309 deletions.
48 changes: 40 additions & 8 deletions prov/rxd/src/rxd.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#define RXD_BUF_POOL_ALIGNMENT 16
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_PEER_POOL_CHUNK_CNT 1024
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

Expand Down Expand Up @@ -120,6 +121,9 @@ struct rxd_domain {
struct rxd_peer {
struct dlist_entry entry;
fi_addr_t peer_addr;
// new entry
fi_addr_t fi_addr;
fi_addr_t dg_addr;
uint64_t tx_seq_no;
uint64_t rx_seq_no;
uint64_t last_rx_ack;
Expand Down Expand Up @@ -148,17 +152,12 @@ struct rxd_addr {
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
struct ofi_rbmap rbmap;
int fi_addr_idx;
int rxd_addr_idx;

struct util_av util_av;
struct fid_av *dg_av;
int dg_av_used;
size_t dg_addrlen;

fi_addr_t *fi_addr_table;
struct rxd_addr *rxd_addr_table;
};

struct rxd_cq;
Expand All @@ -172,6 +171,7 @@ struct rxd_cq {
enum rxd_pool_type {
RXD_BUF_POOL_RX,
RXD_BUF_POOL_TX,
RXD_BUF_POOL_PEERS
};

struct rxd_buf_pool {
Expand All @@ -180,6 +180,19 @@ struct rxd_buf_pool {
struct rxd_ep *rxd_ep;
};

struct rxd_fiaddr_entry {

fi_addr_t fi_addr; //key
fi_addr_t rxd_addr; //value
UT_hash_handle hh;
};
struct rxd_epname_entry {

uint8_t addr[RXD_NAME_LENGTH]; //key
fi_addr_t rxd_addr; //value
UT_hash_handle hh;
};

struct rxd_ep {
struct util_ep util_ep;
struct fid_ep *dg_ep;
Expand Down Expand Up @@ -215,8 +228,13 @@ struct rxd_ep {
struct dlist_entry active_peers;
struct dlist_entry rts_sent_list;
struct dlist_entry ctrl_pkts;
// new
struct rxd_fiaddr_entry *fi_rxdaddr_hash;
struct rxd_epname_entry *ep_rxdaddr_hash;
struct rxd_buf_pool peer_pool;
// struct ofi_rbmap rbmap;

struct rxd_peer peers[];
//struct rxd_peer peers[];
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand Down Expand Up @@ -420,6 +438,20 @@ int rxd_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
int rxd_av_insert_dg_addr(struct rxd_av *av, const void *addr,
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context);
int rxd_av_insert_fi_addr(struct rxd_av *av, fi_addr_t *dg_addr,
fi_addr_t *fi_addr, uint64_t flags);

/* AV/Peer subfunctions */
struct rxd_peer* rxd_get_peer_entry(struct rxd_ep *ep, void *addr);
int rxd_fiaddr_hash_insert(struct rxd_ep *ep, fi_addr_t *fi_addr,
fi_addr_t *rxd_addr);
int rxd_epaddr_hash_insert(struct rxd_ep *ep, void *addr,
fi_addr_t *rxd_addr);
int rxd_update_peers(struct util_av *util_av, void *dg_addr,
fi_addr_t fi_addr, void *arg);




/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
Expand Down
22 changes: 17 additions & 5 deletions prov/rxd/src/rxd_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
struct fi_rma_iov rma_iov[RXD_IOV_LIMIT];
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_fiaddr_entry *entry;
struct rxd_peer *peer_entry;

assert(count <= RXD_IOV_LIMIT);
assert(rma_count <= RXD_IOV_LIMIT);
Expand All @@ -133,8 +135,11 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,

if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;
HASH_FIND(hh, rxd_ep->fi_rxdaddr_hash, (void*)&addr, sizeof(fi_addr_t), entry);
if (entry == NULL)
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
rxd_addr = entry->rxd_addr;
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
if (ret)
goto out;
Expand All @@ -145,7 +150,8 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr != FI_ADDR_UNSPEC)
peer_entry = ofi_bufpool_get_ibuf(rxd_ep->peer_pool.pool, rxd_addr);
if (peer_entry->peer_addr != FI_ADDR_UNSPEC)
(void) rxd_start_xfer(rxd_ep, tx_entry);

out:
Expand Down Expand Up @@ -221,6 +227,8 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
struct fi_rma_iov rma_iov;
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_fiaddr_entry *entry;
struct rxd_peer *peer_entry;

iov.iov_base = (void *) buf;
iov.iov_len = count * ofi_datatype_size(datatype);
Expand All @@ -234,8 +242,12 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,

if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

HASH_FIND(hh, rxd_ep->fi_rxdaddr_hash, (void*)&addr, sizeof(fi_addr_t), entry);
if (entry == NULL)
goto out;
rxd_addr = entry->rxd_addr;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
if (ret)
goto out;
Expand All @@ -245,8 +257,8 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
0, NULL, 0, datatype, op);
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr == FI_ADDR_UNSPEC)
peer_entry = ofi_bufpool_get_ibuf(rxd_ep->peer_pool.pool, rxd_addr);
if (peer_entry->peer_addr == FI_ADDR_UNSPEC)
goto out;

(void) rxd_start_xfer(rxd_ep, tx_entry);
Expand Down
Loading

0 comments on commit 30d6cb0

Please sign in to comment.