Skip to content

Commit

Permalink
Add Time-Specific Metrics (#867)
Browse files Browse the repository at this point in the history
* Added Time-Specific Metrics

Original PR: #819

Co-authored-by: guljain <[email protected]>
  • Loading branch information
arunkumarchacko and guljain committed Sep 22, 2022
1 parent d16e109 commit a820950
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.INVOCATION_HFLUSH;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.INVOCATION_HSYNC;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_BYTES;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_CLOSE_OPERATIONS;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_EXCEPTIONS;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_OPERATIONS;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
Expand Down Expand Up @@ -66,6 +68,7 @@
*/
public class GhfsInstrumentation
implements Closeable, MetricsSource, IOStatisticsSource, DurationTrackerFactory {

private static final String METRICS_SOURCE_BASENAME = "GCSMetrics";

/**
Expand Down Expand Up @@ -137,7 +140,7 @@ public GhfsInstrumentation(URI name) {
// duration track metrics (Success/failure) and IOStatistics.
durationTrackerFactory =
IOStatisticsBinding.pairedTrackerFactory(
instanceIOStatistics, new MetricDurationTrackerFactory());
new MetricDurationTrackerFactory(), instanceIOStatistics);
}

/**
Expand Down Expand Up @@ -324,6 +327,7 @@ private void incrementMutableCounter(String name, long count) {
* the count on start; after a failure the failures count is incremented by one.
*/
private final class MetricUpdatingDurationTracker implements DurationTracker {

/** Name of the statistics value to be updated */
private final String symbol;

Expand Down Expand Up @@ -485,17 +489,18 @@ private InputStreamStatistics(@Nullable FileSystem.Statistics filesystemStatisti
IOStatisticsStore st =
iostatisticsStore()
.withCounters(
StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
StreamStatisticNames.STREAM_READ_BYTES,
StreamStatisticNames.STREAM_READ_EXCEPTIONS,
StreamStatisticNames.STREAM_READ_OPERATIONS,
StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE,
StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS,
StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS,
StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS,
StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS,
StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
StreamStatisticNames.STREAM_READ_TOTAL_BYTES)
.withDurationTracking(
GhfsStatistic.STREAM_READ_SEEK_OPERATIONS.getSymbol(),
GhfsStatistic.STREAM_READ_CLOSE_OPERATIONS.getSymbol(),
GhfsStatistic.STREAM_READ_OPERATIONS.getSymbol())
.build();
setIOStatistics(st);
backwardSeekOperations =
Expand Down Expand Up @@ -555,7 +560,6 @@ private IOStatisticsStore localIOStatistics() {
*/
@Override
public void seekBackwards(long negativeOffset) {
seekOperations.incrementAndGet();
backwardSeekOperations.incrementAndGet();
bytesBackwardsOnSeek.addAndGet(-negativeOffset);
}
Expand All @@ -569,7 +573,6 @@ public void seekForwards(long skipped) {
if (skipped > 0) {
bytesSkippedOnSeek.addAndGet(skipped);
}
seekOperations.incrementAndGet();
forwardSeekOperations.incrementAndGet();
}

Expand Down Expand Up @@ -617,8 +620,6 @@ public void readOperationCompleted(int requested, int actual) {
*/
@Override
public void close() {
increment(StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS);

IOStatisticsStore ioStatistics = localIOStatistics();
promoteInputStreamCountersToMetrics();
mergedStats = snapshotIOStatistics(localIOStatistics());
Expand Down Expand Up @@ -771,6 +772,7 @@ void promoteIOCounter(String name) {
*/
private final class OutputStreamStatistics extends AbstractGhfsStatisticsSource
implements GhfsOutputStreamStatistics {

private final AtomicLong bytesWritten;
private final AtomicLong writeExceptions;
private final FileSystem.Statistics filesystemStatistics;
Expand All @@ -784,9 +786,10 @@ private OutputStreamStatistics(@Nullable FileSystem.Statistics filesystemStatist
this.filesystemStatistics = filesystemStatistics;
IOStatisticsStore st =
iostatisticsStore()
.withCounters(
STREAM_WRITE_BYTES.getSymbol(),
STREAM_WRITE_EXCEPTIONS.getSymbol(),
.withCounters(STREAM_WRITE_BYTES.getSymbol(), STREAM_WRITE_EXCEPTIONS.getSymbol())
.withDurationTracking(
STREAM_WRITE_CLOSE_OPERATIONS.getSymbol(),
STREAM_WRITE_OPERATIONS.getSymbol(),
INVOCATION_HFLUSH.getSymbol(),
INVOCATION_HSYNC.getSymbol())
.build();
Expand Down Expand Up @@ -957,6 +960,7 @@ private IOStatisticsStoreBuilder createStoreBuilder() {
storeBuilder.withDurationTracking(stat.getSymbol());
}
});

return storeBuilder;
}
}
29 changes: 14 additions & 15 deletions gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsStatistic.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ public enum GhfsStatistic {
TYPE_COUNTER),
INVOCATION_COPY_FROM_LOCAL_FILE(
StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE, "Calls of copyFromLocalFile()", TYPE_COUNTER),
INVOCATION_CREATE(StoreStatisticNames.OP_CREATE, "Calls of create()", TYPE_COUNTER),
INVOCATION_CREATE(StoreStatisticNames.OP_CREATE, "Calls of create()", TYPE_DURATION),
INVOCATION_CREATE_NON_RECURSIVE(
StoreStatisticNames.OP_CREATE_NON_RECURSIVE, "Calls of createNonRecursive()", TYPE_COUNTER),
INVOCATION_DELETE(StoreStatisticNames.OP_DELETE, "Calls of delete()", TYPE_COUNTER),
StoreStatisticNames.OP_CREATE_NON_RECURSIVE, "Calls of createNonRecursive()", TYPE_DURATION),
INVOCATION_DELETE(StoreStatisticNames.OP_DELETE, "Calls of delete()", TYPE_DURATION),
INVOCATION_EXISTS(StoreStatisticNames.OP_EXISTS, "Calls of exists()", TYPE_COUNTER),
INVOCATION_GET_DELEGATION_TOKEN(
StoreStatisticNames.OP_GET_DELEGATION_TOKEN, "Calls of getDelegationToken()", TYPE_COUNTER),
Expand All @@ -94,13 +94,13 @@ public enum GhfsStatistic {
INVOCATION_GET_FILE_STATUS(
StoreStatisticNames.OP_GET_FILE_STATUS, "Calls of getFileStatus()", TYPE_COUNTER),
INVOCATION_GLOB_STATUS(StoreStatisticNames.OP_GLOB_STATUS, "Calls of globStatus()", TYPE_COUNTER),
INVOCATION_HFLUSH(StoreStatisticNames.OP_HFLUSH, "Calls of hflush()", TYPE_COUNTER),
INVOCATION_HSYNC(StoreStatisticNames.OP_HSYNC, "Calls of hsync()", TYPE_COUNTER),
INVOCATION_HFLUSH(StoreStatisticNames.OP_HFLUSH, "Calls of hflush()", TYPE_DURATION),
INVOCATION_HSYNC(StoreStatisticNames.OP_HSYNC, "Calls of hsync()", TYPE_DURATION),
INVOCATION_LIST_FILES(StoreStatisticNames.OP_LIST_FILES, "Calls of listFiles()", TYPE_COUNTER),
INVOCATION_LIST_STATUS(StoreStatisticNames.OP_LIST_STATUS, "Calls of listStatus()", TYPE_COUNTER),
INVOCATION_MKDIRS(StoreStatisticNames.OP_MKDIRS, "Calls of mkdirs()", TYPE_COUNTER),
INVOCATION_OPEN(StoreStatisticNames.OP_OPEN, "Calls of open()", TYPE_COUNTER),
INVOCATION_RENAME(StoreStatisticNames.OP_RENAME, "Calls of rename()", TYPE_COUNTER),
INVOCATION_OPEN(StoreStatisticNames.OP_OPEN, "Calls of open()", TYPE_DURATION),
INVOCATION_RENAME(StoreStatisticNames.OP_RENAME, "Calls of rename()", TYPE_DURATION),
INVOCATION_LIST_LOCATED_STATUS(
StoreStatisticNames.OP_LIST_LOCATED_STATUS, "Calls of listLocatedStatus()", TYPE_COUNTER),

Expand All @@ -111,16 +111,14 @@ public enum GhfsStatistic {
TYPE_COUNTER),
STREAM_READ_CLOSE_OPERATIONS(
StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
"Total count of times an attempt to close an input stream was made",
TYPE_COUNTER),
"Calls of read stream close()",
TYPE_DURATION),
STREAM_READ_EXCEPTIONS(
StreamStatisticNames.STREAM_READ_EXCEPTIONS,
"Count of exceptions raised during input stream reads",
TYPE_COUNTER),
STREAM_READ_OPERATIONS(
StreamStatisticNames.STREAM_READ_OPERATIONS,
"Count of read() operations in an input stream",
TYPE_COUNTER),
StreamStatisticNames.STREAM_READ_OPERATIONS, "Calls of read()", TYPE_DURATION),
STREAM_READ_OPERATIONS_INCOMPLETE(
StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE,
"Count of incomplete read() operations in an input stream",
Expand All @@ -142,9 +140,7 @@ public enum GhfsStatistic {
"Count of executed seek operations which went forward in" + " an input stream",
TYPE_COUNTER),
STREAM_READ_SEEK_OPERATIONS(
StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS,
"Count of seek operations in an input stream",
TYPE_COUNTER),
StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS, "Calls of seek()", TYPE_DURATION),
STREAM_READ_TOTAL_BYTES(
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
"Total count of bytes read from an input stream",
Expand All @@ -159,6 +155,9 @@ public enum GhfsStatistic {
StreamStatisticNames.STREAM_WRITE_BYTES,
"Count of bytes written to output stream" + " (including all not yet uploaded)",
TYPE_COUNTER),
STREAM_WRITE_CLOSE_OPERATIONS(
"stream_write_close_operations", "Calls of write stream close()", TYPE_DURATION),
STREAM_WRITE_OPERATIONS("stream_write_operations", "Calls of write()", TYPE_DURATION),

/** The XAttr API statistics */
INVOCATION_XATTR_GET_MAP(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,88 @@

package com.google.cloud.hadoop.fs.gcs;

import java.util.Map;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.StorageStatisticsFromIOStatistics;

/** Storage statistics for GCS, dynamically generated from the IOStatistics. */
public class GhfsStorageStatistics extends StorageStatisticsFromIOStatistics {

/** {@value} The key that stores all the registered metrics */
public static final String NAME = "GhfsStorageStatistics";

/** Exention for minimum */
private static final String MINIMUM = StoreStatisticNames.SUFFIX_MIN;
/** Exention for maximum */
private static final String MAXIMUM = StoreStatisticNames.SUFFIX_MAX;
/** Exention for mean */
private static final String MEAN = StoreStatisticNames.SUFFIX_MEAN;

/** IOStatistics Instance */
private final IOStatistics ioStatistics;

/** Create the Storage Statistics instance from the IOStatistics */
public GhfsStorageStatistics(IOStatistics ioStatistics) {
super(NAME, "Ghfs", ioStatistics);
this.ioStatistics = ioStatistics;
}

/**
* To get the minimum value which is stored with MINIMUM extension
*
* @param symbol
* @return minimum statistic value
*/
public Long getMin(String symbol) {
return (Long) minimums().get(symbol + MINIMUM);
}

/**
* To get the maximum value which is stored with MAXIMUM extension
*
* @param symbol
* @return maximum statistic value
*/
public Long getMax(String symbol) {
return (Long) maximums().get(symbol + MAXIMUM);
}

/**
* To get the mean value which is stored with MEAN extension
*
* @param symbol
* @return mean statistic value
*/
public double getMean(String symbol) {
return meanStatistics().get(symbol + MEAN).mean();
}

/**
* Map of minimums
*
* @return current map of minimums
*/
private Map<String, Long> minimums() {
return ioStatistics.minimums();
}

/**
* Map of maximums
*
* @return current map of maximums
*/
private Map<String, Long> maximums() {
return ioStatistics.maximums();
}

/**
* Map of meanStatistics
*
* @return current map of MeanStatistic statistics
*/
private Map<String, MeanStatistic> meanStatistics() {
return ioStatistics.meanStatistics();
}
}
Loading

0 comments on commit a820950

Please sign in to comment.