Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WAL/blockmaps: continue blockmaps initialization after failure #211

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6d8eeab
WAL/blockmaps: continue blockmaps initialization after failure
medvied Oct 21, 2023
aefbd75
src/hnsw/external_index: use ReadBufferExtended(P_NEW) instead of Ext…
medvied Oct 21, 2023
445c099
Merge remote-tracking branch 'origin/main' into wal-blockmaps
medvied Oct 25, 2023
f272c85
src/hnsw/validate_index: update to work with changes to HnswIndexHead…
medvied Oct 25, 2023
99a9f2f
cmake/FindPostgreSQL: put the server include dir first
medvied Oct 25, 2023
c3d2178
Revert "cmake/FindPostgreSQL: put the server include dir first"
medvied Oct 25, 2023
7a40a68
Merge remote-tracking branch 'origin/main' into wal-blockmaps
medvied Oct 25, 2023
7430b4a
Merge remote-tracking branch 'origin/main' into wal-blockmaps
medvied Oct 27, 2023
821b41f
Implement failure points
medvied Oct 26, 2023
4f98054
Revert "Implement failure points"
medvied Nov 4, 2023
110d515
Merge remote-tracking branch 'origin/main' into wal-blockmaps
medvied Nov 4, 2023
d7ae614
test/hnsw_blockmap_create: implement
medvied Nov 5, 2023
5791168
test/hnsw_failure_point: no longer works, temporarily
medvied Nov 5, 2023
770076d
test/schedule: fix hnsw_blockmap_create name (was non-existing test_b…
medvied Nov 5, 2023
55ca770
run clang-format
medvied Nov 5, 2023
a007ba6
test/hnsw_failure_point: make it work by using another failure point
medvied Nov 5, 2023
1ad9d05
implement and use _lantern_internal.continue_blockmap_group_initializ…
medvied Nov 5, 2023
6c70e3c
fix error: variable ‘ptr’ set but not used
medvied Nov 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sql/lantern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ CREATE FUNCTION _lantern_internal.validate_index(index regclass, print_info bool
CREATE FUNCTION _lantern_internal.failure_point_enable(func TEXT, name TEXT, dont_trigger_first_nr INTEGER DEFAULT 0) RETURNS VOID
AS 'MODULE_PATHNAME', 'lantern_internal_failure_point_enable' LANGUAGE C STABLE STRICT PARALLEL UNSAFE;

CREATE FUNCTION _lantern_internal.continue_blockmap_group_initialization(index regclass) RETURNS VOID
AS 'MODULE_PATHNAME', 'lantern_internal_continue_blockmap_group_initialization' LANGUAGE C STABLE STRICT PARALLEL UNSAFE;

-- operator classes
CREATE OR REPLACE FUNCTION _lantern_internal._create_ldb_operator_classes(access_method_name TEXT) RETURNS BOOLEAN AS $$
DECLARE
Expand Down
9 changes: 9 additions & 0 deletions src/hnsw.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ Datum lantern_internal_failure_point_enable(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}

PGDLLEXPORT PG_FUNCTION_INFO_V1(lantern_internal_continue_blockmap_group_initialization);
Datum lantern_internal_continue_blockmap_group_initialization(PG_FUNCTION_ARGS)
{
Oid indrelid = PG_GETARG_OID(0);

ldb_continue_blockmap_group_initialization(indrelid);
PG_RETURN_VOID();
}

/*
* Get data type for give oid
* */
Expand Down
388 changes: 317 additions & 71 deletions src/hnsw/external_index.c

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions src/hnsw/external_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,17 @@
// to be able to test more of the algorithm corner cases with a small table dataset
#define HNSW_BLOCKMAP_BLOCKS_PER_PAGE 2000

// the header is updated every time this number of pages is initialized in ContinueBlockMapGroupInitialization()
#define HNSW_BLOCKMAP_UPDATE_HEADER_EVERY 100

#define USEARCH_HEADER_SIZE 80

typedef struct HnswBlockMapGroupDesc
{
BlockNumber first_block;
uint32 blockmaps_initialized;
} HnswBlockMapGroupDesc;

typedef struct HnswIndexHeaderPage
{
uint32 magicNumber;
Expand All @@ -47,8 +56,8 @@ typedef struct HnswIndexHeaderPage
BlockNumber last_data_block;
char usearch_header[ USEARCH_HEADER_SIZE ];

uint32 blockmap_page_groups;
uint32 blockmap_page_group_index[ HNSW_MAX_BLOCKMAP_GROUPS ];
uint32 blockmap_groups_nr;
HnswBlockMapGroupDesc blockmap_groups[ HNSW_MAX_BLOCKMAP_GROUPS ];
} HnswIndexHeaderPage;

typedef struct HnswIndexPageSpecialBlock
Expand Down Expand Up @@ -84,7 +93,7 @@ typedef struct
Relation index_rel;

// used for scans
uint32 blockmap_page_group_index_cache[ HNSW_MAX_BLOCKMAP_GROUPS ]; // todo::
HnswBlockMapGroupDesc blockmap_groups_cache[ HNSW_MAX_BLOCKMAP_GROUPS ]; // todo::
// used for inserts
HnswIndexHeaderPage *header_page_under_wal;

Expand Down Expand Up @@ -130,4 +139,12 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel,
uint32 new_tuple_level,
HnswInsertState *insertstate);

BlockNumber NumberOfBlockMapsInGroup(unsigned groupno);

/*
* Continue and finish the last blockmap group initialization if needed.
* @see ContinueBlockMapGroupInitialization
*/
void ldb_continue_blockmap_group_initialization(Oid indrelid);

#endif // LDB_HNSW_EXTERNAL_INDEX_H
6 changes: 5 additions & 1 deletion src/hnsw/failure_point.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ void ldb_failure_point_enable(const char *func, const char *name, uint32 dont_tr
state->remaining = dont_trigger_first_nr;
strncpy(state->func, func, lengthof(state->func));
strncpy(state->name, name, lengthof(state->name));
elog(INFO, "Failure point (func=%s name=%s) is enabled.", state->func, state->name);
elog(INFO,
"Failure point (func=%s name=%s remaining=%" PRIu32 ") is enabled.",
state->func,
state->name,
state->remaining);
}

bool ldb_failure_point_is_enabled(const char *func, const char *name)
Expand Down
5 changes: 2 additions & 3 deletions src/hnsw/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ IndexScanDesc ldb_ambeginscan(Relation index, int nkeys, int norderbys)
if(error != NULL) elog(ERROR, "error loading index: %s", error);
assert(error == NULL);

memcpy(retriever_ctx->blockmap_page_group_index_cache,
headerp->blockmap_page_group_index,
sizeof(retriever_ctx->block_numbers_cache));
memcpy(
retriever_ctx->blockmap_groups_cache, headerp->blockmap_groups, sizeof(retriever_ctx->blockmap_groups_cache));
retriever_ctx->header_page_under_wal = NULL;

usearch_mem = headerp->usearch_header;
Expand Down
44 changes: 34 additions & 10 deletions src/hnsw/validate_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,29 @@ static void ldb_vi_read_blockmaps(Relation index,
uint32 group_node_first_index = 0;
uint32 nodes_remaining = nodes_nr;
uint32 batch_size = HNSW_BLOCKMAP_BLOCKS_PER_PAGE;
bool last_group_node_is_used = true;

if(blocks_nr == 0) return;
vi_blocks[ 0 ].vp_type = LDB_VI_BLOCK_HEADER;
while(nodes_remaining != 0) {
if(blockmap_groupno > index_header->blockmap_page_groups) {
while(nodes_remaining != 0 || (last_group_node_is_used && blockmap_groupno < index_header->blockmap_groups_nr)) {
if(blockmap_groupno >= index_header->blockmap_groups_nr) {
elog(ERROR,
"blockmap_groupno=%d > index_header->blockmap_page_groups=%d",
"blockmap_groupno=%" PRIu32 " >= index_header->blockmap_groups_nr=%" PRIu32,
blockmap_groupno,
index_header->blockmap_page_groups);
index_header->blockmap_groups_nr);
}
if(index_header->blockmap_groups[ blockmap_groupno ].blockmaps_initialized
!= NumberOfBlockMapsInGroup(blockmap_groupno)) {
elog(ERROR,
"HnswBlockMapGroupDesc.blockmaps_initialized=%" PRIu32 " != NumberOfBlockMapsInGroup()=%" PRIu32
" for blockmap_groupno=%" PRIu32,
index_header->blockmap_groups[ blockmap_groupno ].blockmaps_initialized,
NumberOfBlockMapsInGroup(blockmap_groupno),
blockmap_groupno);
}
/* TODO see the loop in CreateBlockMapGroup() */
BlockNumber number_of_blockmaps_in_group = 1u << blockmap_groupno;
BlockNumber group_start = index_header->blockmap_page_group_index[ blockmap_groupno ];
BlockNumber group_start = index_header->blockmap_groups[ blockmap_groupno ].first_block;
for(unsigned blockmap_id = 0; blockmap_id < number_of_blockmaps_in_group; ++blockmap_id) {
BlockNumber blockmap_block = group_start + blockmap_id;
BlockNumber expected_special_nextblockno;
Expand Down Expand Up @@ -181,11 +191,13 @@ static void ldb_vi_read_blockmaps(Relation index,
elog(ERROR,
"blockmap->first_id=%" PRIu32
" != "
"group_node_first_index=%d + blockmap_id=%u * HNSW_BLOCKMAP_BLOCKS_PER_PAGE=%d",
"group_node_first_index=%d + blockmap_id=%u * HNSW_BLOCKMAP_BLOCKS_PER_PAGE=%d for "
"blockmap_groupno=%" PRIu32,
blockmap->first_id,
group_node_first_index,
blockmap_id,
HNSW_BLOCKMAP_BLOCKS_PER_PAGE);
HNSW_BLOCKMAP_BLOCKS_PER_PAGE,
blockmap_groupno);
}
HnswIndexPageSpecialBlock *special = (HnswIndexPageSpecialBlock *)PageGetSpecialPointer(page);
if(special->firstId != blockmap->first_id) {
Expand Down Expand Up @@ -232,6 +244,11 @@ static void ldb_vi_read_blockmaps(Relation index,

UnlockReleaseBuffer(buf);
}
/*
* This is for the case when the last blockmap group is initialized,
* but PostgreSQL process crashed before something was added to it.
*/
last_group_node_is_used = batch_size == nodes_remaining;
nodes_remaining -= Min(batch_size, nodes_remaining);
group_node_first_index += batch_size;
batch_size = batch_size * 2;
Expand Down Expand Up @@ -640,7 +657,7 @@ void ldb_validate_index(Oid indrelid, bool print_info)
elog(INFO,
"index_header = HnswIndexHeaderPage("
"version=%" PRIu32 " vector_dim=%" PRIu32 " m=%" PRIu32 " ef_construction=%" PRIu32 " ef=%" PRIu32
" metric_kind=%d num_vectors=%" PRIu32 " last_data_block=%" PRIu32 " blockmap_page_groups=%" PRIu32 ")",
" metric_kind=%d num_vectors=%" PRIu32 " last_data_block=%" PRIu32 " blockmap_groups_nr=%" PRIu32 ")",
index_header->version,
index_header->vector_dim,
index_header->m,
Expand All @@ -649,15 +666,22 @@ void ldb_validate_index(Oid indrelid, bool print_info)
index_header->metric_kind,
index_header->num_vectors,
index_header->last_data_block,
index_header->blockmap_page_groups);
index_header->blockmap_groups_nr);
for(uint32 i = 0; i < index_header->blockmap_groups_nr; ++i) {
elog(INFO,
"blockmap_groups[%" PRIu32 "]=(first_block=%" PRIu32 ", blockmaps_initialized=%" PRIu32 "),",
i,
index_header->blockmap_groups[ i ].first_block,
index_header->blockmap_groups[ i ].blockmaps_initialized);
}
}

blocks_nr = RelationGetNumberOfBlocksInFork(index, MAIN_FORKNUM);
nodes_nr = index_header->num_vectors;
if(print_info) {
elog(INFO, "blocks_nr=%" PRIu32 " nodes_nr=%" PRIu32, blocks_nr, nodes_nr);
}
/* TODO check nodes_nr against index_header->blockmap_page_groups */
/* TODO check nodes_nr against index_header->blockmap_groups_nr */

vi_blocks = palloc0_array(typeof(*vi_blocks), blocks_nr);
vi_nodes = palloc0_array(typeof(*vi_nodes), nodes_nr);
Expand Down
25 changes: 13 additions & 12 deletions test/expected/ext_relocation.out
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ FROM pg_catalog.pg_extension AS e
INNER JOIN pg_catalog.pg_namespace AS np ON (np.oid = p.pronamespace)
WHERE d.deptype = 'e' AND e.extname = 'lantern'
ORDER BY 1, 3;
extschema | proname | proschema
-----------+------------------------------+-------------------
schema1 | validate_index | _lantern_internal
schema1 | failure_point_enable | _lantern_internal
schema1 | _create_ldb_operator_classes | _lantern_internal
schema1 | l2sq_dist | schema1
schema1 | hnsw_handler | schema1
schema1 | hamming_dist | schema1
schema1 | cos_dist | schema1
schema1 | ldb_generic_dist | schema1
schema1 | ldb_generic_dist | schema1
(9 rows)
extschema | proname | proschema
-----------+----------------------------------------+-------------------
schema1 | validate_index | _lantern_internal
schema1 | failure_point_enable | _lantern_internal
schema1 | continue_blockmap_group_initialization | _lantern_internal
schema1 | _create_ldb_operator_classes | _lantern_internal
schema1 | cos_dist | schema1
schema1 | hnsw_handler | schema1
schema1 | hamming_dist | schema1
schema1 | ldb_generic_dist | schema1
schema1 | ldb_generic_dist | schema1
schema1 | l2sq_dist | schema1
(10 rows)

-- show all the extension operators
SELECT ne.nspname AS extschema, op.oprname, np.nspname AS proschema
Expand Down
Loading