Skip to content

Commit

Permalink
Add stream related instrumentation and min, max and mean statistics (#…
Browse files Browse the repository at this point in the history
…1010)

* Add Stream related instrumentation and min, max and mean statistics
  • Loading branch information
arunkumarchacko committed Jun 23, 2023
1 parent fbd5eb8 commit c5fa239
Show file tree
Hide file tree
Showing 11 changed files with 972 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ public enum GhfsStatistic {
"op_create_non_recursive", "Calls of createNonRecursive()", TYPE_DURATION),
INVOCATION_DELETE("op_delete", "Calls of delete()", TYPE_DURATION),
INVOCATION_EXISTS("op_exists", "Calls of exists()", TYPE_COUNTER),
INVOCATION_GET_FILE_STATUS("op_get_file_status", "Calls of getFileStatus()", TYPE_COUNTER),
INVOCATION_GLOB_STATUS("op_glob_status", "Calls of globStatus()", TYPE_COUNTER),
INVOCATION_GET_FILE_STATUS("op_get_file_status", "Calls of getFileStatus()", TYPE_DURATION),

INVOCATION_GET_FILE_CHECKSUM("op_get_file_checksum", "Calls of getFileChecksum()", TYPE_COUNTER),
INVOCATION_GLOB_STATUS("op_glob_status", "Calls of globStatus()", TYPE_DURATION),
INVOCATION_HFLUSH("op_hflush", "Calls of hflush()", TYPE_DURATION),
INVOCATION_HSYNC("op_hsync", "Calls of hsync()", TYPE_DURATION),
INVOCATION_LIST_FILES("op_list_files", "Calls of listFiles()", TYPE_COUNTER),
INVOCATION_LIST_STATUS("op_list_status", "Calls of listStatus()", TYPE_COUNTER),
INVOCATION_MKDIRS("op_mkdirs", "Calls of mkdirs()", TYPE_COUNTER),
INVOCATION_MKDIRS("op_mkdirs", "Calls of mkdirs()", TYPE_DURATION),
INVOCATION_OPEN("op_open", "Calls of open()", TYPE_DURATION),
INVOCATION_RENAME("op_rename", "Calls of rename()", TYPE_DURATION),

Expand All @@ -85,6 +87,7 @@ public enum GhfsStatistic {
"Count of exceptions raised during input stream reads",
TYPE_COUNTER),
STREAM_READ_OPERATIONS("stream_read_operations", "Calls of read()", TYPE_DURATION),

STREAM_READ_OPERATIONS_INCOMPLETE(
"stream_read_operations_incomplete",
"Count of incomplete read() operations in an input stream",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@

package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.FILES_CREATED;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.INVOCATION_GET_FILE_CHECKSUM;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.InvocationRaisingIOE;
import com.google.common.base.Stopwatch;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageStatistics;
Expand All @@ -31,33 +40,58 @@
@InterfaceStability.Unstable
public class GhfsStorageStatistics extends StorageStatistics {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/** {@value} The key that stores all the registered metrics */
public static final String NAME = "GhfsStorageStatistics";

/** Exention for minimum */
private static final String MINIMUM = ".min";
/** Exention for maximum */
private static final String MAXIMUM = ".max";
/** Exention for mean */
private static final String MEAN = ".max";
public static final int LATENCY_LOGGING_THRESHOLD_MS = 100;

private final Map<GhfsStatistic, AtomicLong> opsCount = new EnumMap<>(GhfsStatistic.class);
private final Map<GhfsStatistic, AtomicLong> minimums = new EnumMap<>(GhfsStatistic.class);
private final Map<GhfsStatistic, AtomicLong> maximums = new EnumMap<>(GhfsStatistic.class);
private final Map<GhfsStatistic, MeanStatistic> means = new EnumMap<>(GhfsStatistic.class);

public GhfsStorageStatistics() {
super(NAME);
for (GhfsStatistic opType : GhfsStatistic.values()) {
opsCount.put(opType, new AtomicLong(0));

if (opType.getType() == GhfsStatisticTypeEnum.TYPE_DURATION) {
minimums.put(opType, null);
maximums.put(opType, new AtomicLong(0));
means.put(opType, new MeanStatistic());
}
}
}

static <B> B trackDuration(
@Nonnull GhfsStorageStatistics stats,
GhfsStatistic statistic,
Object context,
InvocationRaisingIOE<B> operation)
throws IOException {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
stats.increment(statistic);
return operation.apply();
} finally {
stats.updateStats(statistic, stopwatch.elapsed().toMillis(), context);
}
}

private long increment(GhfsStatistic statistic) {
return incrementCounter(statistic, 1);
}

/**
* Increment a specific counter.
*
* @param op operation
* @param count increment value
* @return the new value
*/
public long incrementCounter(GhfsStatistic op, long count) {
long incrementCounter(GhfsStatistic op, long count) {
return opsCount.get(op).addAndGet(count);
}

Expand All @@ -68,7 +102,75 @@ public void reset() {
}
}

void updateStats(GhfsStatistic statistic, long durationMs, Object context) {
checkArgument(
statistic.getType() == GhfsStatisticTypeEnum.TYPE_DURATION,
String.format("Unexpected instrumentation type %s", statistic));
AtomicLong minVal = minimums.get(statistic);
if (minVal == null) {
// There can be race here. It is ok to have the last write win.
minimums.put(statistic, new AtomicLong(durationMs));
} else if (durationMs < minVal.get()) {
minVal.set(durationMs);
}

AtomicLong maxVal = maximums.get(statistic);
if (durationMs > maxVal.get()) {
if (durationMs > LATENCY_LOGGING_THRESHOLD_MS) {
logger.atWarning().log(
"Detected potential high latency for operation %s. latencyMs=%s; previousMaxLatencyMs=%s; operationCount=%s; context=%s",
statistic, durationMs, maxVal.get(), opsCount.get(statistic), context);
}

// There can be race here and can have some data points get missed. This is a corner case.
// Since this function can be called quite frequently, opting for performance over consistency
// here.
maxVal.set(durationMs);
}

if (means.containsKey(statistic)) {
means.get(statistic).addSample(durationMs);
}
}

void streamReadBytes(int bytesRead) {
incrementCounter(GhfsStatistic.STREAM_READ_BYTES, bytesRead);
}

/** If more data was requested than was actually returned, this was an incomplete read. */
void streamReadOperationInComplete(int requested, int actual) {
if (requested > actual) {
increment(GhfsStatistic.STREAM_READ_OPERATIONS_INCOMPLETE);
}
}

void streamReadSeekBackward(long negativeOffset) {
increment(GhfsStatistic.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
incrementCounter(GhfsStatistic.STREAM_READ_SEEK_BYTES_BACKWARDS, -negativeOffset);
}

void streamReadSeekForward(long skipped) {
if (skipped > 0) {
incrementCounter(GhfsStatistic.STREAM_READ_SEEK_BYTES_SKIPPED, skipped);
}

increment(GhfsStatistic.STREAM_READ_SEEK_FORWARD_OPERATIONS);
}

void streamWriteBytes(int bytesWritten) {
incrementCounter(GhfsStatistic.STREAM_WRITE_BYTES, bytesWritten);
}

void filesCreated() {
increment(FILES_CREATED);
}

void getFileCheckSum() {
increment(INVOCATION_GET_FILE_CHECKSUM);
}

private class LongIterator implements Iterator<LongStatistic> {
// TODO: Include statistic related metrics as well.
private Iterator<Map.Entry<GhfsStatistic, AtomicLong>> iterator =
Collections.unmodifiableSet(opsCount.entrySet()).iterator();

Expand All @@ -83,7 +185,7 @@ public LongStatistic next() {
throw new NoSuchElementException();
}
final Map.Entry<GhfsStatistic, AtomicLong> entry = iterator.next();
return new LongStatistic(entry.getKey().name(), entry.getValue().get());
return new LongStatistic(entry.getKey().getSymbol(), entry.getValue().get());
}

@Override
Expand All @@ -100,12 +202,12 @@ public Iterator<LongStatistic> getLongStatistics() {
@Override
public Long getLong(String key) {
final GhfsStatistic type = GhfsStatistic.fromSymbol(key);
return type == null ? null : opsCount.get(type).get();
return type == null ? 0L : opsCount.get(type).get();
}

@Override
public boolean isTracked(String key) {
return GhfsStatistic.fromSymbol(key) == null;
return GhfsStatistic.fromSymbol(key) != null;
}

/**
Expand All @@ -115,7 +217,7 @@ public boolean isTracked(String key) {
* @return minimum statistic value
*/
public Long getMin(String symbol) {
return 0L; // TODO: Update this once duration instrumentations are added
return getStatisticValue(symbol, minimums);
}

/**
Expand All @@ -125,7 +227,7 @@ public Long getMin(String symbol) {
* @return maximum statistic value
*/
public Long getMax(String symbol) {
return 0L; // TODO: Update this once duration instrumentations are added
return getStatisticValue(symbol, maximums);
}

/**
Expand All @@ -134,25 +236,47 @@ public Long getMax(String symbol) {
* @param symbol
* @return mean statistic value
*/
public double getMean(String symbol) {
return 0L; // TODO: Update this once duration instrumentations are added
public double getMean(String key) {
final GhfsStatistic type = GhfsStatistic.fromSymbol(key);
MeanStatistic val = means.get(type);
if (val == null) {
return 0;
}

return val.getValue();
}

/**
* Map of minimums
*
* @return current map of minimums
*/
private Map<String, Long> minimums() {
return null; // TODO: Update this once duration instrumentations are added
private long getStatisticValue(String key, Map<GhfsStatistic, AtomicLong> stats) {
final GhfsStatistic stat = GhfsStatistic.fromSymbol(key);
if (stat == null) {
return 0L;
}

AtomicLong val = stats.get(stat);
if (val == null) {
return 0L;
}

return val.get();
}

/**
* Map of maximums
*
* @return current map of maximums
*/
private Map<String, Long> maximums() {
return null; // TODO: Update this once duration instrumentations are added
/** This class keeps track of mean statistics by keeping track of sum and number of samples. */
static class MeanStatistic {
private int sample;

private long sum;

synchronized void addSample(long val) {
sample++;
sum += val;
}

double getValue() {
if (sample == 0) {
return 0;
}

return sum / sample;
}
}
}
Loading

0 comments on commit c5fa239

Please sign in to comment.