Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Mar 28, 2024
1 parent d4b02cc commit 8562452
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 170 deletions.
199 changes: 114 additions & 85 deletions src/Common/HashMapsTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,105 @@ class WriteBuffer;
class ReadBuffer;


/// Shorthand for ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, ...> with the
/// second template argument fixed to `true`.
/// NOTE(review): the meaning of that second argument is defined by ColumnsHashing and is not
/// visible here - confirm it there before changing the flag.
template <typename Mapped>
using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;

/// Dummy key getter that never finds anything; used for JOIN ON NULL.
/// It holds no state and only offers `findKey`, which ignores every argument.
template <typename Mapped>
class KeyGetterEmpty
{
public:
/// Placeholder passed where a map would normally be given: it only exposes the nested
/// `mapped_type` alias and carries no data.
struct MappedType
{
using mapped_type = Mapped;
};

/// Result type of `findKey`. This names the underlying FindResultImpl directly, with its
/// default second template argument, rather than the namespace-level `FindResultImpl`
/// alias that fixes that argument to `true`.
using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;

KeyGetterEmpty() = default;

/// Ignores the map placeholder, the row index and the arena, and returns a
/// value-initialized FindResult (per the class comment this stands for "nothing found").
FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
};

/// Compile-time mapping from a HashType enumerator to the ColumnsHashing method class that
/// is used as the key getter for that hash variant. The selected class is exposed as the
/// nested alias `Type`.
/// The primary template is declared without a definition; only the specializations below
/// provide `Type`.
/// NOTE(review): the trailing boolean template arguments are positional flags whose meaning
/// is defined by the ColumnsHashing classes and is not visible here - confirm there before
/// changing any of them.
template <HashType type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;

/// key8 -> HashMethodOneNumber instantiated with UInt8.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key8, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false, true>;
};

/// key16 -> HashMethodOneNumber instantiated with UInt16.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key16, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false, true>;
};

/// key32 -> HashMethodOneNumber instantiated with UInt32.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key32, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false, true>;
};

/// key64 -> HashMethodOneNumber instantiated with UInt64.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key64, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false, true>;
};

/// key_string -> HashMethodString.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false, true>;
};

/// key_fixed_string -> HashMethodFixedString.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_fixed_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false, true>;
};

/// keys128 -> HashMethodKeysFixed instantiated with UInt128.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys128, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false, true>;
};

/// keys256 -> HashMethodKeysFixed instantiated with UInt256.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys256, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false, true>;
};

/// hashed -> HashMethodHashed.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::hashed, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false, true>;
};

/// Resolves the key getter class for a concrete map type `Data` under hash variant `type`.
/// `Type` is whatever KeyGetterForTypeImpl selects for (`type`, `Value`, `Mapped`).
template <HashType type, typename Data>
struct KeyGetterForType
{
/// Value type stored by the map.
using Value = typename Data::value_type;
/// Mapped type exactly as declared by the map.
using Mapped_t = typename Data::mapped_type;
/// Mapped type handed to the key getter: const-qualified when `Data` itself is const,
/// so a const map yields a key getter over const mapped values.
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};

/// Emplaces the keys of rows `0 .. rows - 1` into `map` and reports every row to `mapped_handler`.
///
/// A `KeyGetter` is constructed from `key_columns` and `key_sizes` (its third constructor
/// argument is passed as nullptr) and `emplaceKey(map, row, pool)` is called once per row.
/// After each emplace the handler is invoked as `mapped_handler(mapped, inserted, row)`:
///   - `mapped`   : the value returned by the emplace result's `getMapped()`
///   - `inserted` : the emplace result's `isInserted()`
///   - `row`      : the index of the row that was just processed
///
/// The constraint is expressed with `mapped_type &` because the handler is called with the
/// result of `getMapped()`, not with a temporary copy; a handler declared as
/// `(mapped_type &, bool, size_t)` must therefore satisfy it.
/// NOTE(review): assumes `getMapped()` yields an lvalue reference to the mapped slot - confirm
/// against the ColumnsHashing emplace result type.
template <typename KeyGetter, typename Map, typename MappedHandler>
requires (std::is_invocable_v<MappedHandler, typename Map::mapped_type & /*mapped*/, bool /*inserted*/, size_t /*row*/>)
void insertIntoHashMap(
    Map & map, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler)
{
    KeyGetter key_getter(key_columns, key_sizes, nullptr);
    for (size_t row = 0; row < rows; ++row)
    {
        auto emplace_result = key_getter.emplaceKey(map, row, pool);
        /// The handler is invoked repeatedly, so it is called as an lvalue and never forwarded.
        mapped_handler(emplace_result.getMapped(), emplace_result.isInserted(), row);
    }
}

template <typename Map, typename MappedSerializer>
void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, WriteBuffer & wb)
{
Expand Down Expand Up @@ -139,6 +238,21 @@ struct HashMapsTemplate
type = which;
}

template <typename MappedHandler>
void insert(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler)
{
switch (which)
{
#define M(NAME) \
case HashType::NAME: \
using KeyGetter = typename KeyGetterForType<HashType::NAME, std::remove_reference_t<decltype(*NAME)>>::Type; \
insertIntoHashMap<KeyGetter>(*NAME, key_columns, key_sizes, rows, pool, std::mode(mapped_handler)); \
break;
APPLY_FOR_HASH_KEY_VARIANTS(M)
#undef M
}
}

size_t getTotalRowCount() const
{
switch (type)
Expand Down Expand Up @@ -219,89 +333,4 @@ struct HashMapsTemplate
HashType type;
};

/// Shorthand for ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, ...> with the
/// second template argument fixed to `true`.
/// NOTE(review): the meaning of that second argument is defined by ColumnsHashing and is not
/// visible here - confirm it there before changing the flag.
template <typename Mapped>
using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;

/// Dummy key getter that never finds anything; used for JOIN ON NULL.
/// It holds no state and only offers `findKey`, which ignores every argument.
template <typename Mapped>
class KeyGetterEmpty
{
public:
/// Placeholder passed where a map would normally be given: it only exposes the nested
/// `mapped_type` alias and carries no data.
struct MappedType
{
using mapped_type = Mapped;
};

/// Result type of `findKey`. This names the underlying FindResultImpl directly, with its
/// default second template argument, rather than the namespace-level `FindResultImpl`
/// alias that fixes that argument to `true`.
using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;

KeyGetterEmpty() = default;

/// Ignores the map placeholder, the row index and the arena, and returns a
/// value-initialized FindResult (per the class comment this stands for "nothing found").
FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
};

/// Compile-time mapping from a HashType enumerator to the ColumnsHashing method class that
/// is used as the key getter for that hash variant. The selected class is exposed as the
/// nested alias `Type`.
/// The primary template is declared without a definition; only the specializations below
/// provide `Type`.
/// NOTE(review): the trailing boolean template arguments are positional flags whose meaning
/// is defined by the ColumnsHashing classes and is not visible here - confirm there before
/// changing any of them.
template <HashType type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;

/// key8 -> HashMethodOneNumber instantiated with UInt8.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key8, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false, true>;
};

/// key16 -> HashMethodOneNumber instantiated with UInt16.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key16, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false, true>;
};

/// key32 -> HashMethodOneNumber instantiated with UInt32.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key32, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false, true>;
};

/// key64 -> HashMethodOneNumber instantiated with UInt64.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key64, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false, true>;
};

/// key_string -> HashMethodString.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false, true>;
};

/// key_fixed_string -> HashMethodFixedString.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::key_fixed_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false, true>;
};

/// keys128 -> HashMethodKeysFixed instantiated with UInt128.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys128, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false, true>;
};

/// keys256 -> HashMethodKeysFixed instantiated with UInt256.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::keys256, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false, true>;
};

/// hashed -> HashMethodHashed.
template <typename Value, typename Mapped>
struct KeyGetterForTypeImpl<HashType::hashed, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false, true>;
};

/// Resolves the key getter class for a concrete map type `Data` under hash variant `type`.
/// `Type` is whatever KeyGetterForTypeImpl selects for (`type`, `Value`, `Mapped`).
template <HashType type, typename Data>
struct KeyGetterForType
{
/// Value type stored by the map.
using Value = typename Data::value_type;
/// Mapped type exactly as declared by the map.
using Mapped_t = typename Data::mapped_type;
/// Mapped type handed to the key getter: const-qualified when `Data` itself is const,
/// so a const map yields a key getter over const mapped values.
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};
}
80 changes: 33 additions & 47 deletions src/Interpreters/Streaming/AsofHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ AsofHashJoin::AsofHashJoin(
, asof_type(*table_join->getAsofType())
, asof_inequality(table_join->getAsofInequality())
{
/// The last key is the ASOF column and is not used as a hash key, so the hash key sizes are
/// the join key sizes with the final entry removed.
/// NOTE(review): assumes `key_sizes` is non-empty here (an ASOF join always has its ASOF key) - confirm.
hash_key_sizes = key_sizes;
hash_key_sizes.pop_back();
}

void AsofHashJoin::joinLeftBlock(Block & block)
Expand All @@ -39,61 +42,44 @@ void AsofHashJoin::insertRightBlock(Block block)
for (const auto & name : key_names)
key_columns.push_back(all_key_columns[name]);

/// We will insert to the map only keys, where all components are not NULL.
ConstNullMapPtr null_map{};
ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);

/// If LEFT, RIGHT or FULL save blocks with nulls for NotJoinedBlocks
UInt8 save_nullmap = 0;
if (isRightOrFull(table_join->kind()) && null_map)
{
/// Save rows with NULL keys
for (size_t i = 0; !save_nullmap && i < null_map->size(); ++i)
save_nullmap |= (*null_map)[i];
}
auto asof_column = key_columns.back();
key_columns.pop_back();

/// Add `block_to_save` to target stream data
/// Note `block_to_save` may be empty for cases in which the query doesn't care other non-key columns.
/// For example, SELECT count() FROM stream_a JOIN stream_b ON i=ii;
auto start_row = buffered_hash_data->addOrConcatDataBlock(std::move(block_to_save));
auto rows = buffered_hash_data->lastDataBlock().rows();
auto rows = block_to_save.rows();
auto start_row = right_buffered_hash_data->blocks.pushBackOrConcat(std::move(block_to_save));
auto row_ref_handler = [&](AsofRowRef & row_ref, bool inserted, size_t original_row, size_t row) {
AsofRowRef * row_ref_ptr = &row_ref;
if (inserted)
row_ref_ptr = new (row_ref_ptr) AsofRowRef(asof_type);

switch (hash_method_type)
{
#define M(TYPE) \
case HashType::TYPE: \
return insertFromBlockImplType< \
Strictness::Asof, \
typename KeyGetterForType<HashType::TYPE, std::remove_reference_t<decltype(*(buffered_hash_data->maps->TYPE))>>::Type>( \
join, \
*(buffered_hash_data->maps->TYPE), \
rows, \
key_columns, \
key_sizes[0], \
&buffered_hash_data->blocks, \
start_row, \
null_map, \
buffered_hash_data->pool); \
break;
APPLY_FOR_HASH_KEY_VARIANTS(M)
#undef M
}
insertFromBlockImpl<strictness_>(
hash_method_type,
map,
rows,
key_columns,
key_sizes[0],
&target_hash_blocks->blocks,
start_row,
null_map,
target_hash_blocks->pool);
row_ref_ptr->insert(
asof_type,
asof_column,
&(right_buffered_hash_data->blocks),
original_row,
row,
asof_inequality,
rightJoinStreamDescription()->keep_versions);
};

if (save_nullmap)
/// FIXME, we will need account the allocated bytes for null_map_holder / not_joined_map as well
buffered_hash_data->blocks_nullmaps.emplace_back(&buffered_hash_data->lastDataBlock(), null_map_holder);
right_buffered_hash_data->map.insert(std::move(block_to_save), key_columns, hash_key_sizes, rows, right_buffered_hash_data->pool, std::move(row_ref_handler));

checkLimits();
}

/// Enforces the cache limit of the streaming ASOF join.
///
/// Reads `right_buffered_hash_data->totalBufferedBytes()` and throws an Exception with
/// ErrorCodes::SET_SIZE_LIMIT_EXCEEDED when that value is greater than or equal to
/// `join_max_cached_bytes`. The message carries the limit, the current total and the
/// metrics string of the same right-side buffer the total was read from.
void AsofHashJoin::checkLimits() const
{
    auto current_total_bytes = right_buffered_hash_data->totalBufferedBytes();
    if (current_total_bytes >= join_max_cached_bytes)
        throw Exception(
            ErrorCodes::SET_SIZE_LIMIT_EXCEEDED,
            "Streaming asof join's memory reaches max size: {}, current total: {}, right: {}",
            join_max_cached_bytes,
            current_total_bytes,
            right_buffered_hash_data->getMetricsString());
}
}
}
2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/AsofHashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class AsofHashJoin final : public HashJoin

using DataBlock = LightChunkWithTimestamp;
using BufferedAsofHashData = BufferedHashData<DataBlock, AsofRowRefs<DataBlock>>;
SERDE std::unique_ptr<BufferedAsofHashData> buffered_hash_data;
SERDE std::unique_ptr<BufferedAsofHashData> right_buffered_hash_data;
};

}
Expand Down
Loading

0 comments on commit 8562452

Please sign in to comment.