Skip to content

Commit

Permalink
implementing dynamic av
Browse files Browse the repository at this point in the history
Added data structures
- added peer_pool fi_rxdaddr_hash and
ep_rxdaddr_hash to struct rxd_ep.
- removed fi_addr_table, rxd_addr_table
rbmap from struct rxd_av

functions modified in
rxd_av.c:rxd_av_insert, rxd_av_insert_dg_addr
rxd_av_insert_fi_addr,rxd_av_remove, rxd_av_close.

rxd_ep.c: rxd_ep_close, rxd_ep_bind, rxd_ep_free_res
rxd_endpoint. Other functions which use peer_pool instead
of ep->peers array.

newly added functions:
rxd_update_av_peers
rxd_fiaddr_hash_insert
rxd_epaddr_hash_insert
rxd_get_peer_entry

multiple functions in rxd_msg.c, rxd_atomic.c
rxd_rma.c

header declrations in rxd.h
  • Loading branch information
nikhilnanal committed Apr 20, 2020
1 parent ce87272 commit d0088ea
Show file tree
Hide file tree
Showing 7 changed files with 603 additions and 331 deletions.
61 changes: 47 additions & 14 deletions prov/rxd/src/rxd.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#define RXD_BUF_POOL_ALIGNMENT 16
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_PEER_POOL_CHUNK_CNT 1024
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

Expand Down Expand Up @@ -119,7 +120,9 @@ struct rxd_domain {

struct rxd_peer {
struct dlist_entry entry;
fi_addr_t peer_addr;
fi_addr_t peer_addr;
fi_addr_t fi_addr;
fi_addr_t dg_addr;
uint64_t tx_seq_no;
uint64_t rx_seq_no;
uint64_t last_rx_ack;
Expand All @@ -142,23 +145,20 @@ struct rxd_peer {
struct dlist_entry buf_pkts;
};

struct rxd_addr {
fi_addr_t fi_addr;
fi_addr_t dg_addr;
struct rxd_dgaddr_entry {

uint8_t addr[RXD_NAME_LENGTH]; //key
fi_addr_t dg_addr; //value
UT_hash_handle hh;
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
struct ofi_rbmap rbmap;
int fi_addr_idx;
int rxd_addr_idx;

struct util_av util_av;
struct fid_av *dg_av;
int dg_av_used;
size_t dg_addrlen;

fi_addr_t *fi_addr_table;
struct rxd_addr *rxd_addr_table;
struct rxd_dgaddr_entry *ep_dgaddr_hash;
};

struct rxd_cq;
Expand All @@ -172,6 +172,7 @@ struct rxd_cq {
enum rxd_pool_type {
RXD_BUF_POOL_RX,
RXD_BUF_POOL_TX,
RXD_BUF_POOL_PEERS
};

struct rxd_buf_pool {
Expand All @@ -180,6 +181,19 @@ struct rxd_buf_pool {
struct rxd_ep *rxd_ep;
};

struct rxd_fiaddr_entry {

fi_addr_t fi_addr; //key
fi_addr_t rxd_addr; //value
UT_hash_handle hh;
};
struct rxd_epname_entry {

uint8_t addr[RXD_NAME_LENGTH]; //key
fi_addr_t rxd_addr; //value
UT_hash_handle hh;
};

struct rxd_ep {
struct util_ep util_ep;
struct fid_ep *dg_ep;
Expand Down Expand Up @@ -216,7 +230,9 @@ struct rxd_ep {
struct dlist_entry rts_sent_list;
struct dlist_entry ctrl_pkts;

struct rxd_peer peers[];
struct rxd_fiaddr_entry *fi_rxdaddr_hash;
struct rxd_epname_entry *ep_rxdaddr_hash;
struct rxd_buf_pool peer_pool;
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand All @@ -239,6 +255,14 @@ static inline struct rxd_cq *rxd_ep_rx_cq(struct rxd_ep *ep)
return container_of(ep->util_ep.rx_cq, struct rxd_cq, util_cq);
}

static inline fi_addr_t rxd_fiaddr_hash_lookup(struct rxd_ep *ep, fi_addr_t fi_addr)
{
struct rxd_fiaddr_entry *entry = NULL;
HASH_FIND(hh, ep->fi_rxdaddr_hash, (void*)&fi_addr, sizeof(fi_addr), entry);
return entry ? entry->rxd_addr : FI_ADDR_UNSPEC;

}

struct rxd_x_entry {
fi_addr_t peer;
uint16_t tx_id;
Expand Down Expand Up @@ -421,6 +445,15 @@ int rxd_av_insert_dg_addr(struct rxd_av *av, const void *addr,
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context);

/* AV/Peer subfunctions */
struct rxd_peer* rxd_get_peer_entry(struct rxd_ep *ep, void *addr);
int rxd_fiaddr_hash_insert(struct rxd_ep *ep, fi_addr_t *fi_addr,
fi_addr_t *rxd_addr);
int rxd_epaddr_hash_insert(struct rxd_ep *ep, void *addr,
fi_addr_t *rxd_addr);
int rxd_update_av_peers(struct util_av *util_av, void *dg_addr,
fi_addr_t fi_addr, void *arg);

/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
void rxd_ep_send_ack(struct rxd_ep *rxd_ep, fi_addr_t peer);
Expand Down Expand Up @@ -464,7 +497,7 @@ struct rxd_x_entry *rxd_tx_entry_init_common(struct rxd_ep *ep, fi_addr_t addr,
struct rxd_x_entry *rxd_rx_entry_init(struct rxd_ep *ep,
const struct iovec *iov, size_t iov_count, uint64_t tag,
uint64_t ignore, void *context, fi_addr_t addr,
uint32_t op, uint32_t flags);
uint32_t op, uint32_t flags);
void rxd_tx_entry_free(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_rx_entry_free(struct rxd_ep *ep, struct rxd_x_entry *rx_entry);
int rxd_get_timeout(uint8_t retry_cnt);
Expand Down
23 changes: 16 additions & 7 deletions prov/rxd/src/rxd_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
struct iovec iov[RXD_IOV_LIMIT], res_iov[RXD_IOV_LIMIT], comp_iov[RXD_IOV_LIMIT];
struct fi_rma_iov rma_iov[RXD_IOV_LIMIT];
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

assert(count <= RXD_IOV_LIMIT);
assert(rma_count <= RXD_IOV_LIMIT);
Expand All @@ -133,8 +134,11 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,

if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_fiaddr_hash_lookup(rxd_ep, addr);
if (rxd_addr == FI_ADDR_UNSPEC)
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
if (ret)
goto out;
Expand All @@ -145,7 +149,8 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr != FI_ADDR_UNSPEC)
peer_entry = ofi_bufpool_get_ibuf(rxd_ep->peer_pool.pool, rxd_addr);
if (peer_entry->peer_addr != FI_ADDR_UNSPEC)
(void) rxd_start_xfer(rxd_ep, tx_entry);

out:
Expand Down Expand Up @@ -220,7 +225,8 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
struct iovec iov;
struct fi_rma_iov rma_iov;
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

iov.iov_base = (void *) buf;
iov.iov_len = count * ofi_datatype_size(datatype);
Expand All @@ -234,8 +240,11 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,

if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_fiaddr_hash_lookup(rxd_ep, addr);
if (rxd_addr == FI_ADDR_UNSPEC)
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
if (ret)
goto out;
Expand All @@ -245,8 +254,8 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
0, NULL, 0, datatype, op);
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr == FI_ADDR_UNSPEC)
peer_entry = ofi_bufpool_get_ibuf(rxd_ep->peer_pool.pool, rxd_addr);
if (peer_entry->peer_addr == FI_ADDR_UNSPEC)
goto out;

(void) rxd_start_xfer(rxd_ep, tx_entry);
Expand Down
Loading

0 comments on commit d0088ea

Please sign in to comment.