Skip to content

Commit

Permalink
Enhance mmap value store with dynamic pre-allocation and robust indexing (#260)
Browse files Browse the repository at this point in the history

Co-authored-by: Jyun-Yu Jiang <[email protected]>
  • Loading branch information
hallogameboy and Jyun-Yu Jiang authored Oct 5, 2023
1 parent b4b817a commit d864e60
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 127 deletions.
34 changes: 17 additions & 17 deletions pecos/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,7 +1800,7 @@ def mmap_hashmap_init(self, map_type):
raise NotImplementedError(f"map_type={map_type} is not implemented.")
return self.mmap_map_fn_dict[map_type]

def _get_num_f32_mmap_valstore_methods(self):
def _get_float32_mmap_valstore_methods(self):
"""
Specify C-lib's numerical float32 Memory-mappable store methods arguments and return types.
"""
Expand All @@ -1823,7 +1823,7 @@ def _get_num_f32_mmap_valstore_methods(self):

fn_name = "n_col"
local_fn_dict[fn_name] = getattr(self.clib_float32, f"{fn_prefix}_{fn_name}_{store_type}")
corelib.fillprototype(local_fn_dict[fn_name], c_uint32, [c_void_p])
corelib.fillprototype(local_fn_dict[fn_name], c_uint64, [c_void_p])

fn_name = "save"
local_fn_dict[fn_name] = getattr(self.clib_float32, f"{fn_prefix}_{fn_name}_{store_type}")
Expand All @@ -1839,30 +1839,30 @@ def _get_num_f32_mmap_valstore_methods(self):
local_fn_dict[fn_name], None, [c_void_p, c_uint64, c_uint32, POINTER(c_float)]
)

fn_name = "get_submatrix"
fn_name = "batch_get"
local_fn_dict[fn_name] = getattr(self.clib_float32, f"{fn_prefix}_{fn_name}_{store_type}")
corelib.fillprototype(
local_fn_dict[fn_name],
None,
[
c_void_p,
c_uint32,
c_uint32,
c_uint64,
c_uint64,
POINTER(c_uint64),
POINTER(c_uint64),
POINTER(c_uint32),
POINTER(c_float),
c_uint32,
],
)

return local_fn_dict

def _get_str_mmap_valstore_methods(self):
def _get_bytes_mmap_valstore_methods(self):
"""
Specify C-lib's numerical Memory-mappable value store methods arguments and return types.
Specify C-lib's bytes Memory-mappable value store methods arguments and return types.
"""
fn_prefix = "mmap_valstore"
store_type = "str"
store_type = "bytes"

local_fn_dict = {}

Expand All @@ -1880,7 +1880,7 @@ def _get_str_mmap_valstore_methods(self):

fn_name = "n_col"
local_fn_dict[fn_name] = getattr(self.clib_float32, f"{fn_prefix}_{fn_name}_{store_type}")
corelib.fillprototype(local_fn_dict[fn_name], c_uint32, [c_void_p])
corelib.fillprototype(local_fn_dict[fn_name], c_uint64, [c_void_p])

fn_name = "save"
local_fn_dict[fn_name] = getattr(self.clib_float32, f"{fn_prefix}_{fn_name}_{store_type}")
Expand All @@ -1895,20 +1895,20 @@ def _get_str_mmap_valstore_methods(self):
corelib.fillprototype(
local_fn_dict[fn_name],
None,
[c_void_p, c_uint64, c_uint32, c_void_p, POINTER(c_uint32)],
[c_void_p, c_uint64, c_uint64, c_void_p, POINTER(c_uint32)],
)

fn_name = "get_submatrix"
fn_name = "batch_get"
local_fn_dict[fn_name] = getattr(self.clib_float32, f"{fn_prefix}_{fn_name}_{store_type}")
corelib.fillprototype(
local_fn_dict[fn_name],
None,
[
c_void_p,
c_uint32, # n_sub_row
c_uint32, # n_sub_col
c_uint64, # n_sub_row
c_uint64, # n_sub_col
POINTER(c_uint64), # sub_rows
POINTER(c_uint32), # sub_cols
POINTER(c_uint64), # sub_cols
c_uint32, # trunc_val_len
c_char_p, # ret
POINTER(c_uint32), # ret_lens
Expand All @@ -1924,8 +1924,8 @@ def link_mmap_valstore_methods(self):
"""

self.mmap_valstore_fn_dict = {
"num_f32": self._get_num_f32_mmap_valstore_methods(),
"str": self._get_str_mmap_valstore_methods(),
"float32": self._get_float32_mmap_valstore_methods(),
"bytes": self._get_bytes_mmap_valstore_methods(),
}

def mmap_valstore_init(self, store_type):
Expand Down
40 changes: 20 additions & 20 deletions pecos/core/libpecos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ extern "C" {
// ==== C Interface of Memory-mappable Value Store ====

typedef pecos::mmap_valstore::Float32Store mmap_valstore_float32;
typedef pecos::mmap_valstore::StringStore mmap_valstore_str;
typedef pecos::mmap_valstore::BytesStore mmap_valstore_bytes;
typedef pecos::mmap_valstore::row_type row_type;
typedef pecos::mmap_valstore::col_type col_type;

Expand All @@ -565,35 +565,35 @@ extern "C" {
void* mmap_valstore_new_ ## SUFFIX () { \
return static_cast<void*>(new mmap_valstore_ ## SUFFIX()); }
MMAP_VALSTORE_NEW(float32)
MMAP_VALSTORE_NEW(str)
MMAP_VALSTORE_NEW(bytes)

// Destruct
// MMAP_VALSTORE_DESTRUCT(SUFFIX) generates the C entry point
//   void mmap_valstore_destruct_<SUFFIX>(void* map_ptr)
// which casts the opaque handle back to mmap_valstore_<SUFFIX>* and deletes it.
// The static_cast performs no runtime type check.
// NOTE(review): presumably map_ptr must be a handle produced by the matching
// new/load entry point for the same SUFFIX -- confirm against callers.
#define MMAP_VALSTORE_DESTRUCT(SUFFIX) \
void mmap_valstore_destruct_ ## SUFFIX (void* map_ptr) { \
delete static_cast<mmap_valstore_ ## SUFFIX *>(map_ptr); }
MMAP_VALSTORE_DESTRUCT(float32)
MMAP_VALSTORE_DESTRUCT(str)
MMAP_VALSTORE_DESTRUCT(bytes)

// Number of rows
// MMAP_MAP_N_ROW(SUFFIX) generates the C entry point
//   row_type mmap_valstore_n_row_<SUFFIX>(void* map_ptr)
// which casts the opaque handle to mmap_valstore_<SUFFIX>* and returns the
// result of its n_row() member.
// NOTE(review): this macro uses the MMAP_MAP_ prefix while the other macros in
// this section use MMAP_VALSTORE_ -- possibly a leftover name; verify it does
// not clash with a same-named macro elsewhere in the file.
#define MMAP_MAP_N_ROW(SUFFIX) \
row_type mmap_valstore_n_row_ ## SUFFIX (void* map_ptr) { \
return static_cast<mmap_valstore_ ## SUFFIX *>(map_ptr)->n_row(); }
MMAP_MAP_N_ROW(float32)
MMAP_MAP_N_ROW(str)
MMAP_MAP_N_ROW(bytes)

// Number of columns
// MMAP_MAP_N_COL(SUFFIX) generates the C entry point
//   col_type mmap_valstore_n_col_<SUFFIX>(void* map_ptr)
// which casts the opaque handle to mmap_valstore_<SUFFIX>* and returns the
// result of its n_col() member.
// NOTE(review): the return width is whatever col_type is typedef'd to; any
// foreign-function prototype bound to this symbol must use the same width --
// confirm against the binding code.
// NOTE(review): this macro uses the MMAP_MAP_ prefix while the other macros in
// this section use MMAP_VALSTORE_ -- possibly a leftover name; verify it does
// not clash with a same-named macro elsewhere in the file.
#define MMAP_MAP_N_COL(SUFFIX) \
col_type mmap_valstore_n_col_ ## SUFFIX (void* map_ptr) { \
return static_cast<mmap_valstore_ ## SUFFIX *>(map_ptr)->n_col(); }
MMAP_MAP_N_COL(float32)
MMAP_MAP_N_COL(str)
MMAP_MAP_N_COL(bytes)

// Save
// MMAP_VALSTORE_SAVE(SUFFIX) generates the C entry point
//   void mmap_valstore_save_<SUFFIX>(void* map_ptr, const char* map_dir)
// which casts the opaque handle to mmap_valstore_<SUFFIX>* and forwards
// map_dir to its save() member. Nothing is returned to the caller.
// NOTE(review): how save() reports failure (e.g. by throwing) is not visible
// here -- confirm that errors cannot escape across the C boundary unhandled.
#define MMAP_VALSTORE_SAVE(SUFFIX) \
void mmap_valstore_save_ ## SUFFIX (void* map_ptr, const char* map_dir) { \
static_cast<mmap_valstore_ ## SUFFIX *>(map_ptr)->save(map_dir); }
MMAP_VALSTORE_SAVE(float32)
MMAP_VALSTORE_SAVE(str)
MMAP_VALSTORE_SAVE(bytes)

// Load
#define MMAP_VALSTORE_LOAD(SUFFIX) \
Expand All @@ -602,7 +602,7 @@ extern "C" {
map_ptr->load(map_dir, lazy_load); \
return static_cast<void *>(map_ptr); }
MMAP_VALSTORE_LOAD(float32)
MMAP_VALSTORE_LOAD(str)
MMAP_VALSTORE_LOAD(bytes)

// Create view from external values pointer
void mmap_valstore_from_vals_float32 (
Expand All @@ -614,41 +614,41 @@ extern "C" {
static_cast<mmap_valstore_float32 *>(map_ptr)->from_vals(n_row, n_col, vals);
}
// Allocate and Init
void mmap_valstore_from_vals_str (
void mmap_valstore_from_vals_bytes (
void* map_ptr,
const row_type n_row,
const col_type n_col,
const char* const* vals,
const mmap_valstore_str::str_len_type* vals_lens
const mmap_valstore_bytes::bytes_len_type* vals_lens
) {
static_cast<mmap_valstore_str *>(map_ptr)->from_vals(n_row, n_col, vals, vals_lens);
static_cast<mmap_valstore_bytes *>(map_ptr)->from_vals(n_row, n_col, vals, vals_lens);
}

// Get sub-matrix
void mmap_valstore_get_submatrix_float32 (
void mmap_valstore_batch_get_float32 (
void* map_ptr,
const uint32_t n_sub_row,
const uint32_t n_sub_col,
const uint64_t n_sub_row,
const uint64_t n_sub_col,
const row_type* sub_rows,
const col_type* sub_cols,
mmap_valstore_float32::value_type* ret,
const int threads
) {
static_cast<mmap_valstore_float32 *>(map_ptr)->get_submatrix(
static_cast<mmap_valstore_float32 *>(map_ptr)->batch_get(
n_sub_row, n_sub_col, sub_rows, sub_cols, ret, threads);
}
void mmap_valstore_get_submatrix_str (
void mmap_valstore_batch_get_bytes (
void* map_ptr,
const uint32_t n_sub_row,
const uint32_t n_sub_col,
const uint64_t n_sub_row,
const uint64_t n_sub_col,
const row_type* sub_rows,
const col_type* sub_cols,
const mmap_valstore_str::str_len_type trunc_val_len,
const mmap_valstore_bytes::bytes_len_type trunc_val_len,
char* ret,
mmap_valstore_str::str_len_type* ret_lens,
mmap_valstore_bytes::bytes_len_type* ret_lens,
const int threads
) {
static_cast<mmap_valstore_str *>(map_ptr)->get_submatrix(
static_cast<mmap_valstore_bytes *>(map_ptr)->batch_get(
n_sub_row, n_sub_col, sub_rows, sub_cols, trunc_val_len, ret, ret_lens, threads);
}
}
32 changes: 16 additions & 16 deletions pecos/core/utils/mmap_valstore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace pecos {
namespace mmap_valstore {

typedef uint64_t row_type;
typedef uint32_t col_type;
typedef uint64_t col_type;


class Float32Store {
Expand Down Expand Up @@ -53,10 +53,10 @@ class Float32Store {
vals_ = vals;
}

void get_submatrix(const uint32_t n_sub_row, const uint32_t n_sub_col, const row_type* sub_rows, const col_type* sub_cols, value_type* ret, const int threads=1) {
void batch_get(const uint64_t n_sub_row, const uint64_t n_sub_col, const row_type* sub_rows, const col_type* sub_cols, value_type* ret, const int threads=1) {
#pragma omp parallel for schedule(static, 1) num_threads(threads)
for (uint32_t i=0; i<n_sub_row; ++i) {
for (uint32_t j=0; j<n_sub_col; ++j) {
for (uint64_t i=0; i<n_sub_row; ++i) {
for (uint64_t j=0; j<n_sub_col; ++j) {
ret[i * n_sub_col + j] = vals_[sub_rows[i] * n_col_ + sub_cols[j]];
}
}
Expand Down Expand Up @@ -95,11 +95,11 @@ class Float32Store {
};


class StringStore {
class BytesStore {
public:
typedef uint32_t str_len_type;
typedef uint32_t bytes_len_type;

StringStore():
BytesStore():
n_row_(0),
n_col_(0)
{}
Expand All @@ -113,7 +113,7 @@ class StringStore {
}

// In memory. Allocate and assign values
void from_vals(const row_type n_row, const col_type n_col, const char* const* vals, const str_len_type* vals_lens) {
void from_vals(const row_type n_row, const col_type n_col, const char* const* vals, const bytes_len_type* vals_lens) {
n_row_ = n_row;
n_col_ = n_col;

Expand All @@ -137,15 +137,15 @@ class StringStore {
}
}

void get_submatrix(const uint32_t n_sub_row, const uint32_t n_sub_col, const row_type* sub_rows, const col_type* sub_cols,
const str_len_type trunc_val_len, char* ret, str_len_type* ret_lens, const int threads=1) {
void batch_get(const uint64_t n_sub_row, const uint64_t n_sub_col, const row_type* sub_rows, const col_type* sub_cols,
const bytes_len_type trunc_val_len, char* ret, bytes_len_type* ret_lens, const int threads=1) {
#pragma omp parallel for schedule(static, 1) num_threads(threads)
for (uint32_t i=0; i<n_sub_row; ++i) {
for (uint32_t j=0; j<n_sub_col; ++j) {
uint32_t sub_idx = i * n_sub_col + j;
for (uint64_t i=0; i<n_sub_row; ++i) {
for (uint64_t j=0; j<n_sub_col; ++j) {
uint64_t sub_idx = i * n_sub_col + j;
row_type idx = sub_rows[i] * n_col_ + sub_cols[j];
uint32_t ret_start_idx = sub_idx * trunc_val_len;
str_len_type cur_ret_len = std::min(trunc_val_len, vals_lens_[idx]);
uint64_t ret_start_idx = sub_idx * trunc_val_len;
bytes_len_type cur_ret_len = std::min(trunc_val_len, vals_lens_[idx]);
ret_lens[sub_idx] = cur_ret_len;
std::memcpy(ret + ret_start_idx, vals_.data() + vals_starts_[idx], cur_ret_len);
}
Expand Down Expand Up @@ -182,7 +182,7 @@ class StringStore {
row_type n_row_;
col_type n_col_;
mmap_util::MmapableVector<char> vals_; // Concatenated big string
mmap_util::MmapableVector<str_len_type> vals_lens_; // Length for each string
mmap_util::MmapableVector<bytes_len_type> vals_lens_; // Length for each string
mmap_util::MmapableVector<row_type> vals_starts_; // Start for each string in the concatenated big string

pecos::mmap_util::MmapStore mmap_store_;
Expand Down
Loading

0 comments on commit d864e60

Please sign in to comment.