Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to plan based on chunk table AM #7284

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1614,12 +1614,14 @@ chunk_tuple_found(TupleInfo *ti, void *arg)
* ts_chunk_build_from_tuple_and_stub() since chunk_resurrect() also uses
* that function and, in that case, the chunk object is needed to create
* the data table and related objects. */
chunk->table_id =
ts_get_relation_relid(NameStr(chunk->fd.schema_name), NameStr(chunk->fd.table_name), false);

chunk->hypertable_relid = ts_hypertable_id_to_relid(chunk->fd.hypertable_id, false);
ts_get_rel_info_by_name(NameStr(chunk->fd.schema_name),
NameStr(chunk->fd.table_name),
&chunk->table_id,
&chunk->amoid,
&chunk->relkind);

chunk->relkind = get_rel_relkind(chunk->table_id);
Assert(OidIsValid(chunk->amoid) || chunk->fd.osm_chunk);

Ensure(chunk->relkind > 0,
"relkind for chunk \"%s\".\"%s\" is invalid",
Expand Down
1 change: 1 addition & 0 deletions src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef struct Chunk
char relkind;
Oid table_id;
Oid hypertable_relid;
Oid amoid; /* Table access method used by chunk */

/*
* The hypercube defines the chunks position in the N-dimensional space.
Expand Down
5 changes: 4 additions & 1 deletion src/chunk_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned
for (int i = 0; i < locked_chunk_count; i++)
{
Chunk *chunk = locked_chunks[i];
chunk->relkind = get_rel_relkind(chunk->table_id);

ts_get_rel_info(chunk->table_id, &chunk->amoid, &chunk->relkind);

Assert(OidIsValid(chunk->amoid) || chunk->fd.osm_chunk);
}

/*
Expand Down
7 changes: 3 additions & 4 deletions src/import/allpaths.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,15 @@ ts_set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *parent_rel, Index pare
TsRelType reltype = ts_classify_relation(root, child_rel, &ht);
if (reltype == TS_REL_CHUNK_CHILD && !TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
{
TimescaleDBPrivate *fdw_private = (TimescaleDBPrivate *) child_rel->fdw_private;
const Chunk *chunk = ts_planner_chunk_fetch(root, child_rel);

/*
* This function is called only in tandem with our own hypertable
* expansion, so the Chunk struct must be initialized already.
*/
Assert(fdw_private->cached_chunk_struct != NULL);
Assert(chunk != NULL);

if (!ts_chunk_is_partial(fdw_private->cached_chunk_struct) &&
ts_chunk_is_compressed(fdw_private->cached_chunk_struct))
if (!ts_chunk_is_partial(chunk) && ts_chunk_is_compressed(chunk))
{
child_rel->indexlist = NIL;
}
Expand Down
9 changes: 3 additions & 6 deletions src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1419,12 +1419,9 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo
(type == TS_REL_CHUNK_CHILD) && IS_UPDL_CMD(query);
if (use_transparent_decompression && (is_standalone_chunk || is_child_chunk_in_update))
{
TimescaleDBPrivate *fdw_private = (TimescaleDBPrivate *) rel->fdw_private;
Assert(fdw_private->cached_chunk_struct == NULL);
fdw_private->cached_chunk_struct =
ts_chunk_get_by_relid(rte->relid, /* fail_if_not_found = */ true);
if (!ts_chunk_is_partial(fdw_private->cached_chunk_struct) &&
ts_chunk_is_compressed(fdw_private->cached_chunk_struct))
const Chunk *chunk = ts_planner_chunk_fetch(root, rel);

if (!ts_chunk_is_partial(chunk) && ts_chunk_is_compressed(chunk))
{
rel->indexlist = NIL;
}
Expand Down
42 changes: 42 additions & 0 deletions src/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <nodes/parsenodes.h>
#include <nodes/pathnodes.h>
#include <nodes/pg_list.h>
#include <parser/parsetree.h>

#include "chunk.h"
#include "export.h"
#include "guc.h"
#include "hypertable.h"
Expand Down Expand Up @@ -108,3 +110,43 @@
Hypertable *hypertable);
TsRelType TSDLLEXPORT ts_classify_relation(const PlannerInfo *root, const RelOptInfo *rel,
Hypertable **ht);

/*
* Chunk-equivalent of planner_rt_fetch(), but returns the corresponding chunk
* instead of range table entry.
*
* Returns NULL if this rel is not a chunk.
*
* This cache should be pre-warmed by hypertable expansion, but it
* doesn't run in the following cases:
*
* 1. if it was a direct query on the chunk;
*
* 2. if it is not a SELECT QUERY.
*/
static inline const Chunk *
ts_planner_chunk_fetch(const PlannerInfo *root, RelOptInfo *rel)
{
TimescaleDBPrivate *rel_private;

/* The rel can only be a chunk if it is part of a hypertable expansion
* (RELOPT_OTHER_MEMBER_REL) or a directly query on the chunk
* (RELOPT_BASEREL) */
if (rel->reloptkind != RELOPT_OTHER_MEMBER_REL && rel->reloptkind != RELOPT_BASEREL)
return NULL;

Check warning on line 136 in src/planner/planner.h

View check run for this annotation

Codecov / codecov/patch

src/planner/planner.h#L136

Added line #L136 was not covered by tests

/* The rel_private entry should have been created as part of classifying
* the relation in timescaledb_get_relation_info_hook(). Therefore,
* ts_get_private_reloptinfo() asserts that it is already set but falls
* back to creating rel_private in release builds for safety. */
rel_private = ts_get_private_reloptinfo(rel);

if (NULL == rel_private->cached_chunk_struct)
{
RangeTblEntry *rte = planner_rt_fetch(rel->relid, root);
rel_private->cached_chunk_struct =
ts_chunk_get_by_relid(rte->relid, /* fail_if_not_found = */ true);
}

return rel_private->cached_chunk_struct;
}
74 changes: 74 additions & 0 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1748,3 +1748,77 @@
elog(ERROR, "this stub function is used only as placeholder during extension updates");
PG_RETURN_NULL();
}

/*
 * Fetch relation information from the syscache with a single lookup.
 *
 * Fills in the relation's relkind and table access method OID. Both output
 * pointers are mandatory (must be non-NULL). Errors out if the relation
 * does not exist.
 */
void
ts_get_rel_info(Oid relid, Oid *amoid, char *relkind)
{
	HeapTuple tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for relation %u", relid);

	Form_pg_class cform = (Form_pg_class) GETSTRUCT(tuple);

	*amoid = cform->relam;
	*relkind = cform->relkind;

	ReleaseSysCache(tuple);
}

/*
 * Fetch relation information from the syscache given a schema-qualified
 * relation name, using a single catalog lookup.
 *
 * Fills in the relation's OID, relkind, and table access method OID. All
 * output pointers are mandatory (must be non-NULL). Errors out if either
 * the namespace or the relation does not exist.
 */
void
ts_get_rel_info_by_name(const char *relnamespace, const char *relname, Oid *relid, Oid *amoid,
						char *relkind)
{
	const Oid namespaceoid = get_namespace_oid(relnamespace, false);
	HeapTuple tuple =
		SearchSysCache2(RELNAMENSP, PointerGetDatum(relname), ObjectIdGetDatum(namespaceoid));

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for relation %s.%s", relnamespace, relname);

	Form_pg_class cform = (Form_pg_class) GETSTRUCT(tuple);

	*relid = cform->oid;
	*amoid = cform->relam;
	*relkind = cform->relkind;

	ReleaseSysCache(tuple);
}

/* Lazily resolved OID of the "hypercore" table access method. */
static Oid hypercore_amoid = InvalidOid;

/*
 * Check whether the given access method OID refers to the hypercore table
 * access method.
 *
 * The AM OID is looked up once and cached. A separate "cached" flag is
 * needed because get_am_oid() returns InvalidOid when the access method
 * does not exist, so InvalidOid cannot double as the "not yet looked up"
 * marker without repeating the lookup on every call.
 */
bool
ts_is_hypercore_am(Oid amoid)
{
	static bool am_lookup_done = false;

	if (!am_lookup_done && hypercore_amoid == InvalidOid)
	{
		hypercore_amoid = get_am_oid("hypercore", /* missing_ok = */ true);
		am_lookup_done = true;
	}

	if (hypercore_amoid == InvalidOid)
		return false;

	/* The hypercore access method is not expected to exist yet, so reaching
	 * this point is unexpected for now. */
	Assert(false);

	return amoid == hypercore_amoid;
}
5 changes: 5 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,8 @@ ts_datum_set_objectid(const AttrNumber attno, NullableDatum *datums, const Oid v
else
datums[AttrNumberGetAttrOffset(attno)].isnull = true;
}

extern TSDLLEXPORT void ts_get_rel_info_by_name(const char *relnamespace, const char *relname,
Oid *relid, Oid *amoid, char *relkind);
extern TSDLLEXPORT void ts_get_rel_info(Oid relid, Oid *amoid, char *relkind);
extern TSDLLEXPORT bool ts_is_hypercore_am(Oid amoid);
24 changes: 13 additions & 11 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ static DecompressChunkPath *decompress_chunk_path_create(PlannerInfo *root, Comp
int parallel_workers,
Path *compressed_path);

static void decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, Chunk *chunk,
RelOptInfo *chunk_rel, bool needs_sequence_num);
static void decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info,
const Chunk *chunk, RelOptInfo *chunk_rel,
bool needs_sequence_num);

static SortInfo build_sortinfo(Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info,
static SortInfo build_sortinfo(const Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info,
List *pathkeys);

static bool
Expand Down Expand Up @@ -254,7 +255,8 @@ copy_decompress_chunk_path(DecompressChunkPath *src)
}

static CompressionInfo *
build_compressioninfo(PlannerInfo *root, Hypertable *ht, Chunk *chunk, RelOptInfo *chunk_rel)
build_compressioninfo(PlannerInfo *root, const Hypertable *ht, const Chunk *chunk,
RelOptInfo *chunk_rel)
{
AppendRelInfo *appinfo;
CompressionInfo *info = palloc0(sizeof(CompressionInfo));
Expand Down Expand Up @@ -510,7 +512,7 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
* compatible and the optimization can be used.
*/
static MergeBatchResult
can_batch_sorted_merge(PlannerInfo *root, CompressionInfo *info, Chunk *chunk)
can_batch_sorted_merge(PlannerInfo *root, CompressionInfo *info, const Chunk *chunk)
{
PathKey *pk;
Var *var;
Expand Down Expand Up @@ -620,8 +622,8 @@ can_batch_sorted_merge(PlannerInfo *root, CompressionInfo *info, Chunk *chunk)
* To save planning time, we therefore refrain from adding them.
*/
static void
add_chunk_sorted_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht, Index ht_relid,
Path *path, Path *compressed_path)
add_chunk_sorted_paths(PlannerInfo *root, RelOptInfo *chunk_rel, const Hypertable *ht,
Index ht_relid, Path *path, Path *compressed_path)
{
if (root->query_pathkeys == NIL)
return;
Expand Down Expand Up @@ -681,8 +683,8 @@ add_chunk_sorted_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht,
#define IS_UPDL_CMD(parse) \
((parse)->commandType == CMD_UPDATE || (parse)->commandType == CMD_DELETE)
void
ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht,
Chunk *chunk)
ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, const Hypertable *ht,
const Chunk *chunk)
{
RelOptInfo *compressed_rel;
ListCell *lc;
Expand Down Expand Up @@ -1650,7 +1652,7 @@ compressed_rel_setup_equivalence_classes(PlannerInfo *root, CompressionInfo *inf
* and add it to PlannerInfo
*/
static void
decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, Chunk *chunk,
decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, const Chunk *chunk,
RelOptInfo *chunk_rel, bool needs_sequence_num)
{
Index compressed_index = root->simple_rel_array_size;
Expand Down Expand Up @@ -2012,7 +2014,7 @@ find_const_segmentby(RelOptInfo *chunk_rel, CompressionInfo *info)
* If query pathkeys is shorter than segmentby + compress_orderby pushdown can still be done
*/
static SortInfo
build_sortinfo(Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info, List *pathkeys)
build_sortinfo(const Chunk *chunk, RelOptInfo *chunk_rel, CompressionInfo *info, List *pathkeys)
{
int pk_index;
PathKey *pk;
Expand Down
4 changes: 2 additions & 2 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ typedef struct DecompressChunkPath
bool batch_sorted_merge;
} DecompressChunkPath;

void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
Chunk *chunk);
void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, const Hypertable *ht,
const Chunk *chunk);

extern bool ts_is_decompress_chunk_path(Path *path);

Expand Down
77 changes: 48 additions & 29 deletions tsl/src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <commands/extension.h>
#include <foreign/fdwapi.h>
#include <nodes/nodeFuncs.h>
#include <nodes/parsenodes.h>
#include <optimizer/paths.h>
#include <parser/parsetree.h>

Expand Down Expand Up @@ -97,41 +98,59 @@
}
}

/*
* Check if a chunk should be decompressed via a DecompressChunk plan.
*
* Check first that it is a compressed chunk. Then, decompress unless it is
* SELECT * FROM ONLY <chunk>. We check if it is the ONLY case by calling
* ts_rte_is_marked_for_expansion. Respecting ONLY here is important to not
* break postgres tools like pg_dump.
*/
static inline bool
use_decompress_chunk_node(const RelOptInfo *rel, const RangeTblEntry *rte, const Chunk *chunk)
{
return ts_guc_enable_transparent_decompression &&
/* Check that the chunk is actually compressed */
chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID &&
/* Check that it is _not_ SELECT FROM ONLY <chunk> */
(rel->reloptkind != RELOPT_BASEREL || ts_rte_is_marked_for_expansion(rte));
}

void
tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte,
Hypertable *ht)
{
/* We can get here via query on hypertable in that case reloptkind
* will be RELOPT_OTHER_MEMBER_REL or via direct query on chunk
* in that case reloptkind will be RELOPT_BASEREL.
* If we get here via SELECT * FROM <chunk>, we decompress the chunk,
* unless the query was SELECT * FROM ONLY <chunk>.
* We check if it is the ONLY case by calling ts_rte_is_marked_for_expansion.
* Respecting ONLY here is important to not break postgres tools like pg_dump.
/* Only interested in queries on relations that are part of hypertables
* with compression enabled, so quick exit if not this case. */
if (ht == NULL || !TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
return;

/*
* For a chunk, we can get here via a query on the hypertable that expands
* to the chunk or by direct query on the chunk. In the former case,
* reloptkind will be RELOPT_OTHER_MEMBER_REL (member of hypertable) or in
* the latter case reloptkind will be RELOPT_BASEREL (standalone rel).
*
* These two cases are checked in ts_planner_chunk_fetch().
*/
TimescaleDBPrivate *fdw_private = (TimescaleDBPrivate *) rel->fdw_private;
if (ts_guc_enable_transparent_decompression && ht &&
(rel->reloptkind == RELOPT_OTHER_MEMBER_REL ||
(rel->reloptkind == RELOPT_BASEREL && ts_rte_is_marked_for_expansion(rte))) &&
TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
{
if (fdw_private->cached_chunk_struct == NULL)
{
/*
* We can not have the cached Chunk struct,
* 1) if it was a direct query on the chunk;
* 2) if it is not a SELECT QUERY.
* Caching is done by our hypertable expansion, which doesn't run in
* these cases.
*/
fdw_private->cached_chunk_struct =
ts_chunk_get_by_relid(rte->relid, /* fail_if_not_found = */ true);
}
const Chunk *chunk = ts_planner_chunk_fetch(root, rel);

if (fdw_private->cached_chunk_struct->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
ts_decompress_chunk_generate_paths(root, rel, ht, fdw_private->cached_chunk_struct);
}
if (chunk == NULL)
return;

Check warning on line 139 in tsl/src/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/planner.c#L139

Added line #L139 was not covered by tests

if (use_decompress_chunk_node(rel, rte, chunk))
{
ts_decompress_chunk_generate_paths(root, rel, ht, chunk);
}
/*
* If using our own access method on the chunk, we might want to add
* alternative paths. This should not be compatible with transparent
* decompression, so only add if we didn't add decompression paths above.
*/
else if (ts_is_hypercore_am(chunk->amoid))
{
/* To be implemented */
Assert(false);

Check warning on line 153 in tsl/src/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/planner.c#L153

Added line #L153 was not covered by tests
}
}

Expand Down
Loading