Skip to content

Commit

Permalink
rxd dynamic av Implementation
Browse files Browse the repository at this point in the history
Broadly the dynamic AV implementation replaces the existing statically
allocated arrays used for mapping addresses in rxd_av and the peers
array in rxd_ep with a bufpool based implementation which has inbuilt
mechanism to grow when it is required. Additionally to ensure
uniqueness of the address mappings and bufpool entries the hashtables
based off of the UT hash library have been used. As a result the
mappings between theaddresses that the rxd uses internally has undergone
some changes.

In the struct rxd_av(rxd.h)

- The rxd_av->dg_av is unchanged -used to store the endpoint address (addr)
and maps it to the dg_addr.
- The rxd_av->util_av which was unsused earlier is now used to create
a mapping between the fi_addrs to the dg_addrs. The fi_addrs are
returned to the application as before. This was chosen since this is
already implemented using a bufpool.
(effectivley util_av.pool[fi_addr] = dg_addr)
- The rxd_av->ep_dgaddr_hash is used to ensure that the addr-->dg_addr
entries in the dg_av are unique.
- rxd_av->ofi_rbmap is not used anymore.

In the struct rxd_ep(rxd.h)

- The rxd_ep->peers array is replaced by rxd_ep->peer_pool which is an
indexed bufpool.
- The rxd_ep->ep_rxdaddr_hash is used to create a unique mapping between the
endpoint address (addr) and the peer_entry index(rxd_addr) in the peer_pool
(addr-->rxd_addr) to prevent duplicate insertions in the peer_pool.

- The rxd_ep->fi_rxdaddr_hash is used to create a unique mapping between the
fi_addr-->rxd_addr for fi_addr from the util_av
This is used mainly during transfer operations to fetch a peer when a fi_addr
is provided.

(This hashtable is stored in the rxd_ep because the peer entry can be created
both when an RTS happens as well as when an application tries to insert the
endpoint address. This can result in unique rxd_addr values for different
endpoints for the same fi_addr depending on when the peer_entry was created
in the peer pool).

Address Insertioni/Peer Creation(rxd_av.c/rxd_ep.c/rxd_cq.c)

(rxd_av.c: rxd_av_insert)
The address insertion from the app is handled in rxd_av_insert. Here, for
each addr which is inserted in the dg_av/util_av a peer_entry and a mapping
fi_addr-->rxd_addr (index of the peer_entry) is created for
every endpoint in the rxd_av->util_av.ep_list.

(rxd_ep.c:rxd_ep_bind)
If however, the av was populated before the ep_bind call by the app, the
eplist would be empty. In this case the peer entries and the
ep->fi_rxdaddr_hash mappings are created during the rxd_ep_bind call for
each addr that was inserted in the av->dg_av/av->util_av during
rxd_av_insert.

(rxd_cq.c:rxd_handle_rts)
The peer_entry creation can also happen during an RTS in rxd_handle_rts.
However,the ep->fi_rxdaddr_hash (fi_addr-->rxd_addr) mapping is not created
during an RTS. This mapping is created only when an app wants to insert an
address, i.e. when a fi_addr is created.

Initializations/Deletion/close
rxd_ep intialization changes(rxd_ep.c)

-The ep->peer_pool initilization is done when the rxd_endpoint gets called.
The rxd_init_peer is removed and that code is merged in the newly created
rxd_peer_init_fn which is used in the new rxd_peer_pool_create function.
The RXD_PEER_POOL_CHUNK_CNT is set at 1024, so the peer_pool grows in by this
count whenever the expnsion is needed.
-The rxd_ep_init_res and rxd_ep_free_res are modified to incorporate peer_pool
initialization and free-up.
-free up of hashtable entries in rxd_ep_close
-The rxd_env.max_peers is removed from the code.

rxd_av Initializations/removal/close(rxd_av.c)

rxd_av_create : Removed initializations of statically allocated tables
and the use of rxd_env_max_peers.
rxd_av_remove: The fi_addr-->rxd_addr hashtable entries are removed and free-ed.
However, the peer_pool entries  are not removed in order to ensure that any pending
transfers are not affected.
rxd_av_close: added call to rxd_av_delete_hashtable used to delete entries in
rxd_av->ep_dgaddr_hash. (addr-->dg_addr)

Newly added support functions in rxd. (declarations in rxd.h)
rxd_ep.c:
rxd_get_peer_by_epaddr  : to get a peer from the peer_pool for a given ep addr
rxd_get_peer_by_fiaddr  : to get a peer from the peer_pool for a given fi_addr.
rxd_epaddr_hash_insert  : create addr-->rxd_addr mapping
rxd_fiaddr_hash_insert  : create fi_addr-->rxd_addr mapping.
rxd_fiaddr_hash_lookup  : return rxd_addr for a given fi_addr.
rxd_map_av_to_ep        : to associate addresses inserted in the av to an ep.
rxd_ep_delete_hashtables: to delete hashtables stored in the ep.
rxd_av.c:
rxd_av_delete_hashtable : to delete hashtbale stored in the av.

send/recv/inject functions modified in rxd_msg.c, rxd_rma.c, rxd_atomic.c
to use ep->peer_pool and ep->fi_rxdaddr_hash for transfer operations.

struct rxd_dgaddr_entry, rxd_fiaddr_entry and rxd_epname_entry are
supporting data structures which describe the entries stored in the hashtables.

Signed-off-by: Nikhil Nanal <[email protected]>
  • Loading branch information
nikhilnanal committed May 6, 2020
1 parent be535bd commit af24451
Show file tree
Hide file tree
Showing 7 changed files with 549 additions and 337 deletions.
59 changes: 43 additions & 16 deletions prov/rxd/src/rxd.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#define RXD_BUF_POOL_ALIGNMENT 16
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_PEER_POOL_CHUNK_CNT 1024
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

Expand Down Expand Up @@ -120,6 +121,8 @@ struct rxd_domain {
struct rxd_peer {
struct dlist_entry entry;
fi_addr_t peer_addr;
fi_addr_t fi_addr;
fi_addr_t dg_addr;
uint64_t tx_seq_no;
uint64_t rx_seq_no;
uint64_t last_rx_ack;
Expand All @@ -142,23 +145,18 @@ struct rxd_peer {
struct dlist_entry buf_pkts;
};

struct rxd_addr {
fi_addr_t fi_addr;
struct rxd_dgaddr_entry {
UT_hash_handle hh;
fi_addr_t dg_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
struct ofi_rbmap rbmap;
int fi_addr_idx;
int rxd_addr_idx;

int dg_av_used;
size_t dg_addrlen;

fi_addr_t *fi_addr_table;
struct rxd_addr *rxd_addr_table;
struct rxd_dgaddr_entry *ep_dgaddr_hash;
};

struct rxd_cq;
Expand All @@ -172,6 +170,7 @@ struct rxd_cq {
enum rxd_pool_type {
RXD_BUF_POOL_RX,
RXD_BUF_POOL_TX,
RXD_BUF_POOL_PEERS
};

struct rxd_buf_pool {
Expand All @@ -180,6 +179,17 @@ struct rxd_buf_pool {
struct rxd_ep *rxd_ep;
};

struct rxd_fiaddr_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
fi_addr_t fi_addr;
};
struct rxd_epname_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_ep {
struct util_ep util_ep;
struct fid_ep *dg_ep;
Expand Down Expand Up @@ -216,7 +226,9 @@ struct rxd_ep {
struct dlist_entry rts_sent_list;
struct dlist_entry ctrl_pkts;

struct rxd_peer peers[];
struct rxd_fiaddr_entry *fi_rxdaddr_hash;
struct rxd_epname_entry *ep_rxdaddr_hash;
struct rxd_buf_pool peer_pool;
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand All @@ -239,6 +251,13 @@ static inline struct rxd_cq *rxd_ep_rx_cq(struct rxd_ep *ep)
return container_of(ep->util_ep.rx_cq, struct rxd_cq, util_cq);
}

static inline fi_addr_t rxd_fiaddr_hash_lookup(struct rxd_ep *ep, fi_addr_t fi_addr)
{
struct rxd_fiaddr_entry *entry = NULL;
HASH_FIND(hh, ep->fi_rxdaddr_hash, (void*)&fi_addr, sizeof(fi_addr), entry);
return entry ? entry->rxd_addr : FI_ADDR_UNSPEC;
}

struct rxd_x_entry {
fi_addr_t peer;
uint16_t tx_id;
Expand Down Expand Up @@ -329,7 +348,6 @@ static inline uint64_t rxd_set_pkt_seq(struct rxd_peer *peer,
struct rxd_pkt_entry *pkt_entry)
{
rxd_get_base_hdr(pkt_entry)->seq_no = peer->tx_seq_no++;

return rxd_get_base_hdr(pkt_entry)->seq_no;
}

Expand Down Expand Up @@ -418,8 +436,18 @@ int rxd_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,

/* AV sub-functions */
int rxd_av_insert_dg_addr(struct rxd_av *av, const void *addr,
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context);
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context, size_t addrlen);

/* AV/Peer subfunctions */
struct rxd_peer *rxd_get_peer_by_epaddr(struct rxd_ep *ep, const void *addr,
size_t addrlen);
struct rxd_peer *rxd_get_peer_by_fiaddr(struct rxd_ep *ep, fi_addr_t fi_addr,
fi_addr_t *rxd_addr);
int rxd_map_av_to_ep(struct util_av *util_av, void *dg_addr, fi_addr_t fi_addr,
void *arg);
int rxd_fiaddr_hash_insert(struct rxd_ep *ep, fi_addr_t *fi_addr,
fi_addr_t *rxd_addr);

/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
Expand All @@ -431,7 +459,7 @@ int rxd_ep_send_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_ep_post_data_pkts(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_insert_unacked(struct rxd_ep *ep, fi_addr_t peer,
struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, fi_addr_t rxd_addr);
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, struct rxd_peer *peer_entry);
int rxd_start_xfer(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_init_data_pkt(struct rxd_ep *ep, struct rxd_x_entry *tx_entry,
struct rxd_pkt_entry *pkt_entry);
Expand All @@ -447,8 +475,7 @@ void rxd_init_atom_hdr(void **ptr, enum fi_datatype datatype,
enum fi_op atomic_op);
size_t rxd_init_msg(void **ptr, const struct iovec *iov, size_t iov_count,
size_t total_len, size_t avail_len);
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry,
size_t *max_inline)
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry, size_t *max_inline)
{
if (tx_entry->flags & RXD_REMOTE_CQ_DATA) {
rxd_init_data_hdr(ptr, tx_entry);
Expand Down
21 changes: 14 additions & 7 deletions prov/rxd/src/rxd_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
struct fi_rma_iov rma_iov[RXD_IOV_LIMIT];
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

assert(count <= RXD_IOV_LIMIT);
assert(rma_count <= RXD_IOV_LIMIT);
Expand All @@ -134,8 +135,11 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
peer_entry = rxd_get_peer_by_fiaddr(rxd_ep, addr, &rxd_addr);
if(!peer_entry)
goto out;

ret = rxd_send_rts_if_needed(rxd_ep, peer_entry);
if (ret)
goto out;

Expand All @@ -145,7 +149,7 @@ static ssize_t rxd_generic_atomic(struct rxd_ep *rxd_ep,
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr != FI_ADDR_UNSPEC)
if (peer_entry->peer_addr != FI_ADDR_UNSPEC)
(void) rxd_start_xfer(rxd_ep, tx_entry);

out:
Expand Down Expand Up @@ -221,6 +225,7 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
struct fi_rma_iov rma_iov;
fi_addr_t rxd_addr;
ssize_t ret = -FI_EAGAIN;
struct rxd_peer *peer_entry;

iov.iov_base = (void *) buf;
iov.iov_len = count * ofi_datatype_size(datatype);
Expand All @@ -235,8 +240,11 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
if (ofi_cirque_isfull(rxd_ep->util_ep.tx_cq->cirq))
goto out;

rxd_addr = rxd_ep_av(rxd_ep)->fi_addr_table[dest_addr];
ret = rxd_send_rts_if_needed(rxd_ep, rxd_addr);
peer_entry = rxd_get_peer_by_fiaddr(rxd_ep, addr, &rxd_addr);
if(!peer_entry)
goto out;

ret = rxd_send_rts_if_needed(rxd_ep, peer_entry);
if (ret)
goto out;

Expand All @@ -245,8 +253,7 @@ static ssize_t rxd_atomic_inject(struct fid_ep *ep_fid, const void *buf,
0, NULL, 0, datatype, op);
if (!tx_entry)
goto out;

if (rxd_ep->peers[rxd_addr].peer_addr == FI_ADDR_UNSPEC)
if (peer_entry->peer_addr == FI_ADDR_UNSPEC)
goto out;

(void) rxd_start_xfer(rxd_ep, tx_entry);
Expand Down
Loading

0 comments on commit af24451

Please sign in to comment.