Skip to content

Commit

Permalink
Backport #28700 to 21.8: Fix crash on exception with projection aggre…
Browse files Browse the repository at this point in the history
…gate
  • Loading branch information
robot-clickhouse committed Sep 9, 2021
1 parent 07eac7f commit bc99d4e
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 35 deletions.
20 changes: 10 additions & 10 deletions src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
Arena * arena) const
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
Expand Down Expand Up @@ -818,7 +818,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(


void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
Expand Down Expand Up @@ -870,15 +870,15 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns


bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
}


bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
Expand Down Expand Up @@ -1001,7 +1001,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
}


void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const
{
Stopwatch watch;
size_t rows = data_variants.size();
Expand Down Expand Up @@ -1073,7 +1073,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
}


void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const
{
String tmp_path = params.tmp_volume->getDisk()->getPath();
return writeToTemporaryFile(data_variants, tmp_path);
Expand Down Expand Up @@ -1135,7 +1135,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out)
IBlockOutputStream & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
Expand Down Expand Up @@ -2251,7 +2251,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
block.clear();
}

bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys)
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
Expand Down Expand Up @@ -2601,7 +2601,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
}


std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block) const
{
if (!block)
return {};
Expand Down Expand Up @@ -2693,7 +2693,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
}


void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) const
{
if (result.empty())
return;
Expand Down
27 changes: 14 additions & 13 deletions src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ struct AggregatedDataVariants : private boost::noncopyable
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
* In this case, the pool will not be able to know with what offsets objects are stored.
*/
Aggregator * aggregator = nullptr;
const Aggregator * aggregator = nullptr;

size_t keys_size{}; /// Number of keys. NOTE do we need this field?
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
Expand Down Expand Up @@ -975,11 +975,14 @@ class Aggregator final
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;

bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;

/// Used for aggregate projection.
bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const;

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
Expand All @@ -996,8 +999,6 @@ class Aggregator final
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);

bool mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys);

/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
Expand All @@ -1007,11 +1008,11 @@ class Aggregator final
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;

/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;

bool hasTemporaryFiles() const { return !temporary_files.empty(); }

Expand Down Expand Up @@ -1083,7 +1084,7 @@ class Aggregator final
Poco::Logger * log = &Poco::Logger::get("Aggregator");

/// For external aggregation.
TemporaryFiles temporary_files;
mutable TemporaryFiles temporary_files;

#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
Expand All @@ -1106,7 +1107,7 @@ class Aggregator final
/** Call `destroy` methods for states of aggregate functions.
* Used in the exception handler for aggregation, since RAII in this case is not applicable.
*/
void destroyAllAggregateStates(AggregatedDataVariants & result);
void destroyAllAggregateStates(AggregatedDataVariants & result) const;


/// Process one data block, aggregate the data into a hash table.
Expand Down Expand Up @@ -1135,7 +1136,7 @@ class Aggregator final
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena);
Arena * arena) const;

static void executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
Expand All @@ -1148,7 +1149,7 @@ class Aggregator final
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out);
IBlockOutputStream & out) const;

/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>
Expand Down Expand Up @@ -1303,7 +1304,7 @@ class Aggregator final
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
NestedColumnsHolder & nested_columns_holder) const;

void addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
Expand Down
15 changes: 10 additions & 5 deletions src/Processors/Transforms/AggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,14 @@ AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformPar
}

AggregatingTransform::AggregatingTransform(
Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
size_t current_variant, size_t max_threads_, size_t temporary_data_merge_threads_)
: IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_))
Block header,
AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data_,
size_t current_variant,
size_t max_threads_,
size_t temporary_data_merge_threads_)
: IProcessor({std::move(header)}, {params_->getHeader()})
, params(std::move(params_))
, key_columns(params->params.keys_size)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
Expand Down Expand Up @@ -525,7 +530,7 @@ void AggregatingTransform::consume(Chunk chunk)
{
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
block = materializeBlock(block);
if (!params->aggregator.mergeBlock(block, variants, no_more_keys))
if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys))
is_consume_finished = true;
}
else
Expand All @@ -547,7 +552,7 @@ void AggregatingTransform::initGenerate()
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
{
if (params->only_merge)
params->aggregator.mergeBlock(getInputs().front().getHeader(), variants, no_more_keys);
params->aggregator.mergeOnBlock(getInputs().front().getHeader(), variants, no_more_keys);
else
params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys);
}
Expand Down
37 changes: 32 additions & 5 deletions src/Processors/Transforms/AggregatingTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,38 @@ class AggregatedChunkInfo : public ChunkInfo
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;

using AggregatorList = std::list<Aggregator>;
using AggregatorListPtr = std::shared_ptr<AggregatorList>;

struct AggregatingTransformParams
{
Aggregator::Params params;
Aggregator aggregator;

/// Each params holds a list of aggregators which are used in query. It's needed because we need
/// to use a pointer of aggregator to proper destroy complex aggregation states on exception
/// (See comments in AggregatedDataVariants). However, this pointer might not be valid because
/// we can have two different aggregators at the same time due to mixed pipeline of aggregate
/// projections, and one of them might gets destroyed before used.
AggregatorListPtr aggregator_list_ptr;
Aggregator & aggregator;
bool final;
bool only_merge = false;

AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_) {}
: params(params_)
, aggregator_list_ptr(std::make_shared<AggregatorList>())
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
{
}

AggregatingTransformParams(const Aggregator::Params & params_, const AggregatorListPtr & aggregator_list_ptr_, bool final_)
: params(params_)
, aggregator_list_ptr(aggregator_list_ptr_)
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
{
}

Block getHeader() const { return aggregator.getHeader(final); }

Expand Down Expand Up @@ -82,9 +105,13 @@ class AggregatingTransform : public IProcessor
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_);

/// For Parallel aggregating.
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data, size_t current_variant,
size_t max_threads, size_t temporary_data_merge_threads);
AggregatingTransform(
Block header,
AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data,
size_t current_variant,
size_t max_threads,
size_t temporary_data_merge_threads);
~AggregatingTransform() override;

String getName() const override { return "AggregatingTransform"; }
Expand Down
8 changes: 6 additions & 2 deletions src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;

AggregatorListPtr aggregator_list_ptr = std::make_shared<AggregatorList>();

// TODO apply in_order_optimization here
auto build_aggregate_pipe = [&](Pipe & pipe, bool projection)
{
Expand Down Expand Up @@ -305,7 +307,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_count_to_compile_aggregate_expression,
header_before_aggregation); // The source header is also an intermediate header

transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);

/// This part is hacky.
/// We want AggregatingTransform to work with aggregate states instead of normal columns.
Expand Down Expand Up @@ -335,7 +338,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);

transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
}

pipe.resize(pipe.numOutputPorts(), true, true);
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
drop table if exists t;

create table t (x UInt32) engine = MergeTree order by tuple() settings index_granularity = 8;
insert into t select number from numbers(100);
alter table t add projection p (select uniqHLL12(x));
insert into t select number + 100 from numbers(100);
select uniqHLL12(x) from t settings allow_experimental_projection_optimization = 1, max_bytes_to_read=400, max_block_size=8; -- { serverError 307; }

drop table if exists t;

0 comments on commit bc99d4e

Please sign in to comment.