Skip to content

Commit

Permalink
implementing dynamic av
Browse files Browse the repository at this point in the history
Added data structures
- added peer_pool fi_rxdaddr_hash and
ep_rxdaddr_hash to struct rxd_ep.
- removed fi_addr_table, rxd_addr_table
rbmap from struct rxd_av

functions modified in
rxd_av.c:rxd_av_insert, rxd_av_insert_dg_addr
rxd_av_insert_fi_addr,rxd_av_remove, rxd_av_close.

rxd_ep.c: rxd_ep_close, rxd_ep_bind, rxd_ep_free_res
rxd_endpoint. Other functions which use peer_pool instead
of ep->peers array.

newly added functions:
rxd_update_av_peers
rxd_fiaddr_hash_insert
rxd_epaddr_hash_insert
rxd_get_peer_entry

multiple functions in rxd_msg.c, rxd_atomic.c
rxd_rma.c

header declrations in rxd.h
  • Loading branch information
nikhilnanal committed Apr 20, 2020
1 parent ce87272 commit d6cfcfb
Show file tree
Hide file tree
Showing 7 changed files with 889 additions and 648 deletions.
148 changes: 89 additions & 59 deletions prov/rxd/src/rxd.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,17 @@
#ifndef _RXD_H_
#define _RXD_H_

#define RXD_PROTOCOL_VERSION (2)
#define RXD_PROTOCOL_VERSION (2)

#define RXD_MAX_MTU_SIZE 4096

#define RXD_MAX_TX_BITS 10
#define RXD_MAX_RX_BITS 10
#define RXD_MAX_TX_BITS 10
#define RXD_MAX_RX_BITS 10

#define RXD_BUF_POOL_ALIGNMENT 16
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_PEER_POOL_CHUNK_CNT 1024
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

Expand Down Expand Up @@ -120,6 +121,8 @@ struct rxd_domain {
struct rxd_peer {
struct dlist_entry entry;
fi_addr_t peer_addr;
fi_addr_t fi_addr;
fi_addr_t dg_addr;
uint64_t tx_seq_no;
uint64_t rx_seq_no;
uint64_t last_rx_ack;
Expand All @@ -142,28 +145,23 @@ struct rxd_peer {
struct dlist_entry buf_pkts;
};

struct rxd_addr {
fi_addr_t fi_addr;
struct rxd_dgaddr_entry {
UT_hash_handle hh;
fi_addr_t dg_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
struct ofi_rbmap rbmap;
int fi_addr_idx;
int rxd_addr_idx;

int dg_av_used;
size_t dg_addrlen;

fi_addr_t *fi_addr_table;
struct rxd_addr *rxd_addr_table;
struct rxd_dgaddr_entry *ep_dgaddr_hash;
};

struct rxd_cq;
typedef int (*rxd_cq_write_fn)(struct rxd_cq *cq,
struct fi_cq_tagged_entry *cq_entry);
struct fi_cq_tagged_entry *cq_entry);
struct rxd_cq {
struct util_cq util_cq;
rxd_cq_write_fn write_fn;
Expand All @@ -172,6 +170,7 @@ struct rxd_cq {
enum rxd_pool_type {
RXD_BUF_POOL_RX,
RXD_BUF_POOL_TX,
RXD_BUF_POOL_PEERS
};

struct rxd_buf_pool {
Expand All @@ -180,6 +179,17 @@ struct rxd_buf_pool {
struct rxd_ep *rxd_ep;
};

struct rxd_fiaddr_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
fi_addr_t fi_addr;
};
struct rxd_epname_entry {
UT_hash_handle hh;
fi_addr_t rxd_addr;
uint8_t addr[RXD_NAME_LENGTH];
};

struct rxd_ep {
struct util_ep util_ep;
struct fid_ep *dg_ep;
Expand Down Expand Up @@ -216,7 +226,9 @@ struct rxd_ep {
struct dlist_entry rts_sent_list;
struct dlist_entry ctrl_pkts;

struct rxd_peer peers[];
struct rxd_fiaddr_entry *fi_rxdaddr_hash;
struct rxd_epname_entry *ep_rxdaddr_hash;
struct rxd_buf_pool peer_pool;
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand All @@ -239,6 +251,13 @@ static inline struct rxd_cq *rxd_ep_rx_cq(struct rxd_ep *ep)
return container_of(ep->util_ep.rx_cq, struct rxd_cq, util_cq);
}

static inline fi_addr_t rxd_fiaddr_hash_lookup(struct rxd_ep *ep, fi_addr_t fi_addr)
{
struct rxd_fiaddr_entry *entry = NULL;
HASH_FIND(hh, ep->fi_rxdaddr_hash, (void*)&fi_addr, sizeof(fi_addr), entry);
return entry ? entry->rxd_addr : FI_ADDR_UNSPEC;
}

struct rxd_x_entry {
fi_addr_t peer;
uint16_t tx_id;
Expand Down Expand Up @@ -326,10 +345,9 @@ static inline struct rxd_base_hdr *rxd_get_base_hdr(struct rxd_pkt_entry *pkt_en
}

static inline uint64_t rxd_set_pkt_seq(struct rxd_peer *peer,
struct rxd_pkt_entry *pkt_entry)
struct rxd_pkt_entry *pkt_entry)
{
rxd_get_base_hdr(pkt_entry)->seq_no = peer->tx_seq_no++;

return rxd_get_base_hdr(pkt_entry)->seq_no;
}

Expand All @@ -341,19 +359,19 @@ static inline struct rxd_ext_hdr *rxd_get_ext_hdr(struct rxd_pkt_entry *pkt_entr
static inline struct rxd_sar_hdr *rxd_get_sar_hdr(struct rxd_pkt_entry *pkt_entry)
{
return (struct rxd_sar_hdr *) ((char *) pkt_entry->pkt +
sizeof(struct rxd_base_hdr));
sizeof(struct rxd_base_hdr));
}

static inline void rxd_set_tx_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry)
{
pkt_entry->pkt = (void *) ((char *) pkt_entry +
sizeof(*pkt_entry) + ep->tx_prefix_size);
sizeof(*pkt_entry) + ep->tx_prefix_size);
}

static inline void rxd_set_rx_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry)
{
pkt_entry->pkt = (void *) ((char *) pkt_entry +
sizeof(*pkt_entry) + ep->rx_prefix_size);
sizeof(*pkt_entry) + ep->rx_prefix_size);
}

static inline void *rxd_pkt_start(struct rxd_pkt_entry *pkt_entry)
Expand All @@ -362,7 +380,7 @@ static inline void *rxd_pkt_start(struct rxd_pkt_entry *pkt_entry)
}

static inline size_t rxd_pkt_size(struct rxd_ep *ep, struct rxd_base_hdr *base_hdr,
void *ptr)
void *ptr)
{
return ((char *) ptr - (char *) base_hdr) + ep->tx_prefix_size;
}
Expand Down Expand Up @@ -397,29 +415,41 @@ static inline int rxd_match_tag(uint64_t tag, uint64_t ignore, uint64_t match_ta
}

int rxd_info_to_core(uint32_t version, const struct fi_info *rxd_info,
struct fi_info *core_info);
struct fi_info *core_info);
int rxd_info_to_rxd(uint32_t version, const struct fi_info *core_info,
struct fi_info *info);
struct fi_info *info);

int rxd_fabric(struct fi_fabric_attr *attr,
struct fid_fabric **fabric, void *context);
struct fid_fabric **fabric, void *context);
int rxd_domain_open(struct fid_fabric *fabric, struct fi_info *info,
struct fid_domain **dom, void *context);
struct fid_domain **dom, void *context);
int rxd_av_create(struct fid_domain *domain_fid, struct fi_av_attr *attr,
struct fid_av **av, void *context);
struct fid_av **av, void *context);
int rxd_endpoint(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep, void *context);
struct fid_ep **ep, void *context);
int rxd_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context);
struct fid_cq **cq_fid, void *context);
int rxd_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
struct fid_cntr **cntr_fid, void *context);
struct fid_cntr **cntr_fid, void *context);
int rxd_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
enum fi_op op, struct fi_atomic_attr *attr, uint64_t flags);
enum fi_op op, struct fi_atomic_attr *attr, uint64_t flags);

/* AV sub-functions */
int rxd_av_insert_dg_addr(struct rxd_av *av, const void *addr,
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context);
fi_addr_t *dg_fiaddr, uint64_t flags,
void *context, size_t addrlen);

/* AV/Peer subfunctions */
struct rxd_peer *rxd_get_peer_entry(struct rxd_ep *ep, const void *addr,
size_t addrlen);
void rxd_fi_update_peer_entry(struct rxd_ep *ep, const void *addr,
fi_addr_t fi_addr, fi_addr_t *dg_addr);
int rxd_fiaddr_hash_insert(struct rxd_ep *ep, fi_addr_t *fi_addr,
fi_addr_t *rxd_addr);
int rxd_epaddr_hash_insert(struct rxd_ep *ep, void *addr,
fi_addr_t *rxd_addr);
int rxd_update_av_peers(struct util_av *util_av, void *dg_addr,
fi_addr_t fi_addr, void *arg);

/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
Expand All @@ -434,22 +464,22 @@ void rxd_insert_unacked(struct rxd_ep *ep, fi_addr_t peer,
ssize_t rxd_send_rts_if_needed(struct rxd_ep *rxd_ep, fi_addr_t rxd_addr);
int rxd_start_xfer(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_init_data_pkt(struct rxd_ep *ep, struct rxd_x_entry *tx_entry,
struct rxd_pkt_entry *pkt_entry);
struct rxd_pkt_entry *pkt_entry);
void rxd_init_base_hdr(struct rxd_ep *rxd_ep, void **ptr,
struct rxd_x_entry *tx_entry);
struct rxd_x_entry *tx_entry);
void rxd_init_sar_hdr(void **ptr, struct rxd_x_entry *tx_entry,
size_t iov_count);
size_t iov_count);
void rxd_init_tag_hdr(void **ptr, struct rxd_x_entry *tx_entry);
void rxd_init_data_hdr(void **ptr, struct rxd_x_entry *tx_entry);
void rxd_init_rma_hdr(void **ptr, const struct fi_rma_iov *rma_iov,
size_t rma_count);
size_t rma_count);
void rxd_init_atom_hdr(void **ptr, enum fi_datatype datatype,
enum fi_op atomic_op);
enum fi_op atomic_op);
size_t rxd_init_msg(void **ptr, const struct iovec *iov, size_t iov_count,
size_t total_len, size_t avail_len);
size_t total_len, size_t avail_len);
static inline void rxd_check_init_cq_data(void **ptr, struct rxd_x_entry *tx_entry,
size_t *max_inline)
{
size_t *max_inline)
{
if (tx_entry->flags & RXD_REMOTE_CQ_DATA) {
rxd_init_data_hdr(ptr, tx_entry);
*max_inline -= sizeof(tx_entry->cq_entry.data);
Expand All @@ -472,38 +502,38 @@ uint64_t rxd_get_retry_time(uint64_t start, uint8_t retry_cnt);

/* Generic message functions */
ssize_t rxd_ep_generic_recvmsg(struct rxd_ep *rxd_ep, const struct iovec *iov,
size_t iov_count, fi_addr_t addr, uint64_t tag,
uint64_t ignore, void *context, uint32_t op,
uint32_t rxd_flags, uint64_t flags);
size_t iov_count, fi_addr_t addr, uint64_t tag,
uint64_t ignore, void *context, uint32_t op,
uint32_t rxd_flags, uint64_t flags);
ssize_t rxd_ep_generic_sendmsg(struct rxd_ep *rxd_ep, const struct iovec *iov,
size_t iov_count, fi_addr_t addr, uint64_t tag,
uint64_t data, void *context, uint32_t op,
uint32_t rxd_flags);
size_t iov_count, fi_addr_t addr, uint64_t tag,
uint64_t data, void *context, uint32_t op,
uint32_t rxd_flags);
ssize_t rxd_ep_generic_inject(struct rxd_ep *rxd_ep, const struct iovec *iov,
size_t iov_count, fi_addr_t addr, uint64_t tag,
uint64_t data, uint32_t op, uint32_t rxd_flags);
size_t iov_count, fi_addr_t addr, uint64_t tag,
uint64_t data, uint32_t op, uint32_t rxd_flags);

/* Progress functions */
void rxd_tx_entry_progress(struct rxd_ep *ep, struct rxd_x_entry *tx_entry,
int try_send);
int try_send);
void rxd_handle_recv_comp(struct rxd_ep *ep, struct fi_cq_msg_entry *comp);
void rxd_handle_send_comp(struct rxd_ep *ep, struct fi_cq_msg_entry *comp);
void rxd_handle_error(struct rxd_ep *ep);
void rxd_progress_op(struct rxd_ep *ep, struct rxd_x_entry *rx_entry,
struct rxd_pkt_entry *pkt_entry,
struct rxd_base_hdr *base_hdr,
struct rxd_sar_hdr *sar_hdr,
struct rxd_tag_hdr *tag_hdr,
struct rxd_data_hdr *data_hdr,
struct rxd_rma_hdr *rma_hdr,
struct rxd_atom_hdr *atom_hdr,
void **msg, size_t size);
struct rxd_pkt_entry *pkt_entry,
struct rxd_base_hdr *base_hdr,
struct rxd_sar_hdr *sar_hdr,
struct rxd_tag_hdr *tag_hdr,
struct rxd_data_hdr *data_hdr,
struct rxd_rma_hdr *rma_hdr,
struct rxd_atom_hdr *atom_hdr,
void **msg, size_t size);
void rxd_ep_recv_data(struct rxd_ep *ep, struct rxd_x_entry *x_entry,
struct rxd_data_pkt *pkt, size_t size);
struct rxd_data_pkt *pkt, size_t size);
void rxd_progress_tx_list(struct rxd_ep *ep, struct rxd_peer *peer);
struct rxd_x_entry *rxd_progress_multi_recv(struct rxd_ep *ep,
struct rxd_x_entry *rx_entry,
size_t total_size);
struct rxd_x_entry *rx_entry,
size_t total_size);
void rxd_ep_progress(struct util_ep *util_ep);
void rxd_cleanup_unexp_msg(struct rxd_unexp_msg *unexp_msg);

Expand Down
Loading

0 comments on commit d6cfcfb

Please sign in to comment.