diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 9bee509adff..0022b4c0adb 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -44,8 +44,8 @@ static char *build_merge_update_clause(List *column_names); * materialization support * ***************************/ static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - Oid materialization_type, const char *const chunk_condition); + const NameData *time_column_name, TimeRange materialization_range, + const char *const chunk_condition); static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, @@ -53,19 +53,19 @@ static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg TimeRange invalidation_range, const int32 chunk_id); static uint64 spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name, - char *invalidation_start, char *invalidation_end, + TimeRange materialization_range, const char *const chunk_condition); static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, const NameData *time_column_name, - char *materialization_start, char *materialization_end, + TimeRange materialization_range, const char *const chunk_condition); static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, const NameData *time_column_name, - char *materialization_start, char *materialization_end); + TimeRange materialization_range); void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg, @@ -329,32 +329,40 @@ build_merge_update_clause(List *column_names) static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - Oid materialization_type, const char *const chunk_condition) + const NameData *time_column_name, TimeRange materialization_range, + const char *const chunk_condition) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type }; + Datum values[] = { materialization_range.start }; + char nulls[] = { false }; appendStringInfo(command, "SELECT %s FROM %s.%s AS I " - "WHERE I.%s >= %s %s " + "WHERE I.%s >= $1 %s " "ORDER BY 1 DESC LIMIT 1;", quote_identifier(NameStr(*time_column_name)), quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), chunk_condition); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 1, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, "could not get the last bucket of the materialized data"); - Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_type, + Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_range.type, "partition types for result (%d) and dimension (%d) do not match", SPI_gettypeid(SPI_tuptable->tupdesc, 1), - materialization_type); + materialization_range.type); if (SPI_processed > 0) { @@ -363,7 +371,7 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table, if (!isnull) { - int64 watermark = ts_time_value_to_internal(maxdat, materialization_type); + int64 watermark = ts_time_value_to_internal(maxdat, materialization_range.type); ts_cagg_watermark_update(mat_ht, watermark, isnull, false); } } @@ -375,17 +383,9 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, const NameData *time_column_name, TimeRange invalidation_range, const int32 chunk_id) { - Oid out_fn; - bool type_is_varlena; - char *invalidation_start; - char *invalidation_end; StringInfo chunk_condition = makeStringInfo(); uint64 rows_processed = 0; - getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena); - invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start); - invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end); - /* MERGE statement is available starting on PG15 and we'll support it only in the new format of * CAggs and for non-compressed hypertables */ if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 && @@ -396,8 +396,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, partial_view, materialization_table, time_column_name, - invalidation_start, - invalidation_end); + invalidation_range); } else { @@ -412,16 +411,14 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, rows_processed += spi_delete_materializations(materialization_table, time_column_name, - invalidation_start, - invalidation_end, + invalidation_range, chunk_condition->data); rows_processed += spi_insert_materializations(mat_ht, cagg, partial_view, materialization_table, time_column_name, - invalidation_start, - invalidation_end, + invalidation_range, chunk_condition->data); } @@ -431,8 +428,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, spi_update_watermark(mat_ht, materialization_table, time_column_name, - invalidation_start, - invalidation_range.type, + invalidation_range, chunk_condition->data); } } @@ -440,11 +436,20 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - char *materialization_end) + const NameData *time_column_name, TimeRange materialization_range) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type, + materialization_range.type, + materialization_range.type, + materialization_range.type }; + Datum values[] = { materialization_range.start, + materialization_range.end, + materialization_range.start, + materialization_range.end }; + char nulls[] = { false, false, false, false }; + List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) cagg, mat_ht); List *agg_colnames = cagg_find_aggref_and_var_cols((ContinuousAgg *) cagg, mat_ht); List *all_columns = NIL; @@ -472,10 +477,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, "WITH partial AS ( " " SELECT * " " FROM %s.%s " - " WHERE %s >= %s AND %s < %s " + " WHERE %s >= $1 AND %s < $2 " ") " "MERGE INTO %s.%s M " - "USING partial P ON %s AND M.%s >= %s AND M.%s < %s " + "USING partial P ON %s AND M.%s >= $3 AND M.%s < $4 " " %s " /* UPDATE */ " WHEN NOT MATCHED THEN " " INSERT (%s) VALUES (%s) ", @@ -486,9 +491,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* partial WHERE */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), /* materialization hypertable */ quote_identifier(NameStr(*materialization_table.schema)), @@ -499,9 +502,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* extra MERGE JOIN condition with primary dimension */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), /* UPDATE */ merge_update->data, @@ -511,7 +512,13 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, build_merge_insert_columns(all_columns, ", ", "P.")); elog(DEBUG2, "%s", command->data); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 4, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, @@ -532,10 +539,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, appendStringInfo(command, "DELETE " "FROM %s.%s M " - "WHERE M.%s >= %s AND M.%s < %s " + "WHERE M.%s >= $1 AND M.%s < $2 " "AND NOT EXISTS (" " SELECT FROM %s.%s P " - " WHERE %s AND P.%s >= %s AND P.%s < %s) ", + " WHERE %s AND P.%s >= $3 AND P.%s < $4) ", /* materialization hypertable */ quote_identifier(NameStr(*materialization_table.schema)), @@ -543,9 +550,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* materialization hypertable WHERE */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), /* partial VIEW */ quote_identifier(NameStr(*partial_view.schema)), @@ -556,11 +561,16 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* partial WHERE */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), - quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end)); + quote_identifier(NameStr(*time_column_name))); + elog(DEBUG2, "%s", command->data); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 4, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, @@ -581,24 +591,30 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, static uint64 spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name, - char *invalidation_start, char *invalidation_end, - const char *const chunk_condition) + TimeRange materialization_range, const char *const chunk_condition) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type, materialization_range.type }; + Datum values[] = { materialization_range.start, materialization_range.end }; + char nulls[] = { false, false }; appendStringInfo(command, "DELETE FROM %s.%s AS D WHERE " - "D.%s >= %s AND D.%s < %s %s;", + "D.%s >= $1 AND D.%s < $2 %s;", quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(invalidation_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(invalidation_end), chunk_condition); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 2, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, @@ -618,26 +634,33 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - char *materialization_end, const char *const chunk_condition) + const NameData *time_column_name, TimeRange materialization_range, + const char *const chunk_condition) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type, materialization_range.type }; + Datum values[] = { materialization_range.start, materialization_range.end }; + char nulls[] = { false, false }; appendStringInfo(command, "INSERT INTO %s.%s SELECT * FROM %s.%s AS I " - "WHERE I.%s >= %s AND I.%s < %s %s;", + "WHERE I.%s >= $1 AND I.%s < $2 %s;", quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*partial_view.schema)), quote_identifier(NameStr(*partial_view.name)), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), chunk_condition); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 2, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR,