Core changes to support running Splinter with allocated shared memory.
This commit adds -EXPERIMENTAL- support for running SplinterDB with a
shared memory segment configured to serve most memory allocation.

This commit brings in basic support to create a shared memory segment and
to redirect all memory allocation primitives to shared memory. Currently,
we support only simplistic memory management: allocation only, plus very
simple handling of free() for the most recently allocated memory piece.
With shared segments of 1-2 GiB we can run all functional and unit tests.

The high-points of the changes are:

- External configuration: splinterdb_config{} gains a few new visible
  fields to configure and troubleshoot shared memory configuration.
   - Boolean: use_shmem: Default is OFF
   - size_t : shmem_size:

- The main driving change is the re-deployment of platform_heap_id 'hid'
  arg that appears in all memory-related interfaces. If Splinter is
  configured for shared memory use, 'hid' will be an opaque handle to
  the shared segment. Most memory allocation will be redirected to new
  shmem-based alloc() / free() interfaces.

- Formalize usages of PROCESS_PRIVATE_HEAP_ID: A small number of clients
  that wish to repeatedly allocate large chunks of memory tend to cause
  OOMs. The memory allocated by these clients is not shared across threads
  / processes. For such usages, introduce PROCESS_PRIVATE_HEAP_ID as an
  alias to NULL, defaulting to allocating memory from the heap.

- Track the heap-ID so that platform_get_heap_id() correctly returns
  the handle to shared memory. (Otherwise, it would return NULL by
  default.)

- BTree pack allocates a large fingerprint array. This also causes large
  tests to run into OOMs. For threaded execution, it's ok if the memory
  for this array is allocated from the heap. But for multi-process
  execution, when one process (thread) allocates this fingerprint
  array, another thread may pick up the task to compact a bundle and
  will try to free this memory.

  So, this memory has to come from shared memory. To cope with such
  repeated allocations of large chunks of memory to build fingerprints,
  the shmem module supports a small scheme for recycling such freed
  large-memory chunks.

  Applied this technique to recycle memory allocated for iterators also.
  They tend to be biggish, so can also cause shmem-OOMs.

- All existing functional and unit tests have been enhanced to support
  the "--use-shmem" argument. This creates Splinter with shared memory
  configured, and tests are run in this mode.

  This change brings good coverage of the new feature from existing
  tests.

   - New test: large_inserts_bugs_stress_test -- added to cover the
     primary use-case of concurrent insert performance benchmarking
     (that this feature is driving in prior integration effort).

   - test.sh enhanced to run different classes of test with the
     "--use-shmem" option.

- Diagnostics & Troubleshooting:

   - Shmem-based alloc/free interfaces extended to print name of object
     and other call-site info, to better pinpoint source code-flow
     leading to memory issues.

   - Add shared memory usage metrics, including for large-fragment
     handling. Report a summary line of metrics when Splinter is shut
     down.

   - Add various utility diagnostic helper methods to validate that
     addresses within shared memory are valid. Unit-tests and some asserts
     use these.

- minor #include cleanups
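
To make the heap-ID redirection described above concrete, here is a minimal
sketch in C. Every name in it (demo_shm_heap, demo_heap_id, demo_alloc,
demo_free, DEMO_PROCESS_PRIVATE_HEAP_ID) is a hypothetical stand-in and not
SplinterDB's definition. It only assumes what the message states: a NULL
heap-ID means the process-private heap, and a non-NULL heap-ID is a handle to
the shared segment.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for a shared segment; not the SplinterDB type. */
typedef struct demo_shm_heap {
   unsigned char *base; /* start of the (pretend) shared segment */
   size_t         size; /* total bytes in the segment */
   size_t         used; /* bytes handed out so far */
} demo_shm_heap;

typedef demo_shm_heap *demo_heap_id;

/* Assumption taken from the commit message: the alias is simply NULL. */
#define DEMO_PROCESS_PRIVATE_HEAP_ID ((demo_heap_id)NULL)

/* Allocate from the segment when hid is set, otherwise from malloc(). */
static void *
demo_alloc(demo_heap_id hid, size_t nbytes)
{
   if (hid == DEMO_PROCESS_PRIVATE_HEAP_ID) {
      return malloc(nbytes);
   }
   assert(hid->used <= hid->size);
   if (nbytes > hid->size) {
      return NULL; /* can never fit; also avoids overflow below */
   }
   /* Round the request up to a multiple of 8 bytes. */
   size_t need = (nbytes + 7) & ~(size_t)7;
   if (need > hid->size - hid->used) {
      return NULL; /* shared-memory OOM */
   }
   void *p = hid->base + hid->used;
   hid->used += need;
   return p;
}

/* Free heap memory; segment memory is not reclaimed in this sketch. */
static void
demo_free(demo_heap_id hid, void *p)
{
   if (hid == DEMO_PROCESS_PRIVATE_HEAP_ID) {
      free(p);
   }
}
```

A caller that wants memory private to its process passes
DEMO_PROCESS_PRIVATE_HEAP_ID; everything else passes the segment handle and
must be prepared for a NULL return when the segment is exhausted.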

Changes arising through review cycle and stabilization vs. /main:

- In test.sh/run_slower_unit_tests(), re-enable execution of
  large_inserts_bugs_stress_test, but bracketed by "set +e" / "set -e"
  settings. If this test fails in CI (as it does randomly), this toggling
  should allow the rest of the script to still run, so the CI job does
  not fail immediately.
  (Some deeper stabilization is needed for these test cases.)

- Purged the heap_handle * in shmem.h/.c module and through the rest
  of the Splinter code. Only heap-ID remains a valid handle.

- Fix race condition bug in platform_shm_alloc()

- Add micro-optimization to recycle last-allocated frag being freed.

- Add config_parse_use_shmem() as parsing interface to see if
  "--use-shmem" was supplied. Apply to many unit-/functional-tests.

Rework unit-tests to use config_parse_use_shmem() to support --use-shmem parsing.

Re-enable large_inserts_bugs_stress_test execution.
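
The "alloc-only, but free() of the last piece is honored" behaviour described
in this message can be sketched as a bump allocator that remembers where its
most recent allocation began. This is an illustration only: bump_heap,
bump_alloc and bump_free are hypothetical names, the sketch is
single-threaded, and it does not reproduce platform_shm_alloc() or its
race-condition fix.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical bump allocator over a fixed buffer. */
typedef struct bump_heap {
   unsigned char *base;       /* start of the buffer */
   size_t         size;       /* total bytes available */
   size_t         next;       /* offset of the first free byte */
   size_t         last_start; /* offset where the latest allocation begins */
   bool           has_last;   /* true while the latest allocation is live */
} bump_heap;

/* Hand out the next nbytes, or NULL when the buffer is exhausted. */
static void *
bump_alloc(bump_heap *h, size_t nbytes)
{
   assert(h->next <= h->size);
   if (nbytes == 0 || nbytes > h->size - h->next) {
      return NULL;
   }
   h->last_start = h->next;
   h->has_last   = true;
   h->next += nbytes;
   return h->base + h->last_start;
}

/*
 * Reclaim p only if it is the most recent allocation; any other pointer
 * is left in place. Returns true when memory was actually reclaimed.
 */
static bool
bump_free(bump_heap *h, void *p)
{
   if (h->has_last && p == (void *)(h->base + h->last_start)) {
      h->next     = h->last_start;
      h->has_last = false;
      return true;
   }
   return false;
}
```

Freeing anything other than the latest fragment is a no-op here, which is why
a segment managed this way eventually runs out of memory under churn.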
gapisback committed Aug 30, 2023
1 parent 2fb4d7c commit 42799b1
Showing 51 changed files with 3,982 additions and 381 deletions.
34 changes: 31 additions & 3 deletions Makefile
@@ -54,7 +54,7 @@ TESTSRC := $(COMMON_TESTSRC) $(FUNCTIONAL_TESTSRC) $(UNIT_TESTSRC)
# Construct a list of fast unit-tests that will be linked into unit_test binary,
# eliminating a sequence of slow-running unit-test programs.
ALL_UNIT_TESTSRC := $(call rwildcard, $(UNIT_TESTSDIR), *.c)
SLOW_UNIT_TESTSRC = splinter_test.c config_parse_test.c
SLOW_UNIT_TESTSRC = splinter_test.c config_parse_test.c large_inserts_bugs_stress_test.c
SLOW_UNIT_TESTSRC_FILTER := $(foreach slowf,$(SLOW_UNIT_TESTSRC), $(UNIT_TESTSDIR)/$(slowf))
FAST_UNIT_TESTSRC := $(sort $(filter-out $(SLOW_UNIT_TESTSRC_FILTER), $(ALL_UNIT_TESTSRC)))

@@ -386,7 +386,8 @@ $(foreach unit,$(UNIT_TESTBINS),$(eval $(call unit_test_self_dependency,$(unit))
#
# These will need to be fleshed out for filters, io subsystem, trunk,
# etc. as we create mini unit test executables for those subsystems.
PLATFORM_SYS = $(OBJDIR)/$(SRCDIR)/$(PLATFORM_DIR)/platform.o
PLATFORM_SYS = $(OBJDIR)/$(SRCDIR)/$(PLATFORM_DIR)/platform.o \
$(OBJDIR)/$(SRCDIR)/$(PLATFORM_DIR)/shmem.o

PLATFORM_IO_SYS = $(OBJDIR)/$(SRCDIR)/$(PLATFORM_DIR)/laio.o

@@ -443,7 +444,10 @@ $(BINDIR)/$(UNITDIR)/splinterdb_stress_test: $(COMMON_TESTOBJ)
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/writable_buffer_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ)
$(COMMON_TESTOBJ) \
$(COMMON_UNIT_TESTOBJ) \
$(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/limitations_test: $(COMMON_TESTOBJ) \
$(COMMON_UNIT_TESTOBJ) \
@@ -466,6 +470,25 @@ $(BINDIR)/$(UNITDIR)/platform_apis_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(PLATFORM_SYS)

$(BINDIR)/$(UNITDIR)/splinter_shmem_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_ipc_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ)

$(BINDIR)/$(UNITDIR)/large_inserts_bugs_stress_test: $(UTIL_SYS) \
$(OBJDIR)/$(TESTS_DIR)/config.o \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinterdb_heap_id_mgmt_test: $(COMMON_TESTOBJ) \
$(COMMON_UNIT_TESTOBJ) \
$(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \
$(LIBDIR)/libsplinterdb.so



########################################
# Convenience mini unit-test targets
unit/util_test: $(BINDIR)/$(UNITDIR)/util_test
@@ -476,6 +499,11 @@ unit/splinter_test: $(BINDIR)/$(UNITDIR)/splinter_test
unit/splinterdb_quick_test: $(BINDIR)/$(UNITDIR)/splinterdb_quick_test
unit/splinterdb_stress_test: $(BINDIR)/$(UNITDIR)/splinterdb_stress_test
unit/writable_buffer_test: $(BINDIR)/$(UNITDIR)/writable_buffer_test
unit/config_parse_test: $(BINDIR)/$(UNITDIR)/config_parse_test
unit/limitations_test: $(BINDIR)/$(UNITDIR)/limitations_test
unit/task_system_test: $(BINDIR)/$(UNITDIR)/task_system_test
unit/splinter_shmem_test: $(BINDIR)/$(UNITDIR)/splinter_shmem_test
unit/splinter_ipc_test: $(BINDIR)/$(UNITDIR)/splinter_ipc_test
unit_test: $(BINDIR)/unit_test

# -----------------------------------------------------------------------------
31 changes: 27 additions & 4 deletions include/splinterdb/splinterdb.h
@@ -20,8 +20,29 @@
const char *
splinterdb_get_version();

// Configuration options for SplinterDB
typedef struct {
/*
* ****************************************************************************
* Configuration options for SplinterDB:
*
* Physical configuration things such as file name, cache & disk-size,
* extent- and page-size are specified here. Application-specific data
* configuration is also provided through this struct. Additionally,
* user can select whether to use malloc()/free()-based memory allocation
* for all structures (default), or choose to setup a shared segment
* which will be used for shared structures.
*
* ******************* EXPERIMENTAL FEATURES ********************
*
* - use_shmem: Support for shared memory segments:
* This flag will configure a shared memory segment. All (most) run-time
* memory allocation will be done from this shared segment. Currently,
* we do not support free(), so you will likely run out of shared memory
* and run into shared-memory OOM errors. This functionality is
* solely meant for internal development uses.
*
* ******************* EXPERIMENTAL FEATURES ********************
*/
typedef struct splinterdb_config {
// required configuration
const char *filename;
uint64 cache_size;
@@ -32,12 +53,15 @@ typedef struct {
// For a simple reference implementation, see default_data_config.h
data_config *data_cfg;


// optional advanced config below
// if unset, defaults will be used
void *heap_handle;
void *heap_id;

// Shared memory support
uint64 shmem_size;
_Bool use_shmem; // Default is FALSE.

uint64 page_size;
uint64 extent_size;

@@ -121,7 +145,6 @@ typedef struct {
// work to be performed on foreground threads, increasing tail
// latencies.
uint64 queue_scale_percent;

} splinterdb_config;

// Opaque handle to an opened instance of SplinterDB
14 changes: 12 additions & 2 deletions src/btree.h
@@ -325,7 +325,7 @@ btree_iterator_init(cache *cc,
void
btree_iterator_deinit(btree_iterator *itor);

static inline void
static inline platform_status
btree_pack_req_init(btree_pack_req *req,
cache *cc,
btree_config *cfg,
@@ -344,8 +344,18 @@ btree_pack_req_init(btree_pack_req *req,
req->seed = seed;
if (hash != NULL && max_tuples > 0) {
req->fingerprint_arr =
TYPED_ARRAY_MALLOC(hid, req->fingerprint_arr, max_tuples);
TYPED_ARRAY_ZALLOC(hid, req->fingerprint_arr, max_tuples);

// When we run with shared-memory configured, we expect that it is sized
// big-enough to not get OOMs from here. Hence, only a debug_assert().
debug_assert(req->fingerprint_arr,
"Unable to allocate memory for %lu tuples",
max_tuples);
if (!req->fingerprint_arr) {
return STATUS_NO_MEMORY;
}
}
return STATUS_OK;
}

static inline void
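
The fingerprint array allocated in btree_pack_req_init() is the kind of large
fragment that the commit message says the shmem module recycles. The shmem
code itself is not part of this excerpt, so the following is only a sketch of
one way such recycling could work: a small table of freed large fragments,
searched first-fit before new memory is carved out. All names (demo_frag,
demo_frag_table, demo_frag_release, demo_frag_reuse) and the table size are
hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_LARGE_FRAGS 4 /* hypothetical table capacity */

/* One freed large fragment; addr == NULL marks an empty slot. */
typedef struct demo_frag {
   void  *addr;
   size_t size;
} demo_frag;

typedef struct demo_frag_table {
   demo_frag slot[DEMO_MAX_LARGE_FRAGS];
} demo_frag_table;

/*
 * Remember a freed large fragment so a later request can reuse it.
 * Returns 1 if the fragment was tracked, 0 if the table is full
 * (in which case the memory is simply lost in this sketch).
 */
static int
demo_frag_release(demo_frag_table *t, void *addr, size_t size)
{
   assert(addr != NULL && size > 0);
   for (int i = 0; i < DEMO_MAX_LARGE_FRAGS; i++) {
      if (t->slot[i].addr == NULL) {
         t->slot[i].addr = addr;
         t->slot[i].size = size;
         return 1;
      }
   }
   return 0;
}

/*
 * First-fit search for a tracked fragment of at least 'size' bytes.
 * Returns its address and empties the slot, or NULL if none fits.
 */
static void *
demo_frag_reuse(demo_frag_table *t, size_t size)
{
   for (int i = 0; i < DEMO_MAX_LARGE_FRAGS; i++) {
      if (t->slot[i].addr != NULL && t->slot[i].size >= size) {
         void *addr      = t->slot[i].addr;
         t->slot[i].addr = NULL;
         t->slot[i].size = 0;
         return addr;
      }
   }
   return NULL;
}
```

In a multi-process setting the table would itself have to live in the shared
segment and be protected by a lock, since the process that frees a
fingerprint array may differ from the one that allocated it.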
21 changes: 14 additions & 7 deletions src/clockcache.c
@@ -615,6 +615,11 @@ clockcache_get_entry(clockcache *cc, uint32 entry_number)
return (&cc->entry[entry_number]);
}

static inline entry_status
clockcache_get_status(clockcache *cc, uint32 entry_number)
{
return clockcache_get_entry(cc, entry_number)->status;
}
static inline entry_status
clockcache_set_flag(clockcache *cc, uint32 entry_number, entry_status flag)
{
@@ -634,7 +639,7 @@ clockcache_clear_flag(clockcache *cc, uint32 entry_number, entry_status flag)
static inline uint32
clockcache_test_flag(clockcache *cc, uint32 entry_number, entry_status flag)
{
return flag & clockcache_get_entry(cc, entry_number)->status;
return flag & clockcache_get_status(cc, entry_number);
}

#ifdef RECORD_ACQUISITION_STACKS
@@ -912,15 +917,17 @@ clockcache_assert_no_locks_held(clockcache *cc)
}
}

void
bool32
clockcache_assert_clean(clockcache *cc)
{
uint64 i;

for (i = 0; i < cc->cfg->page_capacity; i++) {
debug_assert(clockcache_test_flag(cc, i, CC_FREE)
for (i = 0; (i < cc->cfg->page_capacity)
&& (clockcache_test_flag(cc, i, CC_FREE)
|| clockcache_test_flag(cc, i, CC_CLEAN));
i++)
{ /* Do nothing */
}
return (i == cc->cfg->page_capacity);
}

/*
@@ -1197,7 +1204,7 @@ clockcache_ok_to_writeback(clockcache *cc,
uint32 entry_number,
bool32 with_access)
{
uint32 status = clockcache_get_entry(cc, entry_number)->status;
uint32 status = clockcache_get_status(cc, entry_number);
return ((status == CC_CLEANABLE1_STATUS)
|| (with_access && status == CC_CLEANABLE2_STATUS));
}
@@ -1692,7 +1699,7 @@ clockcache_flush(clockcache *cc)
// make sure all aio is complete again
io_cleanup_all(cc->io);

clockcache_assert_clean(cc);
debug_assert(clockcache_assert_clean(cc));
}

/*
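
The clockcache_assert_clean() change in this file turns a routine that
asserted internally into a predicate that returns a boolean, leaving the
caller to decide whether to assert on it. A stand-alone sketch of that
pattern follows; the DEMO_* flag values and the demo_all_clean() and
demo_flush() functions are hypothetical and do not mirror the clockcache
types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-entry status bits, not the clockcache definitions. */
enum { DEMO_FREE = 1, DEMO_CLEAN = 2, DEMO_DIRTY = 4 };

/* Predicate: true only if every entry is free or clean. */
static bool
demo_all_clean(const unsigned *status, size_t n)
{
   size_t i;
   /* Stop at the first entry that is neither free nor clean. */
   for (i = 0; i < n && (status[i] & (DEMO_FREE | DEMO_CLEAN)); i++) {
      /* Do nothing */
   }
   return i == n;
}

/* Caller decides to assert; tests can call the predicate directly. */
static void
demo_flush(unsigned *status, size_t n)
{
   for (size_t i = 0; i < n; i++) {
      if (status[i] & DEMO_DIRTY) {
         status[i] = DEMO_CLEAN; /* pretend the page was written back */
      }
   }
   assert(demo_all_clean(status, n));
}
```

Returning a boolean lets unit tests check the condition without tripping an
assertion, while production call sites keep the debug-only check.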
6 changes: 6 additions & 0 deletions src/data_internal.h
@@ -182,6 +182,12 @@ key_buffer_init_from_key(key_buffer *kb, platform_heap_id hid, key src)
return key_buffer_copy_key(kb, src);
}

static inline uint64
key_buffer_length(key_buffer *kb)
{
return writable_buffer_length(&kb->wb);
}

/* Converts kb to a user key if it isn't already. */
static inline platform_status
key_buffer_resize(key_buffer *kb, uint64 length)
4 changes: 4 additions & 0 deletions src/default_data_config.c
@@ -46,6 +46,10 @@ message_to_string(const data_config *cfg,
}


/*
* Function to initialize application-specific data_config{} struct
* with default values.
*/
void
default_data_config_init(const size_t max_key_size, // IN
data_config *out_cfg // OUT
5 changes: 1 addition & 4 deletions src/io.h
@@ -89,10 +89,7 @@ struct io_handle {
};

platform_status
io_handle_init(platform_io_handle *ioh,
io_config *cfg,
platform_heap_handle hh,
platform_heap_id hid);
io_handle_init(platform_io_handle *ioh, io_config *cfg, platform_heap_id hid);

void
io_handle_deinit(platform_io_handle *ioh);
4 changes: 2 additions & 2 deletions src/merge.c
@@ -545,7 +545,7 @@ merge_iterator_create(platform_heap_id hid,
== ARRAY_SIZE(merge_itor->ordered_iterators),
"size mismatch");

merge_itor = TYPED_ZALLOC(hid, merge_itor);
merge_itor = TYPED_ZALLOC(PROCESS_PRIVATE_HEAP_ID, merge_itor);
if (merge_itor == NULL) {
return STATUS_NO_MEMORY;
}
@@ -598,7 +598,7 @@ platform_status
merge_iterator_destroy(platform_heap_id hid, merge_iterator **merge_itor)
{
merge_accumulator_deinit(&(*merge_itor)->merge_buffer);
platform_free(hid, *merge_itor);
platform_free(PROCESS_PRIVATE_HEAP_ID, *merge_itor);
*merge_itor = NULL;

return STATUS_OK;
5 changes: 1 addition & 4 deletions src/platform_linux/laio.c
@@ -91,10 +91,7 @@ static io_ops laio_ops = {
* structures and initialize the IO sub-system.
*/
platform_status
io_handle_init(laio_handle *io,
io_config *cfg,
platform_heap_handle hh,
platform_heap_id hid)
io_handle_init(laio_handle *io, io_config *cfg, platform_heap_id hid)
{
int status;
uint64 req_size;
48 changes: 37 additions & 11 deletions src/platform_linux/platform.c
@@ -3,9 +3,9 @@

#include <stdarg.h>
#include <unistd.h>
#include "platform.h"

#include <sys/mman.h>
#include "platform.h"
#include "shmem.h"

__thread threadid xxxtid = INVALID_TID;

@@ -19,6 +19,14 @@ bool32 platform_use_mlock = FALSE;
platform_log_handle *Platform_default_log_handle = NULL;
platform_log_handle *Platform_error_log_handle = NULL;

/*
* Declare globals to track heap handle/ID that may have been created when
* using shared memory. We stash away these handles so that we can return the
* right handle via platform_get_heap_id() interface, in case shared segments
* are in use.
*/
platform_heap_id Heap_id = NULL;

// This function is run automatically at library-load time
void __attribute__((constructor)) platform_init_log_file_handles(void)
{
@@ -47,21 +55,39 @@ platform_get_stdout_stream(void)
return Platform_default_log_handle;
}

/*
* platform_heap_create() - Create a heap for memory allocation.
*
* By default, we just revert to process' heap-memory and use malloc() / free()
* for memory management. If Splinter is run with shared-memory configuration,
* create a shared-segment which acts as the 'heap' for memory allocation.
*/
platform_status
platform_heap_create(platform_module_id UNUSED_PARAM(module_id),
uint32 max,
platform_heap_handle *heap_handle,
platform_heap_id *heap_id)
{
*heap_handle = NULL;
*heap_id = NULL;
platform_heap_create(platform_module_id UNUSED_PARAM(module_id),
size_t max,
bool use_shmem,
platform_heap_id *heap_id)
{
if (use_shmem) {
platform_status rc = platform_shmcreate(max, heap_id);
if (SUCCESS(rc)) {
Heap_id = *heap_id;
}
return rc;
}
*heap_id = NULL;

return STATUS_OK;
}

void
platform_heap_destroy(platform_heap_handle UNUSED_PARAM(*heap_handle))
{}
platform_heap_destroy(platform_heap_id *heap_id)
{
// If shared segment was allocated, it's being tracked thru heap ID.
if (*heap_id) {
return platform_shmdestroy(heap_id);
}
}

/*
* platform_buffer_init() - Initialize an input buffer_handle, bh.
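
The create / stash / destroy flow in platform_heap_create() and
platform_heap_destroy() can be tried out in isolation with the sketch below.
platform_shmcreate() and platform_shmdestroy() are not shown in this excerpt,
so the sketch substitutes an anonymous shared mapping obtained with mmap();
that substitution is an assumption, and the commit may create its segment
differently. The demo_* names and the int return codes are hypothetical.
Linux is assumed.

```c
#define _DEFAULT_SOURCE /* exposes MAP_ANONYMOUS on glibc */
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/mman.h>

/* Hypothetical control block stored at the start of the segment. */
typedef struct demo_segment {
   size_t size; /* total mapped bytes, needed later for munmap() */
} demo_segment;

typedef demo_segment *demo_heap_id;

/* Global stash, analogous in spirit to Heap_id in the diff. */
static demo_heap_id Demo_heap_id = NULL;

/* Returns 0 on success, -1 on failure. */
static int
demo_heap_create(size_t size, bool use_shmem, demo_heap_id *heap_id)
{
   *heap_id = NULL;
   if (!use_shmem) {
      return 0; /* NULL heap-ID: callers fall back to malloc()/free() */
   }
   if (size < sizeof(demo_segment)) {
      return -1;
   }
   void *p = mmap(NULL,
                  size,
                  PROT_READ | PROT_WRITE,
                  MAP_SHARED | MAP_ANONYMOUS,
                  -1,
                  0);
   if (p == MAP_FAILED) {
      return -1;
   }
   demo_segment *seg = p;
   seg->size         = size;
   *heap_id          = seg;
   Demo_heap_id      = seg; /* stash so the getter can return it */
   return 0;
}

/* Returns the stashed handle, or NULL when no segment is in use. */
static demo_heap_id
demo_get_heap_id(void)
{
   return Demo_heap_id;
}

/* Unmap the segment, if any, and clear both the handle and the stash. */
static void
demo_heap_destroy(demo_heap_id *heap_id)
{
   if (*heap_id == NULL) {
      return; /* nothing was mapped */
   }
   int rc = munmap(*heap_id, (*heap_id)->size);
   assert(rc == 0);
   (void)rc;
   *heap_id     = NULL;
   Demo_heap_id = NULL;
}
```

An anonymous MAP_SHARED mapping is visible to children created by fork() after
the mapping exists; unrelated processes would need a named segment instead.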