Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,42 @@ void registerVeloxMetrics() {
// The number of data size exchange requests.
DEFINE_METRIC(kMetricExchangeDataSizeCount, facebook::velox::StatType::COUNT);

/// ================== Index Lookup Counters =================
// The distribution of index lookup result raw bytes in range of [0, 128MB]
// with 128 buckets. It is configured to report the capacity at P50, P90, P99,
// and P100 percentiles.
DEFINE_HISTOGRAM_METRIC(
kMetricIndexLookupResultRawBytes,
1L << 20,
0,
128L << 20,
50,
90,
99,
100);

// The distribution of index lookup result bytes in range of [0, 128MB] with
// 128 buckets. It is configured to report the capacity at P50, P90, P99, and
// P100 percentiles.
DEFINE_HISTOGRAM_METRIC(
kMetricIndexLookupResultBytes, 1L << 20, 0, 128L << 20, 50, 90, 99, 100);

// The time distribution of index lookup time in range of [0, 16s] with 512
// buckets and reports P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
kMetricIndexLookupTimeMs, 32, 0, 16L << 10, 50, 90, 99, 100);

// The time distribution of index lookup wait time in range of [0, 16s] with
// 512 buckets and reports P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
kMetricIndexLookupWaitTimeMs, 32, 0, 16L << 10, 50, 90, 99, 100);

/// ================== Table Scan Counters =================
// The time distribution of table scan batch processing time in range of [0,
// 16s] with 512 buckets and reports P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
kMetricTableScanBatchProcessTimeMs, 32, 0, 16L << 10, 50, 90, 99, 100);

/// ================== Storage Counters =================

// The time distribution of storage IO throttled duration in range of [0, 30s]
Expand Down
15 changes: 15 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,19 @@ constexpr folly::StringPiece kMetricStorageGlobalThrottled{

constexpr folly::StringPiece kMetricStorageNetworkThrottled{
"velox.storage_network_throttled_count"};

constexpr folly::StringPiece kMetricIndexLookupResultRawBytes{
"velox.index_lookup_result_raw_bytes"};

constexpr folly::StringPiece kMetricIndexLookupResultBytes{
"velox.index_lookup_result_bytes"};

constexpr folly::StringPiece kMetricIndexLookupTimeMs{
"velox.index_lookup_time_ms"};

constexpr folly::StringPiece kMetricIndexLookupWaitTimeMs{
"velox.index_lookup_wait_time_ms"};

constexpr folly::StringPiece kMetricTableScanBatchProcessTimeMs{
"velox.table_scan_batch_process_time_ms"};
} // namespace facebook::velox
25 changes: 17 additions & 8 deletions velox/common/base/RuntimeMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,44 +53,53 @@ void RuntimeMetric::printMetric(std::stringstream& stream) const {
case RuntimeCounter::Unit::kNanos:
stream << " sum: " << succinctNanos(sum) << ", count: " << count
<< ", min: " << succinctNanos(min)
<< ", max: " << succinctNanos(max);
<< ", max: " << succinctNanos(max)
<< ", avg: " << succinctNanos(count == 0 ? 0 : sum / count);
break;
case RuntimeCounter::Unit::kBytes:
stream << " sum: " << succinctBytes(sum) << ", count: " << count
<< ", min: " << succinctBytes(min)
<< ", max: " << succinctBytes(max);
<< ", max: " << succinctBytes(max)
<< ", avg: " << succinctBytes(count == 0 ? 0 : sum / count);
break;
case RuntimeCounter::Unit::kNone:
[[fallthrough]];
default:
stream << " sum: " << sum << ", count: " << count << ", min: " << min
<< ", max: " << max;
<< ", max: " << max << ", avg: " << (count == 0 ? 0 : sum / count);
}
}

std::string RuntimeMetric::toString() const {
switch (unit) {
case RuntimeCounter::Unit::kNanos:
return fmt::format(
"sum:{}, count:{}, min:{}, max:{}",
"sum:{}, count:{}, min:{}, max:{}, avg: {}",
succinctNanos(sum),
count,
succinctNanos(min),
succinctNanos(max));
succinctNanos(max),
succinctNanos(count == 0 ? 0 : sum / count));
break;
case RuntimeCounter::Unit::kBytes:
return fmt::format(
"sum:{}, count:{}, min:{}, max:{}",
"sum:{}, count:{}, min:{}, max:{}, avg: {}",
succinctBytes(sum),
count,
succinctBytes(min),
succinctBytes(max));
succinctBytes(max),
succinctBytes(count == 0 ? 0 : sum / count));
break;
case RuntimeCounter::Unit::kNone:
[[fallthrough]];
default:
return fmt::format(
"sum:{}, count:{}, min:{}, max:{}", sum, count, min, max);
"sum:{}, count:{}, min:{}, max:{}, avg: {}",
sum,
count,
min,
max,
count == 0 ? 0 : sum / count);
}
}

Expand Down
11 changes: 7 additions & 4 deletions velox/common/base/tests/RuntimeMetricsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ TEST_F(RuntimeMetricsTest, basic) {

ASSERT_EQ(
fmt::format(
"sum:{}, count:{}, min:{}, max:{}",
"sum:{}, count:{}, min:{}, max:{}, avg: {}",
rm1.sum,
rm1.count,
rm1.min,
rm1.max),
rm1.max,
rm1.sum / rm1.count),
rm1.toString());

RuntimeMetric rm2;
Expand All @@ -74,11 +75,13 @@ TEST_F(RuntimeMetricsTest, basic) {

RuntimeMetric byteRm(RuntimeCounter::Unit::kBytes);
byteRm.addValue(5);
ASSERT_EQ(byteRm.toString(), "sum:5B, count:1, min:5B, max:5B");
ASSERT_EQ(byteRm.toString(), "sum:5B, count:1, min:5B, max:5B, avg: 5B");

RuntimeMetric timeRm(RuntimeCounter::Unit::kNanos);
timeRm.addValue(2'000);
ASSERT_EQ(timeRm.toString(), "sum:2.00us, count:1, min:2.00us, max:2.00us");
ASSERT_EQ(
timeRm.toString(),
"sum:2.00us, count:1, min:2.00us, max:2.00us, avg: 2.00us");
}

} // namespace facebook::velox
44 changes: 44 additions & 0 deletions velox/docs/monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,47 @@ Hive Connector
- The distribution of hive sort writer finish processing time slice in range
of[0, 120s] with 60 buckets. It is configured to report latency at P50,
P90, P99, and P100 percentiles.

Index Join
----------

.. list-table::
:widths: 40 10 50
:header-rows: 1

* - Metric Name
- Type
- Description
* - index_lookup_wait_time_ms
- Histogram
- The time distribution of index lookup time in range of [0, 16s] with 512
buckets and reports P50, P90, P99, and P100.
* - index_lookup_wait_time_ms
- Histogram
- The time distribution of index lookup time in range of [0, 16s] with 512
buckets and reports P50, P90, P99, and P100.
* - index_lookup_result_raw_bytes
- Histogram
- The distribution of index lookup result raw bytes in range of [0, 128MB]
with 128 buckets. It is configured to report the capacity at P50, P90, P99,
and P100 percentiles.
* - index_lookup_result_bytes
- Histogram
- The distribution of index lookup result bytes in range of [0, 128MB] with
128 buckets. It is configured to report the capacity at P50, P90, P99, and
P100 percentiles.

Table Scan
----------

.. list-table::
:widths: 40 10 50
:header-rows: 1

* - Metric Name
- Type
- Description
* - table_scan_batch_process_time_ms
- Histogram
- The time distribution of table scan batch processing time in range of [0,
16s] with 512 buckets and reports P50, P90, P99, and P100.
28 changes: 24 additions & 4 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,32 @@ These stats are reported only by IndexLookupJoin operator
* - Stats
- Unit
- Description
* - lookupWallNanos
* - connectorlookupWallNanos
- nanos
- The walltime in nanoseconds that the index connector do the lookup.
* - lookupCpuNanos
- The end-to-end walltime in nanoseconds that the index connector do the lookup.
* - connectorlookupWaitWallNanos
- nanos
- The cpu time in nanoseconds that the index connector do the lookup.
- The walltime in nanoseconds that the index connector wait for the lookup from
remote storage.
* - connectorResultPrepareCpuNanos
- nanos
- The cpu time in nanoseconds that the index connector process response from storages
client for followup processing by index join operator.
* - clientRequestProcessCpuNanos
- nanos
- The cpu time in nanoseconds that the storage client process request for remote
storage lookup such as encoding the lookup input data into remotr storage request.
* - clientResultProcessCpuNanos
- nanos
- The cpu time in nanoseconds that the storage client process response from remote
storage lookup such as decoding the response data into velox vectors.
* - clientLookupResultRawSize
- bytes
- The byte size of the raw result received from the remote storage lookup.
* - clientLookupResultSize
- bytes
- The byte size of the result data in velox vectors that are decoded from the raw data
received from the remote storage lookup.

Spilling
--------
Expand Down
13 changes: 7 additions & 6 deletions velox/exec/IndexLookupJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,14 +779,15 @@ void IndexLookupJoin::recordConnectorStats() {
lockedStats->runtimeStats.erase(name);
lockedStats->runtimeStats.emplace(name, std::move(value));
}
if (connectorStats.count(kConnectorLookupCpuTime) != 0) {
VELOX_CHECK_EQ(
connectorStats[kConnectorLookupCpuTime].count,
connectorStats[kConnectorLookupWallTime].count);
if (connectorStats.count(kConnectorLookupWallTime) != 0) {
const CpuWallTiming backgroundTiming{
static_cast<uint64_t>(connectorStats[kConnectorLookupCpuTime].count),
static_cast<uint64_t>(connectorStats[kConnectorLookupWallTime].count),
static_cast<uint64_t>(connectorStats[kConnectorLookupWallTime].sum),
static_cast<uint64_t>(connectorStats[kConnectorLookupCpuTime].sum)};
// NOTE: this might not be accurate as it doesn't include the time spent
// inside the index storage client.
static_cast<uint64_t>(connectorStats[kConnectorResultPrepareTime].sum) +
connectorStats[kClientRequestProcessTime].sum +
connectorStats[kClientResultProcessTime].sum};
lockedStats->backgroundTiming.clear();
lockedStats->backgroundTiming.add(backgroundTiming);
}
Expand Down
33 changes: 29 additions & 4 deletions velox/exec/IndexLookupJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,35 @@ class IndexLookupJoin : public Operator {
void close() override;

/// Defines lookup runtime stats.
/// The walltime time that the index connector do the lookup.
static inline const std::string kConnectorLookupWallTime{"lookupWallNanos"};
/// The cpu time that the index connector do the lookup.
static inline const std::string kConnectorLookupCpuTime{"lookupCpuNanos"};
/// The end-to-end walltime in nanoseconds that the index connector do the
/// lookup.
static inline const std::string kConnectorLookupWallTime{
"connectorLookupWallNanos"};
/// The cpu time in nanoseconds that the index connector process response from
/// storage client for followup processing by index join operator.
static inline const std::string kConnectorResultPrepareTime{
"connectorResultPrepareCpuNanos"};
/// The cpu time in nanoseconds that the storage client process request for
/// remote storage lookup such as encoding the lookup input data into remotr
/// storage request.
static inline const std::string kClientRequestProcessTime{
"clientRequestProcessCpuNanos"};
/// The walltime in nanoseconds that the storage client wait for the lookup
/// from remote storage.
static inline const std::string kClientLookupWaitWallTime{
"clientlookupWaitWallNanos"};
/// The cpu time in nanoseconds that the storage client process response from
/// remote storage lookup such as decoding the response data into velox
/// vectors.
static inline const std::string kClientResultProcessTime{
"clientResultProcessCpuNanos"};
/// The byte size of the raw result received from the remote storage lookup.
static inline const std::string kClientLookupResultRawSize{
"clientLookupResultRawSize"};
/// The byte size of the result data in velox vectors that are decoded from
/// the raw data received from the remote storage lookup.
static inline const std::string kClientLookupResultSize{
"clientLookupResultSize"};

private:
using LookupResultIter = connector::IndexSource::LookupResultIterator;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ RowVectorPtr TableScan::getOutput() {
{maxFilteringRatio_,
1.0 * data->size() / readBatchSize,
1.0 / kMaxSelectiveBatchSizeMultiplier});
if (ioTimeUs > 0) {
RECORD_HISTOGRAM_METRIC_VALUE(
velox::kMetricTableScanBatchProcessTimeMs, ioTimeUs / 1'000);
}
return data;
}
continue;
Expand Down
15 changes: 12 additions & 3 deletions velox/exec/tests/IndexLookupJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1817,13 +1817,22 @@ DEBUG_ONLY_TEST_P(IndexLookupJoinTest, runtimeStats) {
numProbeBatches);
ASSERT_GT(runtimeStats.at(IndexLookupJoin::kConnectorLookupWallTime).sum, 0);
ASSERT_EQ(
runtimeStats.at(IndexLookupJoin::kConnectorLookupCpuTime).count,
runtimeStats.at(IndexLookupJoin::kClientLookupWaitWallTime).count,
numProbeBatches);
ASSERT_GT(runtimeStats.at(IndexLookupJoin::kConnectorLookupCpuTime).sum, 0);
ASSERT_GT(runtimeStats.at(IndexLookupJoin::kClientLookupWaitWallTime).sum, 0);
ASSERT_EQ(
runtimeStats.at(IndexLookupJoin::kConnectorResultPrepareTime).count,
numProbeBatches);
ASSERT_GT(
runtimeStats.at(IndexLookupJoin::kConnectorResultPrepareTime).sum, 0);
ASSERT_EQ(runtimeStats.count(IndexLookupJoin::kClientRequestProcessTime), 0);
ASSERT_EQ(runtimeStats.count(IndexLookupJoin::kClientResultProcessTime), 0);
ASSERT_EQ(runtimeStats.count(IndexLookupJoin::kClientLookupResultSize), 0);
ASSERT_EQ(runtimeStats.count(IndexLookupJoin::kClientLookupResultRawSize), 0);
ASSERT_THAT(
operatorStats.toString(true, true),
testing::MatchesRegex(
".*Runtime stats.*lookupWallNanos.*lookupCpuNanos.*"));
".*Runtime stats.*connectorLookupWallNanos:.*clientlookupWaitWallNanos.*connectorResultPrepareCpuNanos.*"));
}

TEST_P(IndexLookupJoinTest, joinFuzzer) {
Expand Down
Loading
Loading