Skip to content

Commit

Permalink
Cache table AM in Chunk struct
Browse files Browse the repository at this point in the history
A chunk can use different table access methods so to support this more
easily, cache the AM oid in the chunk struct. This allows identifying
the access method quickly at, e.g., planning time.
  • Loading branch information
erimatnor committed Sep 20, 2024
1 parent 616080e commit 6242428
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 48 deletions.
8 changes: 8 additions & 0 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,14 @@ chunk_tuple_found(TupleInfo *ti, void *arg)

chunk->relkind = get_rel_relkind(chunk->table_id);

if (!chunk->fd.osm_chunk)
{
chunk->amoid = ts_get_rel_am(chunk->table_id);
Assert(OidIsValid(chunk->amoid));
}
else
chunk->amoid = InvalidOid;

Ensure(chunk->relkind > 0,
"relkind for chunk \"%s\".\"%s\" is invalid",
NameStr(chunk->fd.schema_name),
Expand Down
1 change: 1 addition & 0 deletions src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef struct Chunk
char relkind;
Oid table_id;
Oid hypertable_relid;
Oid amoid; /* Table access method used by chunk */

/*
* The hypercube defines the chunks position in the N-dimensional space.
Expand Down
8 changes: 8 additions & 0 deletions src/chunk_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned
chunk->hypertable_relid = hs->main_table_relid;
chunk->table_id = chunk_reloid;

if (!chunk->fd.osm_chunk)
{
chunk->amoid = ts_get_rel_am(chunk->table_id);
Assert(OidIsValid(chunk->amoid));
}
else
chunk->amoid = InvalidOid;

locked_chunks[locked_chunk_count] = chunk;
locked_chunk_count++;

Expand Down
9 changes: 3 additions & 6 deletions src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1419,12 +1419,9 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo
(type == TS_REL_CHUNK_CHILD) && IS_UPDL_CMD(query);
if (use_transparent_decompression && (is_standalone_chunk || is_child_chunk_in_update))
{
TimescaleDBPrivate *fdw_private = (TimescaleDBPrivate *) rel->fdw_private;
Assert(fdw_private->cached_chunk_struct == NULL);
fdw_private->cached_chunk_struct =
ts_chunk_get_by_relid(rte->relid, /* fail_if_not_found = */ true);
if (!ts_chunk_is_partial(fdw_private->cached_chunk_struct) &&
ts_chunk_is_compressed(fdw_private->cached_chunk_struct))
const Chunk *chunk = ts_planner_chunk_fetch(root, rel);

if (!ts_chunk_is_partial(chunk) && ts_chunk_is_compressed(chunk))
{
rel->indexlist = NIL;
}
Expand Down
41 changes: 41 additions & 0 deletions src/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <nodes/pathnodes.h>
#include <nodes/pg_list.h>

#include "chunk.h"
#include "export.h"
#include "guc.h"
#include "hypertable.h"
Expand Down Expand Up @@ -108,3 +109,43 @@ extern TSDLLEXPORT void ts_add_baserel_cache_entry_for_chunk(Oid chunk_reloid,
Hypertable *hypertable);
TsRelType TSDLLEXPORT ts_classify_relation(const PlannerInfo *root, const RelOptInfo *rel,
Hypertable **ht);

/*
* Chunk-equivalent of planner_rt_fetch(), but returns the corresponding chunk
* instead of range table entry.
*
* Returns NULL if this rel is not a chunk.
*
* This cache should be pre-warmed by hypertable expansion, but it
* doesn't run in the following cases:
*
* 1. if it was a direct query on the chunk;
*
* 2. if it is not a SELECT QUERY.
*/
static inline const Chunk *
ts_planner_chunk_fetch(PlannerInfo *root, RelOptInfo *rel)
{
TimescaleDBPrivate *rel_private;

/* The rel can only be a chunk if it is part of a hypertable expansion
* (RELOPT_OTHER_MEMBER_REL) or a directy query on the chunk
* (RELOPT_BASEREL) */
if (rel->reloptkind != RELOPT_OTHER_MEMBER_REL && rel->reloptkind != RELOPT_BASEREL)
return NULL;

/* The rel_private entry should have been created as part of classifying
* the relation in timescaledb_get_relation_info_hook(). Therefore,
* ts_get_private_reloptinfo() asserts that it is already set but falls
* back to creating rel_private in release builds for safety. */
rel_private = ts_get_private_reloptinfo(rel);

if (NULL == rel_private->cached_chunk_struct)
{
RangeTblEntry *rte = planner_rt_fetch(rel->relid, root);
rel_private->cached_chunk_struct =
ts_chunk_get_by_relid(rte->relid, /* fail_if_not_found = */ true);
}

return rel_private->cached_chunk_struct;
}
49 changes: 49 additions & 0 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1748,3 +1748,52 @@ ts_update_placeholder(PG_FUNCTION_ARGS)
elog(ERROR, "this stub function is used only as placeholder during extension updates");
PG_RETURN_NULL();
}

/*
* Get the table access method Oid for a relation.
*/
Oid
ts_get_rel_am(Oid relid)
{
HeapTuple tuple;
Form_pg_class cform;
Oid amoid;

tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));

if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", relid);

cform = (Form_pg_class) GETSTRUCT(tuple);
amoid = cform->relam;
ReleaseSysCache(tuple);

return amoid;
}

static Oid hypercore_amoid = InvalidOid;

bool
ts_is_hypercore_am(Oid amoid)
{
/* Can't use InvalidOid as an indication of non-cached value since
get_am_oid() will return InvalidOid when the access method does not
exist and we will do the lookup every time this query is called. This
boolean can be removed once we know that there should exist an access
method with the given name. */
static bool iscached = false;

if (!iscached && !OidIsValid(hypercore_amoid))
{
hypercore_amoid = get_am_oid("hypercore", true);
iscached = true;
}

if (!OidIsValid(hypercore_amoid))
return false;

/* Shouldn't get here for now */
Assert(false);

return amoid == hypercore_amoid;
}
3 changes: 3 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,6 @@ ts_datum_set_objectid(const AttrNumber attno, NullableDatum *datums, const Oid v
else
datums[AttrNumberGetAttrOffset(attno)].isnull = true;
}

extern TSDLLEXPORT Oid ts_get_rel_am(Oid relid);
extern TSDLLEXPORT bool ts_is_hypercore_am(Oid amoid);
24 changes: 13 additions & 11 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ static DecompressChunkPath *decompress_chunk_path_create(PlannerInfo *root, Comp
int parallel_workers,
Path *compressed_path);

static void decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, Chunk *chunk,
RelOptInfo *chunk_rel, bool needs_sequence_num);
static void decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info,
const Chunk *chunk, RelOptInfo *chunk_rel,
bool needs_sequence_num);

static SortInfo build_sortinfo(Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info,
static SortInfo build_sortinfo(const Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info,
List *pathkeys);

static bool
Expand Down Expand Up @@ -254,7 +255,8 @@ copy_decompress_chunk_path(DecompressChunkPath *src)
}

static CompressionInfo *
build_compressioninfo(PlannerInfo *root, Hypertable *ht, Chunk *chunk, RelOptInfo *chunk_rel)
build_compressioninfo(PlannerInfo *root, const Hypertable *ht, const Chunk *chunk,
RelOptInfo *chunk_rel)
{
AppendRelInfo *appinfo;
CompressionInfo *info = palloc0(sizeof(CompressionInfo));
Expand Down Expand Up @@ -510,7 +512,7 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
* compatible and the optimization can be used.
*/
static MergeBatchResult
can_batch_sorted_merge(PlannerInfo *root, CompressionInfo *info, Chunk *chunk)
can_batch_sorted_merge(PlannerInfo *root, CompressionInfo *info, const Chunk *chunk)
{
PathKey *pk;
Var *var;
Expand Down Expand Up @@ -620,8 +622,8 @@ can_batch_sorted_merge(PlannerInfo *root, CompressionInfo *info, Chunk *chunk)
* To save planning time, we therefore refrain from adding them.
*/
static void
add_chunk_sorted_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht, Index ht_relid,
Path *path, Path *compressed_path)
add_chunk_sorted_paths(PlannerInfo *root, RelOptInfo *chunk_rel, const Hypertable *ht,
Index ht_relid, Path *path, Path *compressed_path)
{
if (root->query_pathkeys == NIL)
return;
Expand Down Expand Up @@ -681,8 +683,8 @@ add_chunk_sorted_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht,
#define IS_UPDL_CMD(parse) \
((parse)->commandType == CMD_UPDATE || (parse)->commandType == CMD_DELETE)
void
ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht,
Chunk *chunk)
ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, const Hypertable *ht,
const Chunk *chunk)
{
RelOptInfo *compressed_rel;
ListCell *lc;
Expand Down Expand Up @@ -1650,7 +1652,7 @@ compressed_rel_setup_equivalence_classes(PlannerInfo *root, CompressionInfo *inf
* and add it to PlannerInfo
*/
static void
decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, Chunk *chunk,
decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, const Chunk *chunk,
RelOptInfo *chunk_rel, bool needs_sequence_num)
{
Index compressed_index = root->simple_rel_array_size;
Expand Down Expand Up @@ -2012,7 +2014,7 @@ find_const_segmentby(RelOptInfo *chunk_rel, CompressionInfo *info)
* If query pathkeys is shorter than segmentby + compress_orderby pushdown can still be done
*/
static SortInfo
build_sortinfo(Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info, List *pathkeys)
build_sortinfo(const Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info, List *pathkeys)
{
int pk_index;
PathKey *pk;
Expand Down
4 changes: 2 additions & 2 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ typedef struct DecompressChunkPath
bool batch_sorted_merge;
} DecompressChunkPath;

void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
Chunk *chunk);
void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, const Hypertable *ht,
const Chunk *chunk);

extern bool ts_is_decompress_chunk_path(Path *path);

Expand Down
77 changes: 48 additions & 29 deletions tsl/src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <commands/extension.h>
#include <foreign/fdwapi.h>
#include <nodes/nodeFuncs.h>
#include <nodes/parsenodes.h>
#include <optimizer/paths.h>
#include <parser/parsetree.h>

Expand Down Expand Up @@ -97,41 +98,59 @@ tsl_create_upper_paths_hook(PlannerInfo *root, UpperRelationKind stage, RelOptIn
}
}

/*
* Check if a chunk should be decompressed via a DecompressChunk plan.
*
* Check first that it is a compressed chunk. Then, decompress unless it is
* SELECT * FROM ONLY <chunk>. We check if it is the ONLY case by calling
* ts_rte_is_marked_for_expansion. Respecting ONLY here is important to not
* break postgres tools like pg_dump.
*/
static inline bool
should_decompress_chunk(const RelOptInfo *rel, const RangeTblEntry *rte, const Chunk *chunk)
{
return ts_guc_enable_transparent_decompression &&
/* Check that the chunk is actually compressed */
chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID &&
/* Check that it is _not_ SELECT FROM ONLY <chunk> */
(rel->reloptkind != RELOPT_BASEREL || ts_rte_is_marked_for_expansion(rte));
}

void
tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte,
Hypertable *ht)
{
/* We can get here via query on hypertable in that case reloptkind
* will be RELOPT_OTHER_MEMBER_REL or via direct query on chunk
* in that case reloptkind will be RELOPT_BASEREL.
* If we get here via SELECT * FROM <chunk>, we decompress the chunk,
* unless the query was SELECT * FROM ONLY <chunk>.
* We check if it is the ONLY case by calling ts_rte_is_marked_for_expansion.
* Respecting ONLY here is important to not break postgres tools like pg_dump.
/* Only interested in queries on relations that are part of hypertables
* with compression enabled, so quick exit if not this case. */
if (ht == NULL || !TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
return;

/*
* For a chunk, we can get here via a query on the hypertable that expands
* to the chunk or by direct query on the chunk. In the former case,
* reloptkind will be RELOPT_OTHER_MEMBER_REL (nember of hypertable) or in
* the latter case reloptkind will be RELOPT_BASEREL (standalone rel).
*
* These two cases are checked in ts_planner_chunk_fetch().
*/
TimescaleDBPrivate *fdw_private = (TimescaleDBPrivate *) rel->fdw_private;
if (ts_guc_enable_transparent_decompression && ht &&
(rel->reloptkind == RELOPT_OTHER_MEMBER_REL ||
(rel->reloptkind == RELOPT_BASEREL && ts_rte_is_marked_for_expansion(rte))) &&
TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
{
if (fdw_private->cached_chunk_struct == NULL)
{
/*
* We can not have the cached Chunk struct,
* 1) if it was a direct query on the chunk;
* 2) if it is not a SELECT QUERY.
* Caching is done by our hypertable expansion, which doesn't run in
* these cases.
*/
fdw_private->cached_chunk_struct =
ts_chunk_get_by_relid(rte->relid, /* fail_if_not_found = */ true);
}
const Chunk *chunk = ts_planner_chunk_fetch(root, rel);

if (fdw_private->cached_chunk_struct->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
ts_decompress_chunk_generate_paths(root, rel, ht, fdw_private->cached_chunk_struct);
}
if (chunk == NULL)
return;

if (should_decompress_chunk(rel, rte, chunk))
{
ts_decompress_chunk_generate_paths(root, rel, ht, chunk);
}
/*
* Depending on access method used by the chunk, we might want to add
* alternative paths. This should not be compatible with transparent
* decompression, so only add if we didn't add decompression paths above.
*/
else if (ts_is_hypercore_am(chunk->amoid))
{
/* To be implemented */
Assert(false);
}
}

Expand Down

0 comments on commit 6242428

Please sign in to comment.