Skip to content

Commit

Permalink
Replace SPI_execute by SPI_execute_with_args in materialization
Browse files Browse the repository at this point in the history
To prevent issues on TimestampTz output used in SQL statements
dynamically generated for the Continuous Aggregate materialization
code we replaced the `SPI_execute` by `SPI_execute_with_args` so now
it will not convert timestamp to string and parse it back anymore.

This popped up due to some tzdata 2024b changes in timezone PST8PDT
that also lead to some Postgres changes on PG17:

postgres/postgres@b8ea0f67
  • Loading branch information
fabriziomello committed Oct 7, 2024
1 parent bfc3041 commit b685f2f
Showing 1 changed file with 81 additions and 58 deletions.
139 changes: 81 additions & 58 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ static char *build_merge_update_clause(List *column_names);
* materialization support *
***************************/
static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
Oid materialization_type, const char *const chunk_condition);
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition);
static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
TimeRange invalidation_range, const int32 chunk_id);
static uint64 spi_delete_materializations(SchemaAndName materialization_table,
const NameData *time_column_name,
char *invalidation_start, char *invalidation_end,
TimeRange materialization_range,
const char *const chunk_condition);
static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
char *materialization_start, char *materialization_end,
TimeRange materialization_range,
const char *const chunk_condition);
static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
char *materialization_start, char *materialization_end);
TimeRange materialization_range);

void
continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
Expand Down Expand Up @@ -329,32 +329,40 @@ build_merge_update_clause(List *column_names)

static void
spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
Oid materialization_type, const char *const chunk_condition)
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type };
Datum values[] = { materialization_range.start };
char nulls[] = { false };

appendStringInfo(command,
"SELECT %s FROM %s.%s AS I "
"WHERE I.%s >= %s %s "
"WHERE I.%s >= $1 %s "
"ORDER BY 1 DESC LIMIT 1;",
quote_identifier(NameStr(*time_column_name)),
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
1,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR, "could not get the last bucket of the materialized data");

Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_type,
Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_range.type,
"partition types for result (%d) and dimension (%d) do not match",
SPI_gettypeid(SPI_tuptable->tupdesc, 1),
materialization_type);
materialization_range.type);

if (SPI_processed > 0)
{
Expand All @@ -363,7 +371,7 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,

if (!isnull)
{
int64 watermark = ts_time_value_to_internal(maxdat, materialization_type);
int64 watermark = ts_time_value_to_internal(maxdat, materialization_range.type);
ts_cagg_watermark_update(mat_ht, watermark, isnull, false);
}
}
Expand All @@ -375,17 +383,9 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
const NameData *time_column_name, TimeRange invalidation_range,
const int32 chunk_id)
{
Oid out_fn;
bool type_is_varlena;
char *invalidation_start;
char *invalidation_end;
StringInfo chunk_condition = makeStringInfo();
uint64 rows_processed = 0;

getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena);
invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start);
invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end);

/* MERGE statement is available starting on PG15 and we'll support it only in the new format of
* CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
Expand All @@ -396,8 +396,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
partial_view,
materialization_table,
time_column_name,
invalidation_start,
invalidation_end);
invalidation_range);
}
else
{
Expand All @@ -412,16 +411,14 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

rows_processed += spi_delete_materializations(materialization_table,
time_column_name,
invalidation_start,
invalidation_end,
invalidation_range,
chunk_condition->data);
rows_processed += spi_insert_materializations(mat_ht,
cagg,
partial_view,
materialization_table,
time_column_name,
invalidation_start,
invalidation_end,
invalidation_range,
chunk_condition->data);
}

Expand All @@ -431,20 +428,28 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
spi_update_watermark(mat_ht,
materialization_table,
time_column_name,
invalidation_start,
invalidation_range.type,
invalidation_range,
chunk_condition->data);
}
}

static uint64
spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
char *materialization_end)
const NameData *time_column_name, TimeRange materialization_range)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type,
materialization_range.type,
materialization_range.type,
materialization_range.type };
Datum values[] = { materialization_range.start,
materialization_range.end,
materialization_range.start,
materialization_range.end };
char nulls[] = { false, false, false, false };

List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) cagg, mat_ht);
List *agg_colnames = cagg_find_aggref_and_var_cols((ContinuousAgg *) cagg, mat_ht);
List *all_columns = NIL;
Expand Down Expand Up @@ -472,10 +477,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
"WITH partial AS ( "
" SELECT * "
" FROM %s.%s "
" WHERE %s >= %s AND %s < %s "
" WHERE %s >= $1 AND %s < $2 "
") "
"MERGE INTO %s.%s M "
"USING partial P ON %s AND M.%s >= %s AND M.%s < %s "
"USING partial P ON %s AND M.%s >= $3 AND M.%s < $4 "
" %s " /* UPDATE */
" WHEN NOT MATCHED THEN "
" INSERT (%s) VALUES (%s) ",
Expand All @@ -486,9 +491,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* partial WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* materialization hypertable */
quote_identifier(NameStr(*materialization_table.schema)),
Expand All @@ -499,9 +502,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* extra MERGE JOIN condition with primary dimension */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* UPDATE */
merge_update->data,
Expand All @@ -511,7 +512,13 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
build_merge_insert_columns(all_columns, ", ", "P."));

elog(DEBUG2, "%s", command->data);
res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
4,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -532,20 +539,18 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
appendStringInfo(command,
"DELETE "
"FROM %s.%s M "
"WHERE M.%s >= %s AND M.%s < %s "
"WHERE M.%s >= $1 AND M.%s < $2 "
"AND NOT EXISTS ("
" SELECT FROM %s.%s P "
" WHERE %s AND P.%s >= %s AND P.%s < %s) ",
" WHERE %s AND P.%s >= $3 AND P.%s < $4) ",

/* materialization hypertable */
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),

/* materialization hypertable WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* partial VIEW */
quote_identifier(NameStr(*partial_view.schema)),
Expand All @@ -556,11 +561,16 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* partial WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end));
quote_identifier(NameStr(*time_column_name)));

elog(DEBUG2, "%s", command->data);
res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
4,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -581,24 +591,30 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

static uint64
spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name,
char *invalidation_start, char *invalidation_end,
const char *const chunk_condition)
TimeRange materialization_range, const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type, materialization_range.type };
Datum values[] = { materialization_range.start, materialization_range.end };
char nulls[] = { false, false };

appendStringInfo(command,
"DELETE FROM %s.%s AS D WHERE "
"D.%s >= %s AND D.%s < %s %s;",
"D.%s >= $1 AND D.%s < $2 %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_end),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
2,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -618,26 +634,33 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData
static uint64
spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
char *materialization_end, const char *const chunk_condition)
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type, materialization_range.type };
Datum values[] = { materialization_range.start, materialization_range.end };
char nulls[] = { false, false };

appendStringInfo(command,
"INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
"WHERE I.%s >= %s AND I.%s < %s %s;",
"WHERE I.%s >= $1 AND I.%s < $2 %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*partial_view.schema)),
quote_identifier(NameStr(*partial_view.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
2,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand Down

0 comments on commit b685f2f

Please sign in to comment.