Skip to content

Commit

Permalink
add metrics log for join (#3988) (#630)
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen authored Mar 27, 2024
1 parent d559d28 commit e2d9f19
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 10 deletions.
13 changes: 13 additions & 0 deletions src/Interpreters/Streaming/ConcurrentHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <Columns/ColumnSparse.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
#include <Common/WeakHash.h>

Expand Down Expand Up @@ -389,6 +391,17 @@ std::shared_ptr<NotJoinedBlocks> ConcurrentHashJoin::getNonJoinedBlocks(
ErrorCodes::LOGICAL_ERROR, "Invalid join type. join kind: {}, strictness: {}", table_join->kind(), table_join->strictness());
}

/// Builds a one-line metrics summary covering every hash join held in `hash_joins`.
///
/// Each entry is rendered as `HashJoin-<index>(<metrics of that join>)`; consecutive entries are
/// separated by ", " so that neighbouring entries do not run together in the log line.
/// The entry's mutex is held while its metrics string is produced.
///
/// @return the concatenated per-join metrics; empty when `hash_joins` is empty.
String ConcurrentHashJoin::metricsString() const
{
    WriteBufferFromOwnString wb;
    for (size_t i = 0; const auto & hash_join : hash_joins)
    {
        /// Separator goes before every entry except the first one.
        if (i != 0)
            wb << ", ";

        std::lock_guard lock(hash_join->mutex);
        wb << "HashJoin-" << i++ << "(" << hash_join->data->metricsString() << ")";
    }
    return wb.str();
}

static ALWAYS_INLINE IColumn::Selector hashToSelector(const WeakHash32 & hash, size_t num_shards)
{
assert(num_shards > 0 && (num_shards & (num_shards - 1)) == 0);
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Streaming/ConcurrentHashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ConcurrentHashJoin final : public IHashJoin
bool supportParallelJoin() const override { return true; }
std::shared_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
/// Metrics summary of all underlying hash joins, one `HashJoin-<index>(...)` entry per join.
String metricsString() const override;

void getKeyColumnPositions(
std::vector<size_t> & left_key_column_positions_,
Expand Down
20 changes: 12 additions & 8 deletions src/Interpreters/Streaming/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -866,14 +866,7 @@ HashJoin::HashJoin(

/// Logs the final join metrics once when the join is destroyed.
///
/// The message text comes from metricsString(), so the destructor and any other caller of
/// metricsString() report the same format (left/right stream, global join and retract buffer
/// metrics) and the metrics are not logged twice.
HashJoin::~HashJoin() noexcept
{
    LOG_INFO(logger, "{}", metricsString());
}

void HashJoin::init()
Expand Down Expand Up @@ -1713,6 +1706,17 @@ struct AdderNonJoined
}
};

/// Formats the current metrics of this join as a single line.
///
/// The line contains, in order: the left stream's buffered-data metrics, the right stream's
/// buffered-data metrics, the global join metrics and the retract buffer metrics.
/// The retract buffer part is empty when `join_results` is not set.
///
/// @return the formatted metrics line.
String HashJoin::metricsString() const
{
    const auto left_metrics = left_data.buffered_data->joinMetricsString();
    const auto right_metrics = right_data.buffered_data->joinMetricsString();
    const auto global_metrics = join_metrics.string();
    /// No retract buffer -> report an empty section rather than dereferencing a null pointer.
    const auto retract_metrics = join_results ? join_results->joinMetricsString(this) : "";

    return fmt::format(
        "Left stream metrics: {{{}}}, right stream metrics: {{{}}}, global join metrics: {{{}}}, "
        "retract buffer metrics: {{{}}}",
        left_metrics,
        right_metrics,
        global_metrics,
        retract_metrics);
}

std::shared_ptr<NotJoinedBlocks>
HashJoin::getNonJoinedBlocks(const Block & /*left_sample_block*/, const Block & /*result_sample_block*/, UInt64 /*max_block_size*/) const
{
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Streaming/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class HashJoin final : public IHashJoin
std::shared_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;

/// One-line summary of the left/right stream, global join and retract buffer metrics.
/// The same text is logged by the destructor.
String metricsString() const final;

/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const final;
/// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools.
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Streaming/IHashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IHashJoin : public IJoin
bool include_asof_key_column = false) const
= 0;

/// Metrics summary intended for logging.
/// The default implementation reports nothing and returns an empty string.
virtual String metricsString() const
{
    return String();
}

/// Whether hash join algorithm has buffer left/right data to align
virtual bool leftStreamRequiresBufferingDataToAlign() const = 0;
virtual bool rightStreamRequiresBufferingDataToAlign() const = 0;
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/Transforms/JoiningTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ void JoiningTransform::transform(Chunk & chunk)
{
initialized = true;

/// proton: starts.
LOG_INFO(&Poco::Logger::get("JoiningTransform"), "built hash map, total joined data {} rows, {} bytes", join->getTotalRowCount(), join->getTotalByteCount());
/// proton: ends.

if (join->alwaysReturnsEmptySet() && !on_totals)
{
stop_reading = true;
Expand Down
7 changes: 7 additions & 0 deletions src/Processors/Transforms/Streaming/JoinTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ JoinTransform::JoinTransform(
, output_header_chunk(outputs.front().getHeader().getColumns(), 0)
, logger(&Poco::Logger::get("StreamingJoinTransform"))
, input_ports_with_data{InputPortWithData{&inputs.front()}, InputPortWithData{&inputs.back()}}
, last_log_ts(MonotonicSeconds::now())
{
assert(join);

Expand Down Expand Up @@ -203,6 +204,12 @@ void JoinTransform::work()
assert(!output_chunks.empty());
output_chunks.back().setCheckpointContext(std::move(requested_ckpt));
}

if (MonotonicSeconds::now() - last_log_ts > 60)
{
LOG_INFO(logger, "{}, watermark={}", join->metricsString(), watermark);
last_log_ts = MonotonicSeconds::now();
}
}

inline void JoinTransform::propagateWatermark(int64_t local_watermark)
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Transforms/Streaming/JoinTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class JoinTransform final : public IProcessor
NO_SERDE std::list<Chunk> output_chunks;

SERDE int64_t watermark = INVALID_WATERMARK;

/// Monotonic timestamp (seconds) of the last periodic metrics log; work() compares it against
/// the current time to emit the join metrics at most once per interval.
NO_SERDE Int64 last_log_ts = 0;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,12 @@ void JoinTransformWithAlignment::work()

need_propagate_heartbeat = false;

if (DB::MonotonicSeconds::now() - last_stats_log_ts >= 5)
if (DB::MonotonicSeconds::now() - last_stats_log_ts >= 60)
{
LOG_INFO(
log,
"left_watermark={} right_watermark={} left_input_muted={} right_input_muted={} left_quiesce_joins={} right_quiesce_joins={}",
"{}, left_watermark={} right_watermark={} left_input_muted={} right_input_muted={} left_quiesce_joins={} right_quiesce_joins={}",
join->metricsString(),
left_input.watermark,
right_input.watermark,
stats.left_input_muted,
Expand Down

0 comments on commit e2d9f19

Please sign in to comment.