From 8562452eae88f2ffce0b10eb55acd1ed452b1e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lisen=20=E6=9D=A8?= Date: Wed, 13 Dec 2023 18:02:08 +0800 Subject: [PATCH] temp --- src/Common/HashMapsTemplate.h | 199 +++++++++++--------- src/Interpreters/Streaming/AsofHashJoin.cpp | 80 ++++---- src/Interpreters/Streaming/AsofHashJoin.h | 2 +- src/Interpreters/Streaming/joinData.h | 64 +++---- 4 files changed, 175 insertions(+), 170 deletions(-) diff --git a/src/Common/HashMapsTemplate.h b/src/Common/HashMapsTemplate.h index 09e1a03193..30c4124b35 100644 --- a/src/Common/HashMapsTemplate.h +++ b/src/Common/HashMapsTemplate.h @@ -13,6 +13,105 @@ class WriteBuffer; class ReadBuffer; +template +using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl; + +/// Dummy key getter, always find nothing, used for JOIN ON NULL +template +class KeyGetterEmpty +{ +public: + struct MappedType + { + using mapped_type = Mapped; + }; + + using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl; + + KeyGetterEmpty() = default; + + FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); } +}; + +template +struct KeyGetterForTypeImpl; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodOneNumber; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodOneNumber; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodOneNumber; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodOneNumber; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodString; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodFixedString; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodKeysFixed; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodKeysFixed; +}; + +template +struct KeyGetterForTypeImpl +{ + using Type = ColumnsHashing::HashMethodHashed; +}; + +template +struct KeyGetterForType +{ + using Value = typename Data::value_type; + using Mapped_t = typename Data::mapped_type; + using Mapped = std::conditional_t, const Mapped_t, Mapped_t>; + using Type = typename KeyGetterForTypeImpl::Type; +}; + +template +requires (std::is_invocable_v) +void insertIntoHashMap( + Map & map, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler) +{ + KeyGetter key_getter(key_columns, key_sizes, nullptr); + for (size_t i = 0; i < rows; ++i) + { + auto emplace_result = key_getter.emplaceKey(map, i, pool); + mapped_handler(emplace_result.getMapped(), emplace_result.isInserted(), i); + } +} + template void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, WriteBuffer & wb) { @@ -139,6 +238,21 @@ struct HashMapsTemplate type = which; } + template + void insert(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler) + { + switch (which) + { +#define M(NAME) \ + case HashType::NAME: \ + using KeyGetter = typename KeyGetterForType>::Type; \ + insertIntoHashMap(*NAME, key_columns, key_sizes, rows, pool, std::mode(mapped_handler)); \ + break; + APPLY_FOR_HASH_KEY_VARIANTS(M) +#undef M + } + } + size_t getTotalRowCount() const { switch (type) @@ -219,89 +333,4 @@ struct HashMapsTemplate HashType type; }; -template -using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl; - -/// Dummy key getter, always find nothing, used for JOIN ON NULL -template -class KeyGetterEmpty -{ -public: - struct MappedType - { - using mapped_type = Mapped; - }; - - using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl; - - KeyGetterEmpty() = default; - - FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); } -}; - -template -struct KeyGetterForTypeImpl; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodOneNumber; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodOneNumber; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodOneNumber; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodOneNumber; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodString; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodFixedString; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodKeysFixed; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodKeysFixed; -}; - -template -struct KeyGetterForTypeImpl -{ - using Type = ColumnsHashing::HashMethodHashed; -}; - -template -struct KeyGetterForType -{ - using Value = typename Data::value_type; - using Mapped_t = typename Data::mapped_type; - using Mapped = std::conditional_t, const Mapped_t, Mapped_t>; - using Type = typename KeyGetterForTypeImpl::Type; -}; } diff --git a/src/Interpreters/Streaming/AsofHashJoin.cpp b/src/Interpreters/Streaming/AsofHashJoin.cpp index 9bb55284a1..ac50cd2138 100644 --- a/src/Interpreters/Streaming/AsofHashJoin.cpp +++ b/src/Interpreters/Streaming/AsofHashJoin.cpp @@ -14,6 +14,9 @@ AsofHashJoin::AsofHashJoin( , asof_type(*table_join->getAsofType()) , asof_inequality(table_join->getAsofInequality()) { + /// last key is asof column, not use as a hash key + hash_key_sizes = key_sizes; + hash_key_sizes.back(); } void AsofHashJoin::joinLeftBlock(Block & block) @@ -39,61 +42,44 @@ void AsofHashJoin::insertRightBlock(Block block) for (const auto & name : key_names) key_columns.push_back(all_key_columns[name]); - /// We will insert to the map only keys, where all components are not NULL. - ConstNullMapPtr null_map{}; - ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map); - - /// If LEFT, RIGHT or FULL save blocks with nulls for NotJoinedBlocks - UInt8 save_nullmap = 0; - if (isRightOrFull(table_join->kind()) && null_map) - { - /// Save rows with NULL keys - for (size_t i = 0; !save_nullmap && i < null_map->size(); ++i) - save_nullmap |= (*null_map)[i]; - } + auto asof_column = key_columns.back(); + key_columns.pop_back(); /// Add `block_to_save` to target stream data /// Note `block_to_save` may be empty for cases in which the query doesn't care other non-key columns. /// For example, SELECT count() FROM stream_a JOIN stream_b ON i=ii; - auto start_row = buffered_hash_data->addOrConcatDataBlock(std::move(block_to_save)); - auto rows = buffered_hash_data->lastDataBlock().rows(); + auto rows = block_to_save.rows(); + auto start_row = right_buffered_hash_data->blocks.pushBackOrConcat(std::move(block_to_save)); + auto row_ref_handler = [&](AsofRowRef & row_ref, bool inserted, size_t original_row, size_t row) { + AsofRowRef * row_ref_ptr = &row_ref; + if (inserted) + row_ref_ptr = new (row_ref_ptr) AsofRowRef(asof_type); - switch (hash_method_type) - { -#define M(TYPE) \ - case HashType::TYPE: \ - return insertFromBlockImplType< \ - Strictness::Asof, \ - typename KeyGetterForTypemaps->TYPE))>>::Type>( \ - join, \ - *(buffered_hash_data->maps->TYPE), \ - rows, \ - key_columns, \ - key_sizes[0], \ - &buffered_hash_data->blocks, \ - start_row, \ - null_map, \ - buffered_hash_data->pool); \ - break; - APPLY_FOR_HASH_KEY_VARIANTS(M) -#undef M - } - insertFromBlockImpl( - hash_method_type, - map, - rows, - key_columns, - key_sizes[0], - &target_hash_blocks->blocks, - start_row, - null_map, - target_hash_blocks->pool); + row_ref_ptr->insert( + asof_type, + asof_column, + &(right_buffered_hash_data->blocks), + original_row, + row, + asof_inequality, + rightJoinStreamDescription()->keep_versions); + }; - if (save_nullmap) - /// FIXME, we will need account the allocated bytes for null_map_holder / not_joined_map as well - buffered_hash_data->blocks_nullmaps.emplace_back(&buffered_hash_data->lastDataBlock(), null_map_holder); + right_buffered_hash_data->map.insert(std::move(block_to_save), key_columns, hash_key_sizes, rows, right_buffered_hash_data->pool, std::move(row_ref_handler)); checkLimits(); } + +void AsofHashJoin::checkLimits() const +{ + auto current_total_bytes = right_buffered_hash_data->totalBufferedBytes(); + if (current_total_bytes >= join_max_cached_bytes) + throw Exception( + ErrorCodes::SET_SIZE_LIMIT_EXCEEDED, + "Streaming asof join's memory reaches max size: {}, current total: {}, right: {}", + join_max_cached_bytes, + current_total_bytes, + buffered_hash_data->getMetricsString()); +} } } diff --git a/src/Interpreters/Streaming/AsofHashJoin.h b/src/Interpreters/Streaming/AsofHashJoin.h index a9cf10f05e..4f0c590770 100644 --- a/src/Interpreters/Streaming/AsofHashJoin.h +++ b/src/Interpreters/Streaming/AsofHashJoin.h @@ -25,7 +25,7 @@ class AsofHashJoin final : public HashJoin using DataBlock = LightChunkWithTimestamp; using BufferedAsofHashData = BufferedHashData>; - SERDE std::unique_ptr buffered_hash_data; + SERDE std::unique_ptr right_buffered_hash_data; }; } diff --git a/src/Interpreters/Streaming/joinData.h b/src/Interpreters/Streaming/joinData.h index ab469dc44f..03d231c7d2 100644 --- a/src/Interpreters/Streaming/joinData.h +++ b/src/Interpreters/Streaming/joinData.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -234,51 +235,38 @@ template struct BufferedHashData { using JoinDataBlockList = RefCountDataBlockList; - using JoinDataBlockRawPtr = const JoinDataBlock *; - using BlockNullmapList = std::deque>; - using Map = HashMapsTemplate; - - BufferedHashData(size_t data_block_size, CachedBlockMetrics & metrics); + using HashMap = HashMapsTemplate; + BufferedHashData(size_t data_block_size) : blocks(data_block_size, metrics) { } ~BufferedHashData(); - template - void insert(JoinDataBlock && block, RowRefHandler && handler) - { - /// Add block into `blocks` - auto start_row = blocks.pushBackOrConcat(std::move(block)); - - /// Insert MappedRowRef into hash map (which is referenced to `blocks`) - const IColumn * asof_column [[maybe_unused]] = nullptr; - if constexpr (is_range_asof_join || is_asof_join) - asof_column = key_columns.back(); + size_t totalBufferedBytes() const { return metrics.current_total_bytes + map.getTotalByteCountImpl(); } - auto key_columns - if constexpr (is_asof_join) - { - auto key_column_copy = key_columns; - auto key_size_copy = key_sizes; - key_column_copy.pop_back(); - key_size_copy.pop_back(); - return KeyGetter(key_column_copy, key_size_copy, nullptr); - } - else - return KeyGetter(key_columns, key_sizes, nullptr); - - auto key_getter = createKeyGetter < KeyGetter, is_range_asof_join || is_asof_join > (key_columns, key_sizes); - - for (size_t i = start_row; i < rows; ++i) - { - auto emplace_result = key_getter.emplaceKey(map, i, pool); - handler(emplace_result.getMapped(), emplace_result.isInserted()); - } + HashMapSizes hashMapSizes() const + { + return HashMapSizes{ + .keys = map.getTotalRowCount(), + .buffer_size_in_bytes = map.getTotalByteCountImpl(), + .buffer_bytes_in_cells = map.getBufferSizeInCells(), + }; } - const JoinDataBlock & lastDataBlock() const { return blocks.lastDataBlock(); } + String getMetricsString() const + { + return fmt::format("cached block list: ({}), hash map: ({})", metrics.string(), hashMapSizes().string()); + } - HashMapSizes hashMapSizes(const HashJoin * hash_join) const; + template + void insert(Block && block, RowRefHandler && row_ref_handler) + { + auto rows = block.rows(); + auto start_row = blocks.pushBackOrConcat(std::move(block)); + auto row_ref_handler = + right_buffered_hash_data->map.insert(std::move(block_to_save), key_columns, key_sizes, rows, right_buffered_hash_data->pool, std::move(row_ref_handler)); + } /// Buffered data + CachedBlockMetrics metrics; JoinDataBlockList blocks; /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows. @@ -286,9 +274,11 @@ struct BufferedHashData /// Hash maps variants are attached to original source blocks, and will be garbage collected /// automatically along with the source blocks. Hence put it here instead of BufferedStreamData - Map map; + HashMap map; /// FIXME: support nullable type + // using JoinDataBlockRawPtr = const JoinDataBlock *; + // using BlockNullmapList = std::deque>; // BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed) };