AbfsStatistic.java
@@ -73,7 +73,35 @@ public enum AbfsStatistic {
   READ_THROTTLES("read_throttles",
       "Total number of times a read operation is throttled."),
   WRITE_THROTTLES("write_throttles",
-      "Total number of times a write operation is throttled.");
+      "Total number of times a write operation is throttled."),
+
+  //OutputStream statistics.
+  BYTES_TO_UPLOAD("bytes_upload",
+      "Total bytes to upload."),
+  BYTES_UPLOAD_SUCCESSFUL("bytes_upload_successfully",
+      "Total bytes uploaded successfully."),
+  BYTES_UPLOAD_FAILED("bytes_upload_failed",
+      "Total bytes failed to upload."),
+  TIME_SPENT_ON_TASK_WAIT("time_spent_task_wait",
+      "Total time spent on waiting for a task."),
+  QUEUE_SHRUNK_OPS("queue_shrunk_ops",
+      "Total number of times blocking queue was shrunk."),
+  WRITE_CURRENT_BUFFER_OPERATIONS("write_current_buffer_ops",
+      "Total number of times the current buffer is written to the service."),
+  TIME_SPENT_ON_PUT_REQUEST("time_spent_on_put_request",
+      "Total time spent on a put request."),
+
+  //InputStream statistics.
+  SEEK_IN_BUFFER("seek_in_buffer",
+      "Total number of seek operations performed in the buffer."),
+  BYTES_READ_BUFFER("bytes_read_buffer",
+      "Total number of bytes read from the buffer."),
+  REMOTE_READ_OP("remote_read_op",
+      "Total number of remote read operations performed."),
+  READ_AHEAD_BYTES_READ("read_ahead_bytes_read",
+      "Total number of bytes from readAhead."),
+  REMOTE_BYTES_READ("remote_bytes_read",
+      "Total number of bytes read from remote operations.");
 
   private String statName;
   private String statDescription;
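Each new entry pairs a stable snake_case key (the name the statistic is tracked under) with a human-readable description. A quick way to list the new keys, assuming the enum's existing getStatName() and getStatDescription() accessors, which live in the collapsed remainder of this file; PrintAbfsStatNames is an invented illustration, not part of the change:

```java
import java.util.EnumSet;

import org.apache.hadoop.fs.azurebfs.AbfsStatistic;

public final class PrintAbfsStatNames {
  public static void main(String[] args) {
    // The new entries are declared contiguously, so EnumSet.range covers
    // exactly the OutputStream and InputStream statistics added here.
    for (AbfsStatistic stat : EnumSet.range(
        AbfsStatistic.BYTES_TO_UPLOAD, AbfsStatistic.REMOTE_BYTES_READ)) {
      System.out.println(stat.getStatName() + " : " + stat.getStatDescription());
    }
  }
}
```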
AbfsInputStream.java
@@ -37,6 +37,11 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -48,7 +53,7 @@
  * The AbfsInputStream for AbfsClient.
  */
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
-    StreamCapabilities {
+    StreamCapabilities, IOStatisticsSource {
   private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
   // Footer size is set to qualify for both ORC and parquet files
   public static final int FOOTER_SIZE = 16 * ONE_KB;
@@ -92,6 +97,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private long bytesFromRemoteRead; // bytes read remotely; for testing
 
   private final AbfsInputStreamContext context;
+  private IOStatistics ioStatistics;
 
   public AbfsInputStream(
       final AbfsClient client,
@@ -120,6 +126,9 @@ public AbfsInputStream(
     // Propagate the config values to ReadBufferManager so that the first instance
     // to initialize can set the readAheadBlockSize
     ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
+    if (streamStatistics != null) {
+      ioStatistics = streamStatistics.getIOStatistics();
+    }
   }
 
   public String getPath() {
@@ -152,7 +161,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     int lastReadBytes;
     int totalReadBytes = 0;
     if (streamStatistics != null) {
-      streamStatistics.readOperationStarted(off, len);
+      streamStatistics.readOperationStarted();
     }
     incrementReadOps();
     do {
@@ -431,7 +440,10 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
       LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
-      op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
+      op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          () -> client.read(path, position, b, offset, length,
+              tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
       cachedSasToken.update(op.getSasToken());
       if (streamStatistics != null) {
         streamStatistics.remoteReadOperation();
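The cast to IOStatisticsStore works because that interface extends both IOStatistics and DurationTrackerFactory, so the store can hand out duration trackers. For readers unfamiliar with the binding helper, the call above is roughly equivalent to the following simplified sketch (TrackDurationSketch is an invented name; the real IOStatisticsBinding helper does some extra bookkeeping):

```java
import java.io.IOException;

import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

public final class TrackDurationSketch {
  // Start a tracker for the named statistic, run the operation, and record
  // the elapsed time; a thrown exception marks the duration as failed.
  public static <T> T trackDuration(DurationTrackerFactory factory,
      String statistic, CallableRaisingIOE<T> operation) throws IOException {
    DurationTracker tracker = factory.trackDuration(statistic);
    try {
      return operation.apply();
    } catch (IOException | RuntimeException e) {
      tracker.failed();
      throw e;
    } finally {
      tracker.close();  // close() commits the measured duration
    }
  }
}
```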
@@ -694,6 +706,11 @@ public boolean shouldAlwaysReadBufferSize() {
     return alwaysReadBufferSize;
   }
 
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatistics;
+  }
+
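Since AbfsInputStream is now an IOStatisticsSource, callers can snapshot its statistics without touching ABFS internals; FSDataInputStream forwards the probe to the wrapped stream. A hypothetical caller-side sketch (StreamStatsLogger and logStreamStats are invented names; retrieveIOStatistics() and ioStatisticsToString() are existing hadoop-common helpers):

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.slf4j.Logger;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

public final class StreamStatsLogger {
  // Snapshot and log whatever statistics the wrapped stream exposes;
  // retrieveIOStatistics() returns null for streams without statistics.
  static void logStreamStats(FSDataInputStream in, Logger log) {
    IOStatistics stats = retrieveIOStatistics(in);
    if (stats != null) {
      log.info("stream statistics: {}", ioStatisticsToString(stats));
    }
  }
}
```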
   /**
    * Get the statistics of the stream.
    * @return a string value.
AbfsInputStreamStatistics.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
 
 /**
  * Interface for statistics for the AbfsInputStream.
@@ -73,11 +74,8 @@ public interface AbfsInputStreamStatistics {
 
   /**
    * A {@code read(byte[] buf, int off, int len)} operation has started.
-   *
-   * @param pos starting position of the read.
-   * @param len length of bytes to read.
    */
-  void readOperationStarted(long pos, long len);
+  void readOperationStarted();
 
   /**
    * Records a successful remote read operation.
@@ -96,6 +94,12 @@
    */
   void remoteBytesRead(long bytes);
 
+  /**
+   * Get the IOStatisticsStore instance from AbfsInputStreamStatistics.
+   * @return instance of IOStatisticsStore which extends IOStatistics.
+   */
+  IOStatistics getIOStatistics();
+
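Typing the getter as IOStatistics keeps the interface decoupled from the store implementation. A hedged sketch of one way an implementation could back these callbacks (StatisticsBackingSketch and its key choices are illustrative, not this PR's implementation class): build a single IOStatisticsStore with the counters plus duration tracking, increment it from the callbacks, and return the same store here. Registering withDurationTracking() is also what lets AbfsInputStream.readRemote() cast the returned statistics to IOStatisticsStore and use it as a DurationTrackerFactory.

```java
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

class StatisticsBackingSketch {
  private final IOStatisticsStore store = iostatisticsStore()
      .withCounters("seek_in_buffer", "remote_read_op")
      .withDurationTracking(StoreStatisticNames.ACTION_HTTP_GET_REQUEST)
      .build();

  void seekInBuffer() {
    store.incrementCounter("seek_in_buffer");  // one more buffered seek
  }

  void remoteReadOperation() {
    store.incrementCounter("remote_read_op");  // one more remote GET
  }

  IOStatistics getIOStatistics() {
    return store;  // IOStatisticsStore extends IOStatistics
  }
}
```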
   /**
    * Makes the string of all the AbfsInputStream statistics.
    * @return the string with all the statistics.