Skip to content

Commit

Permalink
separate different streaming join
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Mar 28, 2024
1 parent e2d9f19 commit 7ecf8db
Show file tree
Hide file tree
Showing 39 changed files with 1,537 additions and 521 deletions.
236 changes: 132 additions & 104 deletions src/Common/HashMapsTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,123 @@ namespace DB
class WriteBuffer;
class ReadBuffer;

/// Different types of keys for maps.
#define APPLY_FOR_HASH_KEY_VARIANTS(M) \
M(key8) \
M(key16) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)

enum class HashType
{
#define M(NAME) NAME,
APPLY_FOR_HASH_KEY_VARIANTS(M)
#undef M
};

template <typename Mapped>
using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;

/// Dummy key getter, always find nothing, used for JOIN ON NULL
template <typename Mapped>
class KeyGetterEmpty
{
public:
struct MappedType
{
using mapped_type = Mapped;
};

using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;

KeyGetterEmpty() = default;

FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
};

template <HashType type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key8, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key16, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key32, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key64, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_fixed_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys128, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys256, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::hashed, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false, true>;
};

template <HashType type, typename Data>
struct KeyGetterForType
{
using Value = typename Data::value_type;
using Mapped_t = typename Data::mapped_type;
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};

template <typename KeyGetter, typename Map, typename MappedHandler>
requires (std::is_invocable_v<MappedHandler, typename Map::mapped_type /*mapped*/, bool /*inserted*/, size_t /*row*/>)
void insertIntoHashMap(
Map & map, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler)
{
KeyGetter key_getter(key_columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
mapped_handler(emplace_result.getMapped(), emplace_result.isInserted(), i);
}
}

template <typename Map, typename MappedSerializer>
void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, WriteBuffer & wb)
Expand Down Expand Up @@ -83,25 +200,6 @@ void deserializeTwoLevelHashMap(Map & map, MappedDeserializer && mapped_deserial
/// HashMapsTemplate is a taken from HashJoin class and make it standalone
/// and could be shared among different components

/// Different types of keys for maps.
#define APPLY_FOR_HASH_KEY_VARIANTS(M) \
M(key8) \
M(key16) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)

enum class HashType
{
#define M(NAME) NAME,
APPLY_FOR_HASH_KEY_VARIANTS(M)
#undef M
};

template <size_t initial_size_degree = 8>
struct ConservativeHashTableGrowerWithPrecalculation : public HashTableGrowerWithPrecalculation<initial_size_degree>
{
Expand Down Expand Up @@ -139,6 +237,21 @@ struct HashMapsTemplate
type = which;
}

template <typename MappedHandler>
void insert(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler)
{
switch (type)
{
#define M(NAME) \
case HashType::NAME: \
using KeyGetter = typename KeyGetterForType<HashType::NAME, std::remove_reference_t<decltype(*NAME)>>::Type; \
insertIntoHashMap<KeyGetter>(*NAME, key_columns, key_sizes, rows, pool, std::move(mapped_handler)); \
break;
APPLY_FOR_HASH_KEY_VARIANTS(M)
#undef M
}
}

size_t getTotalRowCount() const
{
switch (type)
Expand Down Expand Up @@ -219,89 +332,4 @@ struct HashMapsTemplate
HashType type;
};

template <typename Mapped>
using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;

/// Dummy key getter, always find nothing, used for JOIN ON NULL
template <typename Mapped>
class KeyGetterEmpty
{
public:
struct MappedType
{
using mapped_type = Mapped;
};

using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;

KeyGetterEmpty() = default;

FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
};

template <HashType type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key8, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key16, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key32, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key64, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_fixed_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys128, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys256, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false, true>;
};

template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::hashed, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false, true>;
};

template <HashType type, typename Data>
struct KeyGetterForType
{
using Value = typename Data::value_type;
using Mapped_t = typename Data::mapped_type;
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};
}
17 changes: 0 additions & 17 deletions src/Core/BlockWithShard.h

This file was deleted.

23 changes: 23 additions & 0 deletions src/Core/DataBlockWithShard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <Core/Block.h>
#include <Core/LightChunk.h>

namespace DB
{
template <typename DataBlock>
struct DataBlockWithShard
{
DataBlock block;
int32_t shard;

DataBlockWithShard(DataBlock && block_, int32_t shard_) : block(std::move(block_)), shard(shard_) { }
};

using BlockWithShard = DataBlockWithShard<Block>;
using BlocksWithShard = std::vector<BlockWithShard>;

using LightChunkWithShard = DataBlockWithShard<LightChunk>;
using LightChunksWithShard = std::vector<LightChunkWithShard>;
}

21 changes: 19 additions & 2 deletions src/Core/LightChunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct LightChunk
void concat(const LightChunk & other)
{
auto added_rows = other.rows();
if (added_rows <= 0)
return;

assert(columns() == other.columns());
for (size_t c = 0; auto & col : data)
{
Expand All @@ -35,9 +38,21 @@ struct LightChunk
}
}

LightChunk cloneEmpty() const
{
LightChunk res;
res.data.reserve(data.size());

for (const auto & elem : data)
res.data.emplace_back(elem->cloneEmpty());

return res;
}

size_t rows() const noexcept { return data.empty() ? 0 : data[0]->size(); }
size_t columns() const noexcept { return data.size(); }

Columns & getColumns() noexcept { return data; }
const Columns & getColumns() const noexcept { return data; }
Columns detachColumns() noexcept { return std::move(data); }

Expand Down Expand Up @@ -88,7 +103,9 @@ struct LightChunkWithTimestamp
LightChunkWithTimestamp() = default;
LightChunkWithTimestamp(Columns && data_) : chunk(std::move(data_)) { }
LightChunkWithTimestamp(Chunk && chunk_, Int64 min_ts, Int64 max_ts)
: chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts) { }
: chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts)
{
}
LightChunkWithTimestamp(const Block & block)
: chunk(block), min_timestamp(block.minTimestamp()), max_timestamp(block.maxTimestamp()) { }

Expand Down Expand Up @@ -122,4 +139,4 @@ struct LightChunkWithTimestamp
Int64 maxTimestamp() const noexcept { return max_timestamp; }
};

}
}
2 changes: 1 addition & 1 deletion src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,7 @@ std::shared_ptr<IJoin> SelectQueryExpressionAnalyzer::chooseJoinAlgorithmStreami
return std::make_shared<Streaming::ConcurrentHashJoin>(
analyzed_join, max_threads, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
else
return std::make_shared<Streaming::HashJoin>(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
return Streaming::HashJoin::create(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
}
/// proton : ends

Expand Down
Loading

0 comments on commit 7ecf8db

Please sign in to comment.