Skip to content

Commit

Permalink
Add more metrics log (#492)
Browse files Browse the repository at this point in the history
* add more metrics log
* remove CachedBlockMetrics serialization
* tweak log aggregated metrics
  • Loading branch information
yl-lisen authored Jan 18, 2024
1 parent e5a0501 commit 1bc83df
Show file tree
Hide file tree
Showing 39 changed files with 750 additions and 486 deletions.
2 changes: 1 addition & 1 deletion src/Common/HashMapsTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct HashMapsTemplate
UNREACHABLE();
}

size_t getTotalByteCountImpl() const
size_t getBufferSizeInBytes() const
{
switch (type)
{
Expand Down
9 changes: 9 additions & 0 deletions src/Common/HashTable/StringHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ struct StringHashTableEmpty //-V730
size_t size() const { return hasZero() ? 1 : 0; }
bool empty() const { return !hasZero(); }
size_t getBufferSizeInBytes() const { return sizeof(Cell); }
size_t getBufferSizeInCells() const { return 1; }
size_t getCollisions() const { return 0; }
};

Expand Down Expand Up @@ -439,6 +440,14 @@ class StringHashTable : private boost::noncopyable
+ ms.getBufferSizeInBytes();
}

/// proton: starts.
size_t getBufferSizeInCells() const
{
return m0.getBufferSizeInCells() + m1.getBufferSizeInCells() + m2.getBufferSizeInCells() + m3.getBufferSizeInCells()
+ ms.getBufferSizeInCells();
}
/// proton: ends.

void clearAndShrink()
{
m1.clearHasZero();
Expand Down
12 changes: 11 additions & 1 deletion src/Common/HashTable/TimeBucketHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,17 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
size_t res = 0;
for (const auto & p : impls)
res += p.getBufferSizeInBytes();
res += p.second.getBufferSizeInBytes();

return res;
}

size_t getBufferSizeInCells() const
{
size_t res = 0;
for (const auto & p : impls)
res += p.second.getBufferSizeInCells();

return res;
}

Expand Down
11 changes: 10 additions & 1 deletion src/Common/HashTable/TwoLevelHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,16 @@ class TwoLevelHashTable :
return res;
}

/// proton : starts
/// proton: starts
size_t getBufferSizeInCells() const
{
size_t res = 0;
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
res += impls[i].getBufferSizeInCells();

return res;
}

std::vector<Int64> buckets() const
{
std::vector<Int64> bucket_ids(NUM_BUCKETS);
Expand Down
11 changes: 11 additions & 0 deletions src/Common/HashTable/TwoLevelStringHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ class TwoLevelStringHashTable : private boost::noncopyable
return res;
}

/// proton: starts
size_t getBufferSizeInCells() const
{
size_t res = 0;
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
res += impls[i].getBufferSizeInCells();

return res;
}
/// proton: ends

std::vector<Int64> buckets() const
{
std::vector<Int64> bucket_ids(NUM_BUCKETS);
Expand Down
17 changes: 17 additions & 0 deletions src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ size_t Block::bytes() const
}

size_t Block::allocatedBytes() const
{
return allocatedMetadataBytes() + allocatedDataBytes();
}

size_t Block::allocatedDataBytes() const
{
size_t res = 0;
for (const auto & elem : data)
Expand All @@ -397,6 +402,18 @@ size_t Block::allocatedBytes() const
return res;
}

size_t Block::allocatedMetadataBytes() const
{
size_t res = 0;
for (const auto & elem : data)
res += elem.column->allocatedMetadataBytes();

res += sizeof(data) + data.capacity() * sizeof(ColumnWithTypeAndName);
res += sizeof(index_by_name) + index_by_name.size() * sizeof(IndexByName::value_type);
res += sizeof(info);
return res;
}

std::string Block::dumpNames() const
{
WriteBufferFromOwnString out;
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class Block

/// Approximate number of allocated bytes in memory - for profiling and limits.
size_t allocatedBytes() const;
size_t allocatedDataBytes() const;
size_t allocatedMetadataBytes() const;

operator bool() const { return !!columns(); }
bool operator!() const { return !this->operator bool(); }
Expand Down
2 changes: 1 addition & 1 deletion src/Core/LightChunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct LightChunk
for (const auto & column : data)
res += column->allocatedMetadataBytes();

res += sizeof(data) + data.size() * sizeof(ColumnPtr);
res += sizeof(data) + data.capacity() * sizeof(ColumnPtr);
return res;
}

Expand Down
26 changes: 26 additions & 0 deletions src/Interpreters/Streaming/AggregatedDataMetrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

namespace DB::Streaming
{
struct AggregatedDataMetrics
{
size_t total_aggregated_rows = 0;
size_t total_bytes_in_arena = 0;
size_t hash_buffer_bytes = 0;
size_t hash_buffer_cells = 0;
size_t total_bytes_of_aggregate_states = 0;

String string() const
{
return fmt::format(
"total_aggregated_rows={} total_aggregated_bytes={} (total_bytes_in_arena: {}, hash_buffer_bytes: {}, hash_buffer_cells: {}, "
"total_bytes_of_aggregate_states: {})",
total_aggregated_rows,
total_bytes_in_arena + hash_buffer_bytes,
total_bytes_in_arena,
hash_buffer_bytes,
hash_buffer_cells,
total_bytes_of_aggregate_states);
}
};
}
32 changes: 32 additions & 0 deletions src/Interpreters/Streaming/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <Formats/SimpleNativeReader.h>
#include <Formats/SimpleNativeWriter.h>
#include <Interpreters/CompiledAggregateFunctionsHolder.h>
#include <Interpreters/Streaming/AggregatedDataMetrics.h>
#include <Common/HashMapsTemplate.h>
#include <Common/VersionRevision.h>
#include <Common/logger_useful.h>
Expand Down Expand Up @@ -4338,6 +4339,37 @@ bool Aggregator::checkAndProcessResult(AggregatedDataVariants & result, bool & n

return false;
}

void Aggregator::updateMetrics(const AggregatedDataVariants & variants, AggregatedDataMetrics & metrics) const
{
switch (variants.type)
{
case AggregatedDataVariants::Type::EMPTY: break;
case AggregatedDataVariants::Type::without_key:
{
metrics.total_aggregated_rows += 1;
metrics.total_bytes_of_aggregate_states += total_size_of_aggregate_states;
break;
}

#define M(NAME, IS_TWO_LEVEL) \
case AggregatedDataVariants::Type::NAME: \
{ \
auto & table = variants.NAME->data; \
metrics.total_aggregated_rows += table.size(); \
metrics.hash_buffer_bytes += table.getBufferSizeInBytes(); \
metrics.hash_buffer_cells += table.getBufferSizeInCells(); \
metrics.total_bytes_of_aggregate_states += table.size() * total_size_of_aggregate_states; \
break; \
}

APPLY_FOR_AGGREGATED_VARIANTS_STREAMING(M)
#undef M
}

for (const auto & arena : variants.aggregates_pools)
metrics.total_bytes_in_arena += arena->size();
}
/// proton: ends
}
}
4 changes: 3 additions & 1 deletion src/Interpreters/Streaming/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ using TimeBucketAggregatedDataWithKeys128TwoLevel = TimeBucketHashMap<UInt128, A
using TimeBucketAggregatedDataWithKeys256TwoLevel = TimeBucketHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;

class Aggregator;

struct AggregatedDataMetrics;
struct AggregatedDataVariants : private boost::noncopyable
{
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
Expand Down Expand Up @@ -857,6 +857,8 @@ class Aggregator final

/// proton: starts
Params & getParams() { return params; }

void updateMetrics(const AggregatedDataVariants & variants, AggregatedDataMetrics & metrics) const;
/// proton: ends

private:
Expand Down
19 changes: 11 additions & 8 deletions src/Interpreters/Streaming/CachedBlockMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ namespace DB
{
namespace Streaming
{
void CachedBlockMetrics::serialize(WriteBuffer & wb) const
void CachedBlockMetrics::serialize(WriteBuffer & wb, VersionType version) const
{
DB::writeBinary(current_total_blocks, wb);
DB::writeBinary(current_total_bytes, wb);
assert(version <= SERDE_REQUIRED_MAX_VERSION);
DB::writeBinary(total_blocks, wb);
DB::writeBinary(total_bytes, wb);
DB::writeBinary(total_data_bytes, wb);
DB::writeBinary(total_blocks, wb);
DB::writeBinary(total_data_bytes, wb);
DB::writeBinary(gced_blocks, wb);
}

void CachedBlockMetrics::deserialize(ReadBuffer & rb)
void CachedBlockMetrics::deserialize(ReadBuffer & rb, VersionType version)
{
DB::readBinary(current_total_blocks, rb);
DB::readBinary(current_total_bytes, rb);
assert(version <= SERDE_REQUIRED_MAX_VERSION);
/// V1 layout [current_total_blocks, current_total_bytes, total_blocks, total_bytes, gced_blocks]
DB::readBinary(total_blocks, rb);
DB::readBinary(total_data_bytes, rb);
DB::readBinary(total_blocks, rb);
DB::readBinary(total_bytes, rb);
DB::readBinary(total_data_bytes, rb);
DB::readBinary(gced_blocks, rb);
}
}
Expand Down
27 changes: 17 additions & 10 deletions src/Interpreters/Streaming/CachedBlockMetrics.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <base/defines.h>
#include <fmt/format.h>
#include <base/types.h>

namespace DB
{
Expand All @@ -11,25 +13,30 @@ namespace Streaming
{
struct CachedBlockMetrics
{
size_t current_total_blocks = 0;
size_t current_total_bytes = 0;
size_t total_rows = 0;
size_t total_blocks = 0;
size_t total_bytes = 0;
size_t total_metadata_bytes = 0;
size_t total_data_bytes = 0;
size_t gced_blocks = 0;

ALWAYS_INLINE size_t totalBytes() const { return total_metadata_bytes + total_data_bytes; }

std::string string() const
{
return fmt::format(
"total_bytes={} total_blocks={} current_total_bytes={} current_total_blocks={} gced_blocks={}",
total_bytes,
"total_rows={} total_bytes={} total_blocks={} gced_blocks={} (total_metadata_bytes:{} total_data_bytes:{})",
total_rows,
totalBytes(),
total_blocks,
current_total_bytes,
current_total_blocks,
gced_blocks);
gced_blocks,
total_metadata_bytes,
total_data_bytes);
}

void serialize(WriteBuffer & wb) const;
void deserialize(ReadBuffer & rb);
/// [Legacy] We don't need to serialize this anymore on new impl, since the metrics is volated. will update it back during recover
static constexpr VersionType SERDE_REQUIRED_MAX_VERSION = 1;
void serialize(WriteBuffer & wb, VersionType version) const;
void deserialize(ReadBuffer & rb, VersionType version);
};
}
}
8 changes: 4 additions & 4 deletions src/Interpreters/Streaming/ConcurrentHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ BlocksWithShard ConcurrentHashJoin::dispatchBlock(const std::vector<size_t> & ke
return result;
}

void ConcurrentHashJoin::serialize(WriteBuffer & wb) const
void ConcurrentHashJoin::serialize(WriteBuffer & wb, VersionType version) const
{
/// Only last join thread to do serialization
if (serialize_requested.fetch_add(1) + 1 == num_used_hash_joins)
Expand All @@ -466,7 +466,7 @@ void ConcurrentHashJoin::serialize(WriteBuffer & wb) const
for (const auto & hash_join : hash_joins)
{
std::lock_guard lock(hash_join->mutex);
hash_join->data->serialize(wb);
hash_join->data->serialize(wb, version);
}

serialize_requested.store(0, std::memory_order_relaxed);
Expand All @@ -485,7 +485,7 @@ void ConcurrentHashJoin::serialize(WriteBuffer & wb) const
}
}

void ConcurrentHashJoin::deserialize(ReadBuffer & rb)
void ConcurrentHashJoin::deserialize(ReadBuffer & rb, VersionType version)
{
bool is_serialized;
DB::readBoolText(is_serialized, rb);
Expand All @@ -505,7 +505,7 @@ void ConcurrentHashJoin::deserialize(ReadBuffer & rb)
for (auto & hash_join : hash_joins)
{
std::lock_guard lock(hash_join->mutex);
hash_join->data->deserialize(rb);
hash_join->data->deserialize(rb, version);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/Streaming/ConcurrentHashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ class ConcurrentHashJoin final : public IHashJoin
return hash_joins[0]->data->rightJoinStreamDescription();
}

void serialize(WriteBuffer &) const override;
void deserialize(ReadBuffer &) override;
void serialize(WriteBuffer &, VersionType) const override;
void deserialize(ReadBuffer &, VersionType) override;

void cancel() override;

Expand Down
Loading

0 comments on commit 1bc83df

Please sign in to comment.