Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.net.HttpURLConnection;

import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
Expand All @@ -41,6 +45,7 @@
*/
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);

private final AbfsClient client;
private final Statistics statistics;
Expand Down Expand Up @@ -239,6 +244,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
final AbfsRestOperation op;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult()).registerSuccess(true);
Expand Down Expand Up @@ -431,4 +437,10 @@ public boolean hasCapability(String capability) {
/**
 * Exposes the byte array referenced by this stream's {@code buffer} field.
 * The array itself is returned, not a copy.
 *
 * @return the current value of the {@code buffer} field
 */
byte[] getBuffer() {
  return this.buffer;
}

/**
 * Replaces the {@code cachedSasToken} field of this stream with the given
 * instance. Annotated {@code @VisibleForTesting}: intended as a test hook.
 *
 * @param cachedSasToken the token holder to store in the field
 */
@VisibleForTesting
protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
this.cachedSasToken = cachedSasToken;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;

import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;

class ReadBuffer {

private AbfsInputStream stream;
Expand All @@ -40,6 +43,8 @@ class ReadBuffer {
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;

private IOException errException = null;

/**
 * Gets the input stream associated with this read buffer.
 *
 * @return the value of the {@code stream} field
 */
public AbfsInputStream getStream() {
  return this.stream;
}
Expand Down Expand Up @@ -88,12 +93,23 @@ public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}

/**
 * Gets the exception recorded for this buffer.
 *
 * @return the value of the {@code errException} field; {@code null} until
 *         {@link #setErrException(IOException)} stores something else
 */
public IOException getErrException() {
  return this.errException;
}

/**
 * Records the exception associated with this buffer so that it can later be
 * retrieved through {@link #getErrException()}.
 *
 * @param errException the exception to store in the {@code errException} field
 */
public void setErrException(final IOException errException) {
this.errException = errException;
}

/**
 * Gets the current status of this read buffer.
 *
 * @return the value of the {@code status} field
 */
public ReadBufferStatus getStatus() {
  return this.status;
}

/**
 * Stores a new status for this read buffer.
 *
 * <p>When the new status is {@code READ_FAILED}, {@code bufferindex} is also
 * reset to {@code -1}.
 *
 * @param status the status to store in the {@code status} field
 */
public void setStatus(ReadBufferStatus status) {
  if (status == READ_FAILED) {
    // A failed read is marked with the sentinel index -1.
    this.bufferindex = -1;
  }
  this.status = status;
}

public CountDownLatch getLatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;

import com.google.common.annotations.VisibleForTesting;

/**
* The Read Buffer Manager for Rest AbfsClient.
*/
Expand All @@ -36,8 +39,9 @@ final class ReadBufferManager {
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold

private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
Expand Down Expand Up @@ -141,7 +145,8 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
throws IOException {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
Expand Down Expand Up @@ -244,7 +249,7 @@ private synchronized boolean tryEvict() {
earliestBirthday = buf.getTimeStamp();
}
}
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}

Expand All @@ -253,7 +258,12 @@ private synchronized boolean tryEvict() {
}

private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
// As failed ReadBuffers (bufferindex = -1) are also saved in completedReadList,
// avoid adding them to freeList.
if (buf.getBufferindex() != -1) {
freeList.push(buf.getBufferindex());
}

completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
Expand Down Expand Up @@ -289,6 +299,27 @@ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInpu
return null;
}

/**
 * Scans {@code completedReadList} for the first entry that belongs to the
 * given stream and whose range covers {@code requestedOffset}. The entry's
 * status is not inspected here, so both successful and failed reads can be
 * returned.
 *
 * <p>An entry covers the offset when the offset is at or after the entry's
 * start offset and lies before either {@code offset + length} or
 * {@code offset + requestedLength}.
 *
 * @param stream the stream the read must have been issued for (compared by identity)
 * @param requestedOffset the file offset being looked up
 * @return the first matching buffer in list order, or {@code null} if none matches
 */
private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
  for (ReadBuffer candidate : completedReadList) {
    // Skip entries for other streams or entries that start after the offset.
    if (candidate.getStream() != stream) {
      continue;
    }
    if (requestedOffset < candidate.getOffset()) {
      continue;
    }

    // Covered by the length recorded on the buffer.
    if (requestedOffset < candidate.getOffset() + candidate.getLength()) {
      return candidate;
    }
    // Otherwise fall back to the originally requested length.
    if (requestedOffset < candidate.getOffset() + candidate.getRequestedLength()) {
      return candidate;
    }
  }

  return null;
}

private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
Expand All @@ -299,11 +330,28 @@ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long re
}

private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
final byte[] buffer) throws IOException {
ReadBuffer buf = getBufferFromCompletedQueue(stream, position);

if (buf == null) {
return 0;
}

if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
// To prevent new read requests to fail due to old read-ahead attempts,
// return exception only from buffers that failed within last thresholdAgeMilliseconds
if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
throw buf.getErrException();
} else {
return 0;
}
}

if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
|| (position >= buf.getOffset() + buf.getLength())) {
return 0;
}

int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
Expand Down Expand Up @@ -368,14 +416,18 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
// buffer will be deleted as per the eviction policy.
}

buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
completedReadList.add(buffer);
}

//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
Expand All @@ -392,4 +444,24 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
/**
 * Produces a millisecond timestamp derived from {@link System#nanoTime()}.
 * Because it is based on {@code nanoTime}, the value is only meaningful for
 * measuring elapsed time between two calls, not as a wall-clock time.
 *
 * @return {@code System.nanoTime()} scaled down from nanoseconds to milliseconds
 */
private long currentTimeMillis() {
  final long nanos = System.nanoTime();
  final long micros = nanos / 1000;
  return micros / 1000;
}

/**
 * Reads the static {@code thresholdAgeMilliseconds} setting.
 * Annotated {@code @VisibleForTesting}: intended as a test hook.
 *
 * @return the current threshold age, in milliseconds
 */
@VisibleForTesting
int getThresholdAgeMilliseconds() {
  return ReadBufferManager.thresholdAgeMilliseconds;
}

/**
 * Overwrites the static {@code thresholdAgeMilliseconds} setting.
 * Annotated {@code @VisibleForTesting}: intended as a test hook.
 *
 * @param thresholdAgeMs the new threshold age, in milliseconds
 */
@VisibleForTesting
static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
  ReadBufferManager.thresholdAgeMilliseconds = thresholdAgeMs;
}

/**
 * Reports how many entries {@code completedReadList} currently holds.
 * Annotated {@code @VisibleForTesting}: intended as a test hook.
 *
 * @return the size of {@code completedReadList}
 */
@VisibleForTesting
int getCompletedReadListSize() {
  final int completedCount = this.completedReadList.size();
  return completedCount;
}

/**
 * Runs {@code tryEvict()} once and discards its boolean result.
 * Annotated {@code @VisibleForTesting}: intended as a test hook.
 */
@VisibleForTesting
void callTryEvict() {
  this.tryEvict();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
Expand Down Expand Up @@ -61,9 +62,18 @@ public void run() {
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
int bytesRead = buffer.getStream().readRemote(
buffer.getOffset(),
buffer.getBuffer(),
0,
// If AbfsInputStream was created with bigger buffer size than
// read-ahead buffer size, make sure a valid length is passed
// for remote read
Math.min(buffer.getRequestedLength(), buffer.getBuffer().length));

bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
buffer.setErrException(new IOException(ex));
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
Expand Down
Loading