diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 30b2c25b7a98f..16fc965eabfcd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -391,6 +391,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
private boolean isReadAheadV2Enabled;
+ @BooleanConfigurationValidatorAnnotation(
+ ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING,
+ DefaultValue = DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING)
+ private boolean isReadAheadV2DynamicScalingEnabled;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
@@ -411,6 +416,26 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
private int maxReadAheadV2BufferPoolSize;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS,
+ DefaultValue = DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS)
+ private int readAheadV2CpuMonitoringIntervalMillis;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE,
+ DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE)
+ private int readAheadV2ThreadPoolUpscalePercentage;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE,
+ DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE)
+ private int readAheadV2ThreadPoolDownscalePercentage;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS,
+ DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS)
+ private int readAheadV2MemoryMonitoringIntervalMillis;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
@@ -421,6 +446,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
private int readAheadV2CachedBufferTTLMillis;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE)
+ private int readAheadV2CpuUsageThresholdPercent;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENTAGE)
+ private int readAheadV2MemoryUsageThresholdPercent;
+
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -1424,13 +1459,25 @@ public EncryptionContextProvider createEncryptionContextProvider() {
}
public boolean isReadAheadEnabled() {
- return this.enabledReadAhead;
+ return enabledReadAhead;
+ }
+
+ /**
+ * Checks if the read-ahead v2 feature is enabled by the user.
+ * @return true if read-ahead v2 is enabled, false otherwise.
+ */
+ public boolean isReadAheadV2Enabled() {
+ return isReadAheadV2Enabled;
+ }
+
+ public boolean isReadAheadV2DynamicScalingEnabled() {
+ return isReadAheadV2DynamicScalingEnabled;
}
public int getMinReadAheadV2ThreadPoolSize() {
if (minReadAheadV2ThreadPoolSize <= 0) {
// If the minReadAheadV2ThreadPoolSize is not set, use the default value
- return 2 * Runtime.getRuntime().availableProcessors();
+ return DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
}
return minReadAheadV2ThreadPoolSize;
}
@@ -1446,7 +1493,7 @@ public int getMaxReadAheadV2ThreadPoolSize() {
public int getMinReadAheadV2BufferPoolSize() {
if (minReadAheadV2BufferPoolSize <= 0) {
// If the minReadAheadV2BufferPoolSize is not set, use the default value
- return 2 * Runtime.getRuntime().availableProcessors();
+ return DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE;
}
return minReadAheadV2BufferPoolSize;
}
@@ -1459,6 +1506,22 @@ public int getMaxReadAheadV2BufferPoolSize() {
return maxReadAheadV2BufferPoolSize;
}
+ public int getReadAheadV2CpuMonitoringIntervalMillis() {
+ return readAheadV2CpuMonitoringIntervalMillis;
+ }
+
+ public int getReadAheadV2ThreadPoolUpscalePercentage() {
+ return readAheadV2ThreadPoolUpscalePercentage;
+ }
+
+ public int getReadAheadV2ThreadPoolDownscalePercentage() {
+ return readAheadV2ThreadPoolDownscalePercentage;
+ }
+
+ public int getReadAheadV2MemoryMonitoringIntervalMillis() {
+ return readAheadV2MemoryMonitoringIntervalMillis;
+ }
+
public int getReadAheadExecutorServiceTTLInMillis() {
return readAheadExecutorServiceTTLMillis;
}
@@ -1467,12 +1530,12 @@ public int getReadAheadV2CachedBufferTTLMillis() {
return readAheadV2CachedBufferTTLMillis;
}
- /**
- * Checks if the read-ahead v2 feature is enabled by user.
- * @return true if read-ahead v2 is enabled, false otherwise.
- */
- public boolean isReadAheadV2Enabled() {
- return this.isReadAheadV2Enabled;
+ public int getReadAheadV2CpuUsageThresholdPercent() {
+ return readAheadV2CpuUsageThresholdPercent;
+ }
+
+ public int getReadAheadV2MemoryUsageThresholdPercent() {
+ return readAheadV2MemoryUsageThresholdPercent;
}
@VisibleForTesting
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 2732c0ed8fb31..faa13d1d9e71e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -951,24 +951,24 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());
+
return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
- .withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
- .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
- .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
- .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
- .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
- .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
- .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
- .withFooterReadBufferSize(footerReadBufferSize)
- .withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
- .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
- .withShouldReadBufferSizeAlways(
- getAbfsConfiguration().shouldReadBufferSizeAlways())
- .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
- .withBufferedPreadDisabled(bufferedPreadDisabled)
- .withEncryptionAdapter(contextEncryptionAdapter)
- .withAbfsBackRef(fsBackRef)
- .build();
+ .withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
+ .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
+ .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
+ .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
+ .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
+ .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
+ .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
+ .withFooterReadBufferSize(footerReadBufferSize)
+ .withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
+ .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
+ .withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways())
+ .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
+ .withBufferedPreadDisabled(bufferedPreadDisabled)
+ .withEncryptionAdapter(contextEncryptionAdapter)
+ .withAbfsBackRef(fsBackRef)
+ .build();
}
public OutputStream openFileForWrite(final Path path,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 7d73f1a3fe7fc..86db3017b92d3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -269,6 +269,12 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2";
+ /**
+ * Enable or disable dynamic scaling of the thread pool and buffer pool used by readahead V2.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = "fs.azure.enable.readahead.v2.dynamic.scaling";
+
/**
* Minimum number of prefetch threads in the thread pool for readahead V2.
* {@value }
@@ -290,6 +296,28 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size";
+ /**
+ * Interval in milliseconds for periodic monitoring of CPU usage, used to scale the thread pool size up or down accordingly.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.cpu.monitoring.interval.millis";
+
+ /**
+ * Percentage by which the thread pool size should be upscaled when CPU usage is low.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.upscale.percentage";
+
+ /**
+ * Percentage by which the thread pool size should be downscaled when CPU usage is high.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.downscale.percentage";
+
+ /**
+ * Interval in milliseconds for periodic monitoring of memory usage, used to scale the buffer pool size up or down accordingly.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.memory.monitoring.interval.millis";
+
/**
* TTL in milliseconds for the idle threads in executor service used by read ahead v2.
*/
@@ -300,6 +328,16 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis";
+ /**
+ * Threshold percentage for CPU usage to scale up/down the thread pool size in read ahead v2.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.cpu.usage.threshold.percent";
+
+ /**
+ * Threshold percentage for memory usage to scale up/down the buffer pool size in read ahead v2.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.memory.usage.threshold.percent";
+
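+ /*
+  * Illustrative sketch (not part of this patch): these keys are plain Hadoop
+  * Configuration entries, so a client could enable and tune dynamic scaling as:
+  *
+  *   Configuration conf = new Configuration();
+  *   conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+  *   conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, 50);
+  *   conf.setInt(FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE, 20);
+  */
+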
/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key encoded in Base6format {@value}.*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 92fb6da79b43b..4225680433cfe 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -129,12 +129,19 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
- public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
+ public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = false;
+ public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = 8;
public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
- public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
+ public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = 16;
public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
- public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000;
+ public static final int DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = 6_000;
+ public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = 20;
+ public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = 30;
+ public static final int DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = 6_000;
+ public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 6_000;
public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000;
+ public static final int DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE = 50;
+ public static final int DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENTAGE = 50;
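+ // Assumed interplay of the defaults above (the scaling logic itself lives in
+ // ReadBufferManagerV2): every 6 seconds the CPU monitor compares process CPU
+ // usage with the 50% threshold and grows the worker pool by 20% or shrinks it
+ // by 30%; the memory monitor runs an analogous periodic check that drives
+ // buffer eviction against the 50% memory threshold.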
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
@@ -211,6 +218,7 @@ public final class FileSystemConfigurations {
public static final int ZERO = 0;
public static final int HUNDRED = 100;
+ public static final double HUNDRED_D = 100.0;
public static final long THOUSAND = 1000L;
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 38b49603fbb00..9d29614a04416 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -69,7 +69,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
public static final int FOOTER_SIZE = 16 * ONE_KB;
public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
- private int readAheadBlockSize;
+ private final int readAheadBlockSize;
private final AbfsClient client;
private final Statistics statistics;
private final String path;
@@ -132,7 +132,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
- private ReadBufferManager readBufferManager;
+ private final ReadBufferManager readBufferManager;
public AbfsInputStream(
final AbfsClient client,
@@ -191,6 +191,7 @@ public AbfsInputStream(
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
readBufferManager = ReadBufferManagerV1.getBufferManager();
}
+
if (streamStatistics != null) {
ioStatistics = streamStatistics.getIOStatistics();
}
@@ -532,7 +533,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
- readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
+ getReadBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
new TracingContext(readAheadTracingContext));
nextOffset = nextOffset + nextSize;
numReadAheads--;
@@ -541,7 +542,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
}
// try reading from buffers first
- receivedBytes = readBufferManager.getBlock(this, position, length, b);
+ receivedBytes = getReadBufferManager().getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
@@ -745,8 +746,8 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
- if (readBufferManager != null) {
- readBufferManager.purgeBuffersForStream(this);
+ if (getReadBufferManager() != null) {
+ getReadBufferManager().purgeBuffersForStream(this);
}
buffer = null; // de-reference the buffer so it can be GC'ed sooner
if (contextEncryptionAdapter != null) {
@@ -807,7 +808,7 @@ byte[] getBuffer() {
*/
@VisibleForTesting
public boolean isReadAheadEnabled() {
- return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null;
+ return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null;
}
@VisibleForTesting
@@ -825,6 +826,10 @@ public String getStreamID() {
return inputStreamId;
}
+ public String getETag() {
+ return eTag;
+ }
+
/**
* Getter for AbfsInputStreamStatistics.
*
@@ -922,11 +927,20 @@ long getLimit() {
return this.limit;
}
+ boolean isFirstRead() {
+ return this.firstRead;
+ }
+
@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}
+ @VisibleForTesting
+ ReadBufferManager getReadBufferManager() {
+ return readBufferManager;
+ }
+
@Override
public int minSeekForVectorReads() {
return S_128K;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index f6272492d6081..cb51fa22900e4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -64,15 +64,33 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private ContextEncryptionAdapter contextEncryptionAdapter = null;
+ /**
+ * Constructs a new {@link AbfsInputStreamContext}.
+ *
+ * @param sasTokenRenewPeriodForStreamsInSeconds SAS token renewal interval in seconds.
+ */
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
+ /**
+ * Sets the read buffer size.
+ *
+ * @param readBufferSize buffer size in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
+ /**
+ * Sets the read-ahead queue depth.
+ * Defaults to the number of available processors if negative.
+ *
+ * @param readAheadQueueDepth queue depth.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadAheadQueueDepth(
final int readAheadQueueDepth) {
this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
@@ -81,83 +99,169 @@ public AbfsInputStreamContext withReadAheadQueueDepth(
return this;
}
+ /**
+ * Enables or disables tolerance for out-of-band appends.
+ *
+ * @param tolerateOobAppends whether OOB appends should be tolerated.
+ * @return this instance.
+ */
public AbfsInputStreamContext withTolerateOobAppends(
final boolean tolerateOobAppends) {
this.tolerateOobAppends = tolerateOobAppends;
return this;
}
+ /**
+ * Enables or disables read-ahead feature.
+ *
+ * @param isReadAheadEnabled whether read-ahead is enabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext isReadAheadEnabled(
final boolean isReadAheadEnabled) {
this.isReadAheadEnabled = isReadAheadEnabled;
return this;
}
+ /**
+ * Enables or disables read-ahead version 2.
+ *
+ * @param isReadAheadV2Enabled whether read-ahead V2 is enabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext isReadAheadV2Enabled(
final boolean isReadAheadV2Enabled) {
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
return this;
}
+ /**
+ * Sets the read-ahead range.
+ *
+ * @param readAheadRange range in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
return this;
}
+ /**
+ * Sets stream statistics collector.
+ *
+ * @param streamStatistics statistics instance.
+ * @return this instance.
+ */
public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
return this;
}
+ /**
+ * Enables or disables complete read of small files in a single operation.
+ *
+ * @param readSmallFilesCompletely whether small files should be fully read.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadSmallFilesCompletely(
final boolean readSmallFilesCompletely) {
this.readSmallFilesCompletely = readSmallFilesCompletely;
return this;
}
+ /**
+ * Enables or disables footer read optimization.
+ *
+ * @param optimizeFooterRead whether footer read optimization is enabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext withOptimizeFooterRead(
final boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
return this;
}
+ /**
+ * Sets the footer read buffer size.
+ *
+ * @param footerReadBufferSize size in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}
+ /**
+ * Forces use of the configured read buffer size always.
+ *
+ * @param alwaysReadBufferSize whether to always use configured buffer size.
+ * @return this instance.
+ */
public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
return this;
}
+ /**
+ * Sets the read-ahead block size.
+ *
+ * @param readAheadBlockSize block size in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadAheadBlockSize(
final int readAheadBlockSize) {
this.readAheadBlockSize = readAheadBlockSize;
return this;
}
+ /**
+ * Enables or disables buffered positional reads.
+ *
+ * @param bufferedPreadDisabled whether buffered pread is disabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext withBufferedPreadDisabled(
final boolean bufferedPreadDisabled) {
this.bufferedPreadDisabled = bufferedPreadDisabled;
return this;
}
+ /**
+ * Sets a back reference to the filesystem that created this stream.
+ *
+ * @param fsBackRef filesystem back reference.
+ * @return this instance.
+ */
public AbfsInputStreamContext withAbfsBackRef(
final BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}
- public AbfsInputStreamContext withEncryptionAdapter(
- ContextEncryptionAdapter contextEncryptionAdapter){
- this.contextEncryptionAdapter = contextEncryptionAdapter;
- return this;
- }
+ /**
+ * Sets the context encryption adapter.
+ *
+ * @param contextEncryptionAdapter encryption adapter.
+ * @return this instance.
+ */
+ public AbfsInputStreamContext withEncryptionAdapter(
+ ContextEncryptionAdapter contextEncryptionAdapter){
+ this.contextEncryptionAdapter = contextEncryptionAdapter;
+ return this;
+ }
+ /**
+ * Finalizes and validates the context configuration.
+ *
+ * Ensures read-ahead range is valid and aligns read-ahead block size with
+ * read request size if necessary.
+ *
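+ * <pre>{@code
+ * // Typical construction (mirrors AzureBlobFileSystemStore; variables illustrative):
+ * AbfsInputStreamContext ctx =
+ *     new AbfsInputStreamContext(sasTokenRenewPeriodSeconds)
+ *         .withReadBufferSize(readBufferSize)
+ *         .isReadAheadEnabled(true)
+ *         .isReadAheadV2Enabled(true)
+ *         .build();
+ * }</pre>
+ *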
+ * @return this instance.
+ */
public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
@@ -173,63 +277,78 @@ public AbfsInputStreamContext build() {
return this;
}
+ /** @return configured read buffer size. */
public int getReadBufferSize() {
return readBufferSize;
}
+ /** @return read-ahead queue depth. */
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}
+ /** @return whether out-of-band appends are tolerated. */
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}
+ /** @return whether read-ahead is enabled. */
public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}
+ /** @return whether read-ahead V2 is enabled. */
public boolean isReadAheadV2Enabled() {
return isReadAheadV2Enabled;
}
+ /** @return read-ahead range. */
public int getReadAheadRange() {
return readAheadRange;
}
+ /** @return stream statistics collector. */
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
+ /** @return whether small files should be read completely. */
public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
+ /** @return whether footer read optimization is enabled. */
public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}
+ /** @return footer read buffer size. */
public int getFooterReadBufferSize() {
return footerReadBufferSize;
}
+ /** @return whether the configured buffer size is always used. */
public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
+ /** @return read-ahead block size. */
public int getReadAheadBlockSize() {
return readAheadBlockSize;
}
+ /** @return whether buffered pread is disabled. */
public boolean isBufferedPreadDisabled() {
return bufferedPreadDisabled;
}
+ /** @return filesystem back reference. */
public BackReference getFsBackRef() {
return fsBackRef;
}
- public ContextEncryptionAdapter getEncryptionAdapter() {
- return contextEncryptionAdapter;
- }
+ /** @return context encryption adapter. */
+ public ContextEncryptionAdapter getEncryptionAdapter() {
+ return contextEncryptionAdapter;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index 9ce926d841c84..a6aa75f59d261 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -29,6 +30,8 @@
class ReadBuffer {
private AbfsInputStream stream;
+ private String eTag;
+ private String path; // path of the file this buffer is for
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filles
private int requestedLength; // requested length of the read
@@ -44,6 +47,7 @@ class ReadBuffer {
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
+ private final AtomicInteger refCount = new AtomicInteger(0);
private IOException errException = null;
@@ -51,10 +55,26 @@ public AbfsInputStream getStream() {
return stream;
}
+ public String getETag() {
+ return eTag;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
public void setStream(AbfsInputStream stream) {
this.stream = stream;
}
+ public void setETag(String eTag) {
+ this.eTag = eTag;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
public void setTracingContext(TracingContext tracingContext) {
this.tracingContext = tracingContext;
}
@@ -122,6 +142,20 @@ public void setStatus(ReadBufferStatus status) {
}
}
+ public void startReading() {
+ refCount.getAndIncrement();
+ }
+
+ public void endReading() {
+ if (refCount.decrementAndGet() < 0) {
+ throw new IllegalStateException("ReadBuffer refCount cannot be negative");
+ }
+ }
+
+ public int getRefCount() {
+ return refCount.get();
+ }
+
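+ /*
+  * Intended usage (sketch; the real callers are in ReadBufferManagerV2):
+  *
+  *   buf.startReading();
+  *   try {
+  *     // copy bytes out of buf
+  *   } finally {
+  *     buf.endReading();
+  *   }
+  *
+  * A buffer with refCount > 0 is in use and must not be evicted.
+  */
+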
public CountDownLatch getLatch() {
return latch;
}
@@ -162,4 +196,7 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
+ public boolean isFullyConsumed() {
+ return isFirstByteConsumed() && isLastByteConsumed();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 9ee128fbc3275..712b04fb4999c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -119,7 +119,6 @@ abstract void doneReading(ReadBuffer buffer,
*/
abstract void purgeBuffersForStream(AbfsInputStream stream);
-
// Following Methods are for testing purposes only and should not be used in production code.
/**
@@ -206,7 +205,7 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) {
*
* @return the stack of free buffer indices
*/
- public Stack<Integer> getFreeList() {
+ Stack<Integer> getFreeList() {
return freeList;
}
@@ -215,7 +214,7 @@ public Stack getFreeList() {
*
* @return the queue of {@link ReadBuffer} objects in the read-ahead queue
*/
- public Queue<ReadBuffer> getReadAheadQueue() {
+ Queue<ReadBuffer> getReadAheadQueue() {
return readAheadQueue;
}
@@ -224,7 +223,7 @@ public Queue getReadAheadQueue() {
*
* @return the list of {@link ReadBuffer} objects that are currently being processed
*/
- public LinkedList<ReadBuffer> getInProgressList() {
+ LinkedList<ReadBuffer> getInProgressList() {
return inProgressList;
}
@@ -233,7 +232,7 @@ public LinkedList getInProgressList() {
*
* @return the list of {@link ReadBuffer} objects that have been read and are available for use
*/
- public LinkedList<ReadBuffer> getCompletedReadList() {
+ LinkedList<ReadBuffer> getCompletedReadList() {
return completedReadList;
}
@@ -244,7 +243,7 @@ public LinkedList getCompletedReadList() {
* @return a list of free buffer indices
*/
@VisibleForTesting
- protected synchronized List<Integer> getFreeListCopy() {
+ synchronized List<Integer> getFreeListCopy() {
return new ArrayList<>(freeList);
}
@@ -254,7 +253,7 @@ protected synchronized List getFreeListCopy() {
* @return a list of {@link ReadBuffer} objects in the read-ahead queue
*/
@VisibleForTesting
- protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
+ synchronized List<ReadBuffer> getReadAheadQueueCopy() {
return new ArrayList<>(readAheadQueue);
}
@@ -264,7 +263,7 @@ protected synchronized List getReadAheadQueueCopy() {
* @return a list of in-progress {@link ReadBuffer} objects
*/
@VisibleForTesting
- protected synchronized List<ReadBuffer> getInProgressCopiedList() {
+ synchronized List<ReadBuffer> getInProgressListCopy() {
return new ArrayList<>(inProgressList);
}
@@ -274,7 +273,7 @@ protected synchronized List getInProgressCopiedList() {
* @return a list of completed {@link ReadBuffer} objects
*/
@VisibleForTesting
- protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
+ synchronized List<ReadBuffer> getCompletedReadListCopy() {
return new ArrayList<>(completedReadList);
}
@@ -284,7 +283,7 @@ protected synchronized List getCompletedReadListCopy() {
* @return the number of completed read buffers
*/
@VisibleForTesting
- protected int getCompletedReadListSize() {
+ int getCompletedReadListSize() {
return completedReadList.size();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index fe1ac3fa1f235..e476f6d744614 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -33,7 +33,7 @@
* The Read Buffer Manager for Rest AbfsClient.
* V1 implementation of ReadBufferManager.
*/
-final class ReadBufferManagerV1 extends ReadBufferManager {
+public final class ReadBufferManagerV1 extends ReadBufferManager {
private static final int NUM_BUFFERS = 16;
private static final int NUM_THREADS = 8;
@@ -41,7 +41,7 @@ final class ReadBufferManagerV1 extends ReadBufferManager {
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers;
- private static ReadBufferManagerV1 bufferManager;
+ private static ReadBufferManagerV1 bufferManager;
// hide instance constructor
private ReadBufferManagerV1() {
@@ -52,7 +52,7 @@ private ReadBufferManagerV1() {
* Sets the read buffer manager configurations.
* @param readAheadBlockSize the size of the read-ahead block in bytes
*/
- static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+ public static void setReadBufferManagerConfigs(int readAheadBlockSize) {
if (bufferManager == null) {
LOGGER.debug(
"ReadBufferManagerV1 not initialized yet. Overriding readAheadBlockSize as {}",
@@ -88,7 +88,7 @@ static ReadBufferManagerV1 getBufferManager() {
void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. These byte arrays are never garbage collected
+ buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused; the byte arrays are never garbage collected
getFreeList().add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 9cce860127dae..c7e6e4c3d28d4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -17,67 +17,112 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import com.sun.management.OperatingSystemMXBean;
+
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
-final class ReadBufferManagerV2 extends ReadBufferManager {
+/**
+ * The Improved Read Buffer Manager for Rest AbfsClient.
+ */
+public final class ReadBufferManagerV2 extends ReadBufferManager {
+
+ // Internal constants
+ private static final ReentrantLock LOCK = new ReentrantLock();
// Thread Pool Configurations
private static int minThreadPoolSize;
+
private static int maxThreadPoolSize;
+
+ private static int cpuMonitoringIntervalInMilliSec;
+
+ private static double cpuThreshold;
+
+ private static int threadPoolUpscalePercentage;
+
+ private static int threadPoolDownscalePercentage;
+
private static int executorServiceKeepAliveTimeInMilliSec;
+
+ // Provision ~20% more threads than the queue size.
+ private static final double THREAD_POOL_REQUIREMENT_BUFFER = 1.2;
+
+ private static boolean isDynamicScalingEnabled;
+
+ private ScheduledExecutorService cpuMonitorThread;
+
private ThreadPoolExecutor workerPool;
+ private final List<ReadBufferWorker> workerRefs = new ArrayList<>();
+
// Buffer Pool Configurations
private static int minBufferPoolSize;
+
private static int maxBufferPoolSize;
- private int numberOfActiveBuffers = 0;
+
+ private static int memoryMonitoringIntervalInMilliSec;
+
+ private static double memoryThreshold;
+
+ private final AtomicInteger numberOfActiveBuffers = new AtomicInteger(0);
+
private byte[][] bufferPool;
+ private final Stack<Integer> removedBufferList = new Stack<>();
+
+ private ScheduledExecutorService memoryMonitorThread;
+
+ // Buffer Manager Structures
private static ReadBufferManagerV2 bufferManager;
- // hide instance constructor
- private ReadBufferManagerV2() {
- LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
- }
+ private static final AtomicBoolean isConfigured = new AtomicBoolean(false);
/**
- * Sets the read buffer manager configurations.
- * @param readAheadBlockSize the size of the read-ahead block in bytes
- * @param abfsConfiguration the AbfsConfiguration instance for other configurations
+ * Private constructor to prevent instantiation, as this needs to be a singleton.
*/
- static void setReadBufferManagerConfigs(int readAheadBlockSize, AbfsConfiguration abfsConfiguration) {
- if (bufferManager == null) {
- minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
- maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
- executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
-
- minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
- maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
- setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
- setReadAheadBlockSize(readAheadBlockSize);
- }
+ private ReadBufferManagerV2() {
+ printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
}
- /**
- * Returns the singleton instance of ReadBufferManagerV2.
- * @return the singleton instance of ReadBufferManagerV2
- */
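+ /**
+  * Returns the lazily created singleton instance of ReadBufferManagerV2.
+  * {@link #setReadBufferManagerConfigs(int, AbfsConfiguration)} must have
+  * been called first.
+  * @return the singleton instance of ReadBufferManagerV2
+  */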
static ReadBufferManagerV2 getBufferManager() {
+ if (!isConfigured.get()) {
+ throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
+ + "Please call setReadBufferManagerConfigs() before calling getBufferManager().");
+ }
if (bufferManager == null) {
LOCK.lock();
try {
if (bufferManager == null) {
bufferManager = new ReadBufferManagerV2();
bufferManager.init();
+ LOGGER.trace("ReadBufferManagerV2 singleton initialized");
}
} finally {
LOCK.unlock();
@@ -87,17 +132,73 @@ static ReadBufferManagerV2 getBufferManager() {
}
/**
- * {@inheritDoc}
+ * Sets the ReadBufferManagerV2 configurations from the provided values before singleton initialization.
+ * @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2.
+ * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
+ */
+ public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
+ final AbfsConfiguration abfsConfiguration) {
+ // Set Configs only before initializations.
+ if (bufferManager == null && !isConfigured.get()) {
+ LOCK.lock();
+ try {
+ if (bufferManager == null && !isConfigured.get()) {
+ minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+ maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+ cpuMonitoringIntervalInMilliSec
+ = abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
+ cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()
+ / HUNDRED_D;
+ threadPoolUpscalePercentage
+ = abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
+ threadPoolDownscalePercentage
+ = abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
+ executorServiceKeepAliveTimeInMilliSec
+ = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+ minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
+ maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
+ memoryMonitoringIntervalInMilliSec
+ = abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
+ memoryThreshold =
+ abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent()
+ / HUNDRED_D;
+ setThresholdAgeMilliseconds(
+ abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
+ isDynamicScalingEnabled
+ = abfsConfiguration.isReadAheadV2DynamicScalingEnabled();
+ setReadAheadBlockSize(readAheadBlockSize);
+ setIsConfigured(true);
+ }
+ } finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
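+ /*
+  * Expected call order (sketch): configure once, then obtain the singleton.
+  *
+  *   ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize, abfsConfig);
+  *   ReadBufferManagerV2 manager = ReadBufferManagerV2.getBufferManager();
+  *
+  * getBufferManager() throws IllegalStateException if invoked before the
+  * configs are set; blockSize and abfsConfig above are illustrative names.
+  */
+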
+ /**
+ * Initialize the singleton ReadBufferManagerV2.
*/
@Override
void init() {
- // Initialize Buffer Pool
+ // Initialize Buffer Pool. Size can never be more than max pool size
bufferPool = new byte[maxBufferPoolSize][];
for (int i = 0; i < minBufferPoolSize; i++) {
- bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. These byte arrays are never garbage collected
+ // Start with just the minimum number of buffers.
+ bufferPool[i] = new byte[getReadAheadBlockSize()]; // buffers are reused and never garbage collected
getFreeList().add(i);
- numberOfActiveBuffers++;
+ numberOfActiveBuffers.getAndIncrement();
}
+ memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread t = new Thread(runnable, "ReadAheadV2-Memory-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ memoryMonitorThread.scheduleAtFixedRate(this::scheduledEviction,
+ getMemoryMonitoringIntervalInMilliSec(),
+ getMemoryMonitoringIntervalInMilliSec(), TimeUnit.MILLISECONDS);
// Initialize a Fixed Size Thread Pool with minThreadPoolSize threads
workerPool = new ThreadPoolExecutor(
@@ -106,123 +207,893 @@ void init() {
executorServiceKeepAliveTimeInMilliSec,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
- namedThreadFactory);
+ workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, this);
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
workerPool.submit(worker);
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+ if (isDynamicScalingEnabled) {
+ cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+ getCpuMonitoringIntervalInMilliSec(),
+ getCpuMonitoringIntervalInMilliSec(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ printTraceLog(
+ "ReadBufferManagerV2 initialized with {} buffers and {} worker threads",
+ numberOfActiveBuffers.get(), workerRefs.size());
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to queue a read-ahead.
+ * @param stream the stream for which read-ahead is requested.
+ * @param requestedOffset The offset in the file which should be read.
+ * @param requestedLength The length to read.
+ * @param tracingContext the tracing context for this read-ahead operation.
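+ *
+ * <pre>{@code
+ * // Illustrative caller (see AbfsInputStream#readInternal in this patch):
+ * getReadBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
+ *     new TracingContext(readAheadTracingContext));
+ * }</pre>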
*/
@Override
public void queueReadAhead(final AbfsInputStream stream,
final long requestedOffset,
final int requestedLength,
- final TracingContext tracingContext) {
- // TODO: To be implemented
+ TracingContext tracingContext) {
+ printTraceLog(
+ "Start Queueing readAhead for file: {}, with eTag: {}, "
+ + "offset: {}, length: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset, requestedLength,
+ stream.hashCode());
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+ // Already queued for this offset, so skip queuing.
+ printTraceLog(
+ "Skipping queuing readAhead for file: {}, with eTag: {}, "
+ + "offset: {}, triggered by stream: {} as it is already queued",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ stream.hashCode());
+ return;
+ }
+ if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+ // No buffers are available and more buffers cannot be created. Skip queuing.
+ printTraceLog(
+ "Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ stream.hashCode());
+ return;
+ }
+
+ // Create a new ReadBuffer to keep the prefetched data and queue.
+ buffer = new ReadBuffer();
+ buffer.setStream(stream); // To map buffer with stream that requested it
+ buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+ buffer.setPath(stream.getPath());
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+ buffer.setTracingContext(tracingContext);
+
+ if (isFreeListEmpty()) {
+ /*
+ * By now there should be at least one buffer available.
+ * This double-checks that, after upscaling or eviction,
+ * a free buffer is actually available. If not, we skip queueing.
+ */
+ return;
+ }
+ Integer bufferIndex = popFromFreeList();
+ if (bufferIndex >= bufferPool.length) {
+ // This should never happen.
+ printTraceLog(
+ "Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as invalid buffer index popped from free list",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ stream.hashCode());
+ return;
+ }
+ buffer.setBuffer(bufferPool[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ getReadAheadQueue().add(buffer);
+ notifyAll();
+ printTraceLog(
+ "Done q-ing readAhead for file: {}, with eTag:{}, offset: {}, "
+ + "buffer idx: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ buffer.getBufferindex(), stream.hashCode());
+ }
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
+ * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+ *
+ * @param stream of the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
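+ *
+ * <pre>{@code
+ * // Illustrative caller (see AbfsInputStream#readInternal in this patch):
+ * receivedBytes = getReadBufferManager().getBlock(this, position, length, b);
+ * }</pre>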
*/
@Override
public int getBlock(final AbfsInputStream stream,
final long position,
final int length,
- final byte[] buffer) throws IOException {
- // TODO: To be implemented
+ final byte[] buffer)
+ throws IOException {
+ // not synchronized, so have to be careful with locking
+ printTraceLog(
+ "getBlock request for file: {}, with eTag: {}, for position: {} "
+ + "for length: {} received from stream: {}",
+ stream.getPath(), stream.getETag(), position, length,
+ stream.hashCode());
+
+ String requestedETag = stream.getETag();
+ boolean isFirstRead = stream.isFirstRead();
+
+ // Wait for any in-progress read to complete.
+ waitForProcess(requestedETag, position, isFirstRead);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(requestedETag, position, length,
+ buffer);
+ }
+ if (bytesRead > 0) {
+ printTraceLog(
+ "Done read from Cache for the file with eTag: {}, position: {}, length: {}, requested by stream: {}",
+ requestedETag, position, bytesRead, stream.hashCode());
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
*/
@Override
public ReadBuffer getNextBlockToRead() throws InterruptedException {
- // TODO: To be implemented
- return null;
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ // Blocking Call to wait for prefetch to be queued.
+ while (getReadAheadQueue().isEmpty()) {
+ wait();
+ }
+
+ buffer = getReadAheadQueue().remove();
+ notifyAll();
+ if (buffer == null) {
+ return null;
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ getInProgressList().add(buffer);
+ }
+ printTraceLog(
+ "ReadBufferWorker picked file: {}, with eTag: {}, for offset: {}, "
+ + "queued by stream: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
+ buffer.getStream().hashCode());
+ return buffer;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this method to post completion.
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
@Override
public void doneReading(final ReadBuffer buffer,
final ReadBufferStatus result,
final int bytesActuallyRead) {
- // TODO: To be implemented
+ printTraceLog(
+ "ReadBufferWorker completed prefetch for file: {} with eTag: {}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
+ buffer.getStream().hashCode(), result, bytesActuallyRead);
+ synchronized (this) {
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (getInProgressList().contains(buffer)) {
+ getInProgressList().remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ // Successful read, so update the buffer status and length
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ // Failed read, reuse buffer for next read, this buffer will be
+ // evicted later based on eviction policy.
+ pushToFreeList(buffer.getBufferindex());
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ getCompletedReadList().add(buffer);
+ }
+ }
+
+ // Outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results.
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
- * {@inheritDoc}
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManagerV2} when stream is closed.
+ * @param stream input stream.
*/
- @Override
- public void purgeBuffersForStream(final AbfsInputStream stream) {
- // TODO: To be implemented
+ @Override
+ public synchronized void purgeBuffersForStream(final AbfsInputStream stream) {
+ printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+ getReadAheadQueue().removeIf(
+ readBuffer -> readBuffer.getStream() == stream);
+ purgeList(stream, getCompletedReadList());
}
/**
- * {@inheritDoc}
+ * Check if any buffer is already queued for the requested offset.
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return whether any buffer is already queued
*/
- @VisibleForTesting
- @Override
- public int getNumBuffers() {
- return numberOfActiveBuffers;
+ private boolean isAlreadyQueued(final String eTag,
+ final long requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+ || isInList(getInProgressList(), eTag, requestedOffset)
+ || isInList(getCompletedReadList(), eTag, requestedOffset));
+ }
+
+ /**
+ * Check if any buffer in the list contains the requested offset.
+ * @param list the list to check
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return whether any buffer in the list contains the requested offset
+ */
+ private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
+ final long requestedOffset) {
+ return (getFromList(list, eTag, requestedOffset) != null);
}
+
/**
- * {@inheritDoc}
+ * Get the buffer from the list that contains the requested offset.
+ * @param list the list to check
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return the buffer if found, null otherwise
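+ *
+ * For completed (AVAILABLE) buffers the actual read length is matched against;
+ * for queued or in-progress buffers the requested length is used, since the
+ * final length is not known yet.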
*/
- @VisibleForTesting
- @Override
- public void callTryEvict() {
- // TODO: To be implemented
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list,
+ final String eTag,
+ final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (eTag.equals(buffer.getETag())) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset
+ < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
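+ * Preference order, as implemented below: fully consumed buffers first, then
+ * partially consumed ones, then the oldest unconsumed buffer once it is older
+ * than the configured threshold age.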
+ * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+ */
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (getCompletedReadList().isEmpty()) {
+ return false; // there are no evict-able buffers
+ }
+
+ long currentTimeInMs = currentTimeMillis();
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isFullyConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ // Failed read buffers (with buffer index=-1) that are older than
+ // thresholdAge should be cleaned up, but at the same time should not
+ // report successful eviction.
+ // Queue logic expects that a buffer is freed up for read ahead when
+ // eviction is successful, whereas a failed ReadBuffer would have released
+ // its buffer when its status was set to READ_FAILED.
+ long earliestBirthday = Long.MAX_VALUE;
+ ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if ((buf.getBufferindex() != -1)
+ && (buf.getTimeStamp() < earliestBirthday)) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ } else if ((buf.getBufferindex() == -1)
+ && (currentTimeInMs - buf.getTimeStamp())
+ > getThresholdAgeMilliseconds()) {
+ oldFailedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : oldFailedBuffers) {
+ manualEviction(buf);
+ }
+
+ if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds())
+ && (nodeToEvict != null)) {
+ return manualEviction(nodeToEvict);
+ }
+
+ printTraceLog("No buffer eligible for eviction");
+ // nothing can be evicted
+ return false;
+ }
+
+ /**
+ * Evict the given buffer.
+ * @param buf the buffer to evict
+ * @return whether the eviction succeeded
+ */
+ private boolean evict(final ReadBuffer buf) {
+ if (buf.getRefCount() > 0) {
+ // If the buffer is still being read, then we cannot evict it.
+ printTraceLog(
+ "Cannot evict buffer with index: {}, file: {}, with eTag: {}, offset: {} as it is still being read by some input stream",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+ return false;
+ }
+ // As failed ReadBuffers (bufferIndex = -1) are kept in the completed read list,
+ // avoid adding them to the free list.
+ if (buf.getBufferindex() != -1) {
+ pushToFreeList(buf.getBufferindex());
+ }
+ getCompletedReadList().remove(buf);
+ buf.setTracingContext(null);
+ printTraceLog(
+ "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.isFullyConsumed(), buf.isAnyByteConsumed());
+ return true;
+ }
+
+ /**
+ * Wait for any in-progress read for the requested offset to complete.
+ * @param eTag the eTag of the file
+ * @param position the requested offset
+ * @param isFirstRead whether this is the first read of the stream
+ */
+ private void waitForProcess(final String eTag,
+ final long position,
+ boolean isFirstRead) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+ if (readBuf == null) {
+ readBuf = getFromList(getInProgressList(), eTag, position);
+ }
+ }
+ if (readBuf != null) { // if in in-progress queue, then block for it
+ try {
+ printTraceLog(
+ "A relevant read buffer for file: {}, with eTag: {}, offset: {}, "
+ + "queued by stream: {}, having buffer idx: {} is being prefetched, waiting for latch",
+ readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(),
+ readBuf.getStream().hashCode(), readBuf.getBufferindex());
+ readBuf.getLatch()
+ .await(); // blocking wait on the caller stream's thread
+ // Note on correctness: readBuf gets out of getInProgressList() only in 1 place: after worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // getInProgressList(). So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+ // then the latch can be removed and replaced with wait/notify whenever getInProgressList() is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+ + "buffer index: {} queued by stream: {}", readBuf.getPath(),
+ readBuf.getETag(),
+ readBuf.getOffset(), readBuf.getBufferindex(),
+ readBuf.getStream().hashCode());
+ }
+ }
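+
+ // A minimal sketch of the latch handshake assumed above (illustration only):
+ // worker thread, in doneReading(): remove the buffer from the in-progress list,
+ //   then buf.getLatch().countDown();
+ // caller thread: buf.getLatch().await(); // cannot miss the completion signal,
+ //   since the countDown happens strictly after the list removal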
+
+ /**
+ * Clear the buffer from read-ahead queue if it exists.
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @param isFirstRead whether this is the first read of the stream
+ * @return the buffer to wait on if this was the stream's first read; null otherwise
+ */
+ private ReadBuffer clearFromReadAheadQueue(final String eTag,
+ final long requestedOffset,
+ boolean isFirstRead) {
+ ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
+ /*
+ * If this prefetch was triggered by first read of this input stream,
+ * we should not remove it from queue and let it complete by backend threads.
+ */
+ if (buffer != null && isFirstRead) {
+ return buffer;
+ }
+ if (buffer != null) {
+ getReadAheadQueue().remove(buffer);
+ notifyAll(); // lock is held in calling method
+ pushToFreeList(buffer.getBufferindex());
+ }
+ return null;
+ }
+
+ /**
+ * Get the block from completed queue if it exists.
+ * @param eTag the eTag of the file
+ * @param position the requested offset
+ * @param length the length to read
+ * @param buffer the buffer to read data into
+ * @return the number of bytes read
+ * @throws IOException if an I/O error occurs
+ */
+ private int getBlockFromCompletedQueue(final String eTag, final long position,
+ final int length, final byte[] buffer) throws IOException {
+ ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+ if (buf == null) {
+ return 0;
+ }
+
+ buf.startReading(); // atomic increment of refCount.
+ try {
+ if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+ // To prevent new read requests from failing due to old read-ahead attempts,
+ // return an exception only from buffers that failed within the last
+ // getThresholdAgeMilliseconds().
+ if ((currentTimeMillis() - buf.getTimeStamp())
+ < getThresholdAgeMilliseconds()) {
+ throw buf.getErrException();
+ } else {
+ return 0;
+ }
+ }
+
+ if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+ || (position >= buf.getOffset() + buf.getLength())) {
+ return 0;
+ }
+
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+ return lengthToCopy;
+ } finally {
+ buf.endReading(); // atomic decrement of refCount, even on early return or throw
+ }
+ }
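+
+ // Worked example of the copy arithmetic above (illustration only): for a buffer
+ // cached at offset 4 MB with length 1 MB, a read at position 4 MB + 512 KB for
+ // 1 MB yields cursor = 512 KB and availableLengthInBuffer = 512 KB, so
+ // lengthToCopy = min(1 MB, 512 KB) = 512 KB; lastByteConsumed and anyByteConsumed
+ // are set, while firstByteConsumed is not (cursor != 0).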
+
+ /**
+ * Get the buffer from completed queue that contains the requested offset.
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return the buffer if found, null otherwise
+ */
+ private ReadBuffer getBufferFromCompletedQueue(final String eTag,
+ final long requestedOffset) {
+ for (ReadBuffer buffer : getCompletedReadList()) {
+ // A buffer matches if the requested offset lies at or beyond the buffer's start
+ // offset and before its end, measured by either the actual or the requested length.
+ if (eTag.equals(buffer.getETag())
+ && (requestedOffset >= buffer.getOffset())
+ && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+ || (requestedOffset
+ < buffer.getOffset() + buffer.getRequestedLength()))) {
+ return buffer;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Try to upscale memory by adding more buffers to the pool if memory usage is below threshold.
+ * @return whether the upscale succeeded
+ */
+ private synchronized boolean tryMemoryUpscale() {
+ if (!isDynamicScalingEnabled) {
+ printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+ return false; // Dynamic scaling is disabled, so no upscaling.
+ }
+ double memoryLoad = getMemoryLoad();
+ if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
+ // Create one more buffer and add it to the free list.
+ int nextIndex = getNumBuffers();
+ if (removedBufferList.isEmpty() && nextIndex < bufferPool.length) {
+ bufferPool[nextIndex] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(nextIndex);
+ } else {
+ // Reuse a removed buffer index.
+ int freeIndex = removedBufferList.pop();
+ if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) {
+ printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+ freeIndex, bufferPool.length);
+ return false;
+ }
+ bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(freeIndex);
+ }
+ incrementActiveBufferCount();
+ printTraceLog(
+ "Current Memory Load: {}. Incrementing buffer pool size to {}",
+ memoryLoad, getNumBuffers());
+ return true;
+ }
+ printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load: {}",
+ getNumBuffers(), memoryLoad);
+ return false;
+ }
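+
+ // Hedged illustration of the upscale decision (values assumed, not from this
+ // patch): with a memory threshold of 0.75 and maxBufferPoolSize of 32, a pool of
+ // 16 buffers at a heap load of 0.60 grows by one buffer per invocation; at a
+ // load of 0.80 it stays put.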
+
+ /**
+ * Scheduled Eviction task that runs periodically to evict old buffers.
+ */
+ private void scheduledEviction() {
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (currentTimeMillis() - buf.getTimeStamp()
+ > getThresholdAgeMilliseconds()) {
+ // If the buffer is older than thresholdAge, evict it.
+ printTraceLog(
+ "Scheduled Eviction of Buffer Triggered for BufferIndex: {}, "
+ + "file: {}, with eTag: {}, offset: {}, length: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.getLength(), buf.getStream().hashCode());
+ evict(buf);
+ }
+ }
+
+ double memoryLoad = getMemoryLoad();
+ if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+ synchronized (this) {
+ if (isFreeListEmpty()) {
+ printTraceLog(
+ "No free buffers available. Skipping downscale of buffer pool");
+ return; // No free buffers available, so cannot downscale.
+ }
+ int freeIndex = popFromFreeList();
+ if (freeIndex >= bufferPool.length || bufferPool[freeIndex] == null) {
+ printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+ freeIndex, bufferPool.length);
+ return;
+ }
+ bufferPool[freeIndex] = null;
+ removedBufferList.add(freeIndex);
+ decrementActiveBufferCount();
+ printTraceLog(
+ "Current Memory Load: {}. Decrementing buffer pool size to {}",
+ memoryLoad, getNumBuffers());
+ }
+ }
+ }
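+
+ // A sketch of how this task might be wired up, assuming a single-threaded
+ // ScheduledExecutorService (illustration only; not taken from this patch):
+ //
+ //   memoryMonitorThread = Executors.newSingleThreadScheduledExecutor();
+ //   memoryMonitorThread.scheduleAtFixedRate(this::scheduledEviction,
+ //       memoryMonitoringIntervalInMilliSec, memoryMonitoringIntervalInMilliSec,
+ //       TimeUnit.MILLISECONDS);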
+
+ /**
+ * Manual Eviction of a buffer.
+ * @param buf the buffer to evict
+ * @return whether the eviction succeeded
+ */
+ private boolean manualEviction(final ReadBuffer buf) {
+ printTraceLog(
+ "Manual Eviction of Buffer Triggered for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.getStream().hashCode());
+ return evict(buf);
+ }
+
+ /**
+ * Adjust the thread pool size based on CPU load and queue size.
+ */
+ private void adjustThreadPool() {
+ int currentPoolSize = workerRefs.size();
+ double cpuLoad = getCpuLoad();
+ int requiredPoolSize = getRequiredThreadPoolSize();
+ int newThreadPoolSize;
+ printTraceLog(
+ "Current CPU load: {}, Current worker pool size: {}, Current queue size: {}",
+ cpuLoad, currentPoolSize, requiredPoolSize);
+ if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+ // Submit more background tasks.
+ newThreadPoolSize = Math.min(maxThreadPoolSize,
+ (int) Math.ceil(
+ (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
+ / HUNDRED_D));
+ // Create new Worker Threads
+ for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
+ workerPool.submit(worker);
+ }
+ printTraceLog("Increased worker pool size from {} to {}", currentPoolSize,
+ newThreadPoolSize);
+ } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
+ newThreadPoolSize = Math.max(minThreadPoolSize,
+ (int) Math.ceil(
+ (currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
+ / HUNDRED_D));
+ // Signal the extra workers to stop
+ while (workerRefs.size() > newThreadPoolSize) {
+ ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+ worker.stop();
+ }
+ printTraceLog("Decreased worker pool size from {} to {}", currentPoolSize,
+ newThreadPoolSize);
+ } else {
+ printTraceLog("No change in worker pool size. CPU load: {} Pool size: {}",
+ cpuLoad, currentPoolSize);
+ }
+ }
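+
+ // Worked example of the scaling arithmetic (illustration only): with
+ // threadPoolUpscalePercentage = 30, a pool of 10 workers grows to
+ // ceil(10 * 130 / 100) = 13; with threadPoolDownscalePercentage = 30 it shrinks
+ // to ceil(10 * 70 / 100) = 7, always clamped to [minThreadPoolSize, maxThreadPoolSize].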
+
+ /**
+ * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+ * System.currentTimeMillis can go backwards when the system clock is changed (e.g., by NTP
+ * time synchronization), making it unsuitable for measuring time intervals. nanoTime is
+ * strictly monotonically increasing per CPU core.
+ * Note: it is not monotonic across sockets, and even within a CPU it is only the more
+ * recent generations that share a clock across all cores.
+ *
+ * @return current time in milliseconds
+ */
+ private long currentTimeMillis() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ }
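+
+ // Illustrative usage (doSomeWork() is a hypothetical stand-in): intervals
+ // measured this way are immune to wall-clock adjustments:
+ //
+ //   long start = currentTimeMillis();
+ //   doSomeWork();
+ //   long elapsedMs = currentTimeMillis() - start; // monotonic, NTP-safe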
+
+ /**
+ * Purge all buffers associated with the given stream from the given list.
+ * @param stream the stream whose buffers are to be purged
+ * @param list the list to purge from
+ */
+ private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+ for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+ ReadBuffer readBuffer = it.next();
+ if (readBuffer.getStream() == stream) {
+ it.remove();
+ // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+ // list in doneReading method, we will skip adding those here again.
+ if (readBuffer.getBufferindex() != -1) {
+ pushToFreeList(readBuffer.getBufferindex());
+ }
+ }
+ }
}
/**
- * {@inheritDoc}
+ * Test method that can clean up the current state of readAhead buffers and
+ * the lists. Will also trigger a fresh init.
*/
@VisibleForTesting
@Override
public void testResetReadBufferManager() {
- // TODO: To be implemented
+ synchronized (this) {
+ ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf != null) {
+ completedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : completedBuffers) {
+ manualEviction(buf);
+ }
+
+ getReadAheadQueue().clear();
+ getInProgressList().clear();
+ getCompletedReadList().clear();
+ getFreeList().clear();
+ for (int i = 0; i < maxBufferPoolSize; i++) {
+ bufferPool[i] = null;
+ }
+ bufferPool = null;
+ if (cpuMonitorThread != null) {
+ cpuMonitorThread.shutdownNow();
+ }
+ if (memoryMonitorThread != null) {
+ memoryMonitorThread.shutdownNow();
+ }
+ if (workerPool != null) {
+ workerPool.shutdownNow();
+ }
+ resetBufferManager();
+ }
}
- /**
- * {@inheritDoc}
- */
@VisibleForTesting
@Override
- public void testResetReadBufferManager(final int readAheadBlockSize,
- final int thresholdAgeMilliseconds) {
- // TODO: To be implemented
+ public void testResetReadBufferManager(int readAheadBlockSize,
+ int thresholdAgeMilliseconds) {
+ setReadAheadBlockSize(readAheadBlockSize);
+ setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+ testResetReadBufferManager();
+ }
+
+ @VisibleForTesting
+ public void callTryEvict() {
+ tryEvict();
+ }
+
+ @VisibleForTesting
+ public int getNumBuffers() {
+ return numberOfActiveBuffers.get();
}
- /**
- * {@inheritDoc}
- */
@Override
- public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
- // TODO: To be implemented
+ void resetBufferManager() {
+ setBufferManager(null); // reset the singleton instance
+ setIsConfigured(false);
}
- private final ThreadFactory namedThreadFactory = new ThreadFactory() {
+ private static void setBufferManager(ReadBufferManagerV2 manager) {
+ bufferManager = manager;
+ }
+
+ private static void setIsConfigured(boolean configured) {
+ isConfigured.set(configured);
+ }
+
+ private final ThreadFactory workerThreadFactory = new ThreadFactory() {
private int count = 0;
+
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, "ReadAheadV2-Thread-" + count++);
+ Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count++);
+ t.setDaemon(true);
+ return t;
}
};
- @Override
- void resetBufferManager() {
- setBufferManager(null); // reset the singleton instance
+ private void printTraceLog(String message, Object... args) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(message, args);
+ }
}
- private static void setBufferManager(ReadBufferManagerV2 manager) {
- bufferManager = manager;
+ private void printDebugLog(String message, Object... args) {
+ LOGGER.debug(message, args);
+ }
+
+ /**
+ * Get the current memory load of the JVM.
+ * @return the memory load as a double value between 0.0 and 1.0
+ */
+ @VisibleForTesting
+ double getMemoryLoad() {
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
+ return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ }
+
+ /**
+ * Get the current CPU load of the system.
+ * @return the CPU load as a double value between 0.0 and 1.0
+ */
+ @VisibleForTesting
+ public double getCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+ OperatingSystemMXBean.class);
+ double cpuLoad = osBean.getSystemCpuLoad();
+ if (cpuLoad < 0) {
+ // If the CPU load is not available, return 0.0
+ return 0.0;
+ }
+ return cpuLoad;
+ }
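+
+ // Standalone sketch of the two probes above, assuming the com.sun.management
+ // variant of OperatingSystemMXBean (the java.lang.management interface has no
+ // system CPU load accessor); illustration only:
+ //
+ //   MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ //   double memLoad = (double) heap.getUsed() / heap.getMax(); // 0.0 .. 1.0
+ //   double cpuLoad = ManagementFactory.getPlatformMXBean(
+ //       com.sun.management.OperatingSystemMXBean.class).getSystemCpuLoad();
+ //   // getSystemCpuLoad() returns a negative value when the load is unavailable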
+
+ @VisibleForTesting
+ static synchronized ReadBufferManagerV2 getInstance() {
+ return bufferManager;
+ }
+
+ @VisibleForTesting
+ public int getMinBufferPoolSize() {
+ return minBufferPoolSize;
+ }
+
+ @VisibleForTesting
+ public int getMaxBufferPoolSize() {
+ return maxBufferPoolSize;
+ }
+
+ @VisibleForTesting
+ public int getCurrentThreadPoolSize() {
+ return workerRefs.size();
+ }
+
+ @VisibleForTesting
+ public int getCpuMonitoringIntervalInMilliSec() {
+ return cpuMonitoringIntervalInMilliSec;
+ }
+
+ @VisibleForTesting
+ public int getMemoryMonitoringIntervalInMilliSec() {
+ return memoryMonitoringIntervalInMilliSec;
+ }
+
+ @VisibleForTesting
+ public ScheduledExecutorService getCpuMonitoringThread() {
+ return cpuMonitorThread;
+ }
+
+ public int getRequiredThreadPoolSize() {
+ return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
+ * (getReadAheadQueue().size()
+ + getInProgressList().size())); // 20% more for buffer
+ }
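+
+ // Worked example (illustration only; the exact value of
+ // THREAD_POOL_REQUIREMENT_BUFFER is assumed to be 1.2, per the "20% more"
+ // comment above): 8 queued + 4 in-progress reads require
+ // ceil(1.2 * 12) = 15 worker threads.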
+
+ private boolean isFreeListEmpty() {
+ LOCK.lock();
+ try {
+ return getFreeList().isEmpty();
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ private Integer popFromFreeList() {
+ LOCK.lock();
+ try {
+ return getFreeList().pop();
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ private void pushToFreeList(int idx) {
+ LOCK.lock();
+ try {
+ getFreeList().push(idx);
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ private void incrementActiveBufferCount() {
+ numberOfActiveBuffers.getAndIncrement();
+ }
+
+ private void decrementActiveBufferCount() {
+ numberOfActiveBuffers.getAndDecrement();
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index 79d5eef955a4a..2c6efdc735aeb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@@ -29,6 +31,7 @@ class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
private int id;
private ReadBufferManager bufferManager;
+ private AtomicBoolean isRunning = new AtomicBoolean(true);
ReadBufferWorker(final int id, final ReadBufferManager bufferManager) {
this.id = id;
@@ -54,7 +57,7 @@ public void run() {
Thread.currentThread().interrupt();
}
ReadBuffer buffer;
- while (true) {
+ while (isRunning()) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
} catch (InterruptedException ex) {
@@ -72,7 +75,7 @@ public void run() {
// read-ahead buffer size, make sure a valid length is passed
// for remote read
Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
- buffer.getTracingContext());
+ buffer.getTracingContext());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (IOException ex) {
@@ -85,4 +88,13 @@ public void run() {
}
}
}
+
+ public void stop() {
+ isRunning.set(false);
+ }
+
+ @VisibleForTesting
+ public boolean isRunning() {
+ return isRunning.get();
+ }
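+
+ // Hedged sketch of the cooperative-shutdown contract above (illustration only):
+ //
+ //   ReadBufferWorker worker = new ReadBufferWorker(0, bufferManager);
+ //   new Thread(worker).start();
+ //   worker.stop(); // run() observes isRunning() == false and exits after its
+ //                  // current iteration, rather than being interrupted mid-read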
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 9be4998cb8217..512285fb42e73 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -39,6 +39,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
@@ -708,12 +709,38 @@ protected void assertPathDns(Path path) {
.contains(expectedDns);
}
+ /**
+ * Return array of random bytes of the given length.
+ *
+ * @param length length of the byte array
+ * @return byte array
+ */
protected byte[] getRandomBytesArray(int length) {
final byte[] b = new byte[length];
new Random().nextBytes(b);
return b;
}
+ /**
+ * Create a file on the file system with the given file name and content.
+ *
+ * @param fs fileSystem that stores the file
+ * @param fileName name of the file
+ * @param fileContent content of the file
+ *
+ * @return path of the file created
+ * @throws IOException exception in writing file on fileSystem
+ */
+ protected Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
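+
+ // Example usage of the two helpers above (illustration only; file name arbitrary,
+ // ONE_MB assumed in scope from FileSystemConfigurations):
+ //
+ //   byte[] data = getRandomBytesArray(ONE_MB);
+ //   Path file = createFileWithContent(getFileSystem(), "readAheadTestFile", data);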
+
/**
* Checks a list of futures for exceptions.
*
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index b55032c5132a5..13dd776f3498f 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -234,7 +234,7 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
.setDisableOutputStreamFlush(disableOutputStreamFlush);
final Path testFilePath = path(methodName.getMethodName());
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
// The test case must write "fs.azure.write.request.size" bytes
// to the stream in order for the data to be uploaded to storage.
assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize()
@@ -266,7 +266,7 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
@Test
public void testHflushWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
String fileName = UUID.randomUUID().toString();
final Path testFilePath = path(fileName);
@@ -279,7 +279,7 @@ public void testHflushWithFlushEnabled() throws Exception {
@Test
public void testHflushWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
boolean isAppendBlob = false;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
@@ -296,7 +296,7 @@ public void testHflushWithFlushDisabled() throws Exception {
@Test
public void testHsyncWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
@@ -333,7 +333,7 @@ public void testTracingHeaderForAppendBlob() throws Exception {
@Test
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
@@ -350,7 +350,7 @@ public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
@Test
public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
assertHasStreamCapabilities(stream,
@@ -366,7 +366,7 @@ public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
@Test
public void testHsyncWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
boolean isAppendBlob = false;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
@@ -379,12 +379,6 @@ public void testHsyncWithFlushDisabled() throws Exception {
}
}
- private byte[] getRandomBytesArray() {
- final byte[] b = new byte[TEST_FILE_LENGTH];
- new Random().nextBytes(b);
- return b;
- }
-
private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException {
fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
FSDataOutputStream stream = fs.create(path);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
index 388e662115ed2..ec249e2b040e7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
@@ -19,23 +19,17 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
-import java.util.Random;
-import java.util.UUID;
import org.assertj.core.api.Assertions;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
-import static org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest.SHORTENED_GUID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
@@ -49,29 +43,6 @@ public AbfsInputStreamTestUtils(AbstractAbfsIntegrationTest abstractAbfsIntegrat
this.abstractAbfsIntegrationTest = abstractAbfsIntegrationTest;
}
- private Path path(String filepath) throws IOException {
- return abstractAbfsIntegrationTest.getFileSystem().makeQualified(
- new Path(getTestPath(), getUniquePath(filepath)));
- }
-
- private Path getTestPath() {
- Path path = new Path(UriUtils.generateUniqueTestPath());
- return path;
- }
-
- /**
- * Generate a unique path using the given filepath.
- * @param filepath path string
- * @return unique path created from filepath and a GUID
- */
- private Path getUniquePath(String filepath) {
- if (filepath.equals("/")) {
- return new Path(filepath);
- }
- return new Path(filepath + StringUtils
- .right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
- }
-
/**
* Returns AzureBlobFileSystem instance with the required
* readFullFileOptimization configuration.
@@ -90,38 +61,6 @@ public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
}
- /**
- * Return array of random bytes of the given length.
- *
- * @param length length of the byte array
- * @return byte array
- */
- public byte[] getRandomBytesArray(int length) {
- final byte[] b = new byte[length];
- new Random().nextBytes(b);
- return b;
- }
-
- /**
- * Create a file on the file system with the given file name and content.
- *
- * @param fs fileSystem that stores the file
- * @param fileName name of the file
- * @param fileContent content of the file
- *
- * @return path of the file created
- * @throws IOException exception in writing file on fileSystem
- */
- public Path createFileWithContent(FileSystem fs, String fileName,
- byte[] fileContent) throws IOException {
- Path testFilePath = path(fileName);
- try (FSDataOutputStream oStream = fs.create(testFilePath)) {
- oStream.write(fileContent);
- oStream.flush();
- }
- return testFilePath;
- }
-
/**
* Assert that the content read from the subsection of a file is correct.
*
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index d14ac05d5f5aa..a143599d0199d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -52,8 +52,8 @@ public void testWithNoOptimization() throws Exception {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
String fileName = methodName.getMethodName() + i;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
}
}
@@ -97,8 +97,8 @@ public void testExceptionInOptimization() throws Exception {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
String fileName = methodName.getMethodName() + i;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
fileSize / 4, fileContent);
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
index c7c9da94ab2ed..d15eb180db87e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
@@ -260,8 +260,8 @@ private void validateSeekAndReadWithConf(boolean optimizeFooterRead,
try (AzureBlobFileSystem spiedFs = createSpiedFs(
getRawConfiguration())) {
String fileName = methodName.getMethodName() + fileId;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(spiedFs, fileName,
fileContent);
for (int readBufferSize : READ_BUFFER_SIZE) {
validateSeekAndReadWithConf(spiedFs, optimizeFooterRead, seekTo,
@@ -391,8 +391,8 @@ public void testPartialReadWithNoData() throws Exception {
futureList.add(executorService.submit(() -> {
try (AzureBlobFileSystem spiedFs = createSpiedFs(
getRawConfiguration())) {
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(spiedFs, fileName,
fileContent);
validatePartialReadWithNoData(spiedFs, fileSize, fileContent,
testFilePath);
@@ -463,8 +463,8 @@ public void testPartialReadWithSomeData() throws Exception {
try (AzureBlobFileSystem spiedFs = createSpiedFs(
getRawConfiguration())) {
String fileName = methodName.getMethodName() + fileId;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(spiedFs, fileName,
fileContent);
validatePartialReadWithSomeData(spiedFs, fileSize, testFilePath,
fileContent);
@@ -585,8 +585,8 @@ private void verifyConfigValueInStream(final FSDataInputStream inputStream,
private Path createPathAndFileWithContent(final AzureBlobFileSystem fs,
final int fileIdx, final int fileSize) throws Exception {
String fileName = methodName.getMethodName() + fileIdx;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- return abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ return createFileWithContent(fs, fileName, fileContent);
}
private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
index 64fac9ca94ed8..01a3387567a3c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
@@ -74,8 +74,8 @@ private void validateNumBackendCalls(final boolean readSmallFilesCompletely,
for (int i = 1; i <= 4; i++) {
String fileName = methodName.getMethodName() + i;
int fileSize = i * ONE_MB;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = ONE_KB;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
byte[] buffer = new byte[length];
@@ -185,8 +185,8 @@ private void validateSeekAndReadWithConf(final SeekTo seekTo,
for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
String fileName = methodName.getMethodName() + i;
int fileSize = i * ONE_MB;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = ONE_KB;
int seekPos = seekPos(seekTo, fileSize, length);
seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
@@ -255,9 +255,9 @@ public void testPartialReadWithNoData() throws Exception {
try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
true)) {
String fileName = methodName.getMethodName() + i;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+ byte[] fileContent = getRandomBytesArray(
fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+ Path testFilePath = createFileWithContent(fs,
fileName, fileContent);
partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
fileContent);
@@ -305,9 +305,9 @@ public void testPartialReadWithSomeData() throws Exception {
try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
true)) {
String fileName = methodName.getMethodName() + i;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+ byte[] fileContent = getRandomBytesArray(
fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+ Path testFilePath = createFileWithContent(fs,
fileName, fileContent);
partialReadWithSomeData(fs, testFilePath, fileSize / 2,
fileSize / 4, fileContent);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index b70f36de31867..fd51fc7c420a1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -18,10 +18,8 @@
package org.apache.hadoop.fs.azurebfs.services;
-import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -29,7 +27,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -62,127 +59,122 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
*/
public static final int PROBE_INTERVAL_MILLIS = 1_000;
- public ITestReadBufferManager() throws Exception {
+ public ITestReadBufferManager() throws Exception {
+ }
+
+ @Test
+ public void testPurgeBufferManagerForParallelStreams() throws Exception {
+ describe("Testing purging of buffers from ReadBufferManagerV1 for "
+ + "parallel input streams");
+ final int numBuffers = 16;
+ final LinkedList<Integer> freeList = new LinkedList<>();
+ for (int i=0; i < numBuffers; i++) {
+ freeList.add(i);
}
-
- @Test
- public void testPurgeBufferManagerForParallelStreams() throws Exception {
- describe("Testing purging of buffers from ReadBufferManagerV1 for "
- + "parallel input streams");
- final int numBuffers = 16;
- final LinkedList<Integer> freeList = new LinkedList<>();
- for (int i=0; i < numBuffers; i++) {
- freeList.add(i);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(4);
- AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
- // verify that the fs has the capability to validate the fix
- Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
- .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
- .isTrue();
-
- try {
- for (int i = 0; i < 4; i++) {
- final String fileName = methodName.getMethodName() + i;
- executorService.submit((Callable<Void>) () -> {
- byte[] fileContent = getRandomBytesArray(ONE_MB);
- Path testFilePath = createFileWithContent(fs, fileName, fileContent);
- try (FSDataInputStream iStream = fs.open(testFilePath)) {
- iStream.read();
- }
- return null;
- });
- }
- } finally {
- executorService.shutdown();
- // wait for all tasks to finish
- executorService.awaitTermination(1, TimeUnit.MINUTES);
- }
-
- ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
- // readahead queue is empty
- assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
- // verify the in progress list eventually empties out.
- eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
- assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ // verify that the fs has the capability to validate the fix
+ Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
+ .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+ .isTrue();
+
+ try {
+ for (int i = 0; i < 4; i++) {
+ final String fileName = methodName.getMethodName() + i;
+ executorService.submit((Callable<Void>) () -> {
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ iStream.read();
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
}
- private void assertListEmpty(String listName, List<ReadBuffer> list) {
- Assertions.assertThat(list)
- .describedAs("After closing all streams %s should be empty", listName)
- .hasSize(0);
+ ReadBufferManager bufferManager = getBufferManager(fs);
+ // readahead queue is empty
+ assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+ // verify the in progress list eventually empties out.
+ eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
+ assertListEmpty("InProgressList", bufferManager.getInProgressListCopy()));
+ }
+
+ private void assertListEmpty(String listName, List<ReadBuffer> list) {
+ Assertions.assertThat(list)
+ .describedAs("After closing all streams %s should be empty", listName)
+ .hasSize(0);
+ }
+
+ @Test
+ public void testPurgeBufferManagerForSequentialStream() throws Exception {
+ describe("Testing purging of buffers in ReadBufferManagerV1 for "
+ + "sequential input streams");
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ final String fileName = methodName.getMethodName();
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+ AbfsInputStream iStream1 = null;
+ // stream1 will be closed right away.
+ try {
+ iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+ // Just reading one byte will trigger all read ahead calls.
+ iStream1.read();
+ } finally {
+ IOUtils.closeStream(iStream1);
}
-
- @Test
- public void testPurgeBufferManagerForSequentialStream() throws Exception {
- describe("Testing purging of buffers in ReadBufferManagerV1 for "
- + "sequential input streams");
- AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
- final String fileName = methodName.getMethodName();
- byte[] fileContent = getRandomBytesArray(ONE_MB);
- Path testFilePath = createFileWithContent(fs, fileName, fileContent);
-
- AbfsInputStream iStream1 = null;
- // stream1 will be closed right away.
- try {
- iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
- // Just reading one byte will trigger all read ahead calls.
- iStream1.read();
- } finally {
- IOUtils.closeStream(iStream1);
- }
- ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
- AbfsInputStream iStream2 = null;
- try {
- iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
- iStream2.read();
- // After closing stream1, no queued buffers of stream1 should be present
- // assertions can't be made about the state of the other lists as it is
- // too prone to race conditions.
- assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
- } finally {
- // closing the stream later.
- IOUtils.closeStream(iStream2);
- }
- // After closing stream2, no queued buffers of stream2 should be present.
- assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
-
- // After closing both the streams, read queue should be empty.
- assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
-
+ ReadBufferManager bufferManager = getBufferManager(fs);
+ AbfsInputStream iStream2 = null;
+ try {
+ iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+ iStream2.read();
+ // After closing stream1, no queued buffers of stream1 should be present
+ // assertions can't be made about the state of the other lists as it is
+ // too prone to race conditions.
+ assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
+ } finally {
+ // closing the stream later.
+ IOUtils.closeStream(iStream2);
}
+ // After closing stream2, no queued buffers of stream2 should be present.
+ assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
+ // After closing both the streams, read queue should be empty.
+ assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
- private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
- AbfsInputStream inputStream) {
- for (ReadBuffer buffer : list) {
- Assertions.assertThat(buffer.getStream())
- .describedAs("Buffers associated with closed input streams shouldn't be present")
- .isNotEqualTo(inputStream);
- }
- }
+ }
- private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
- Configuration conf = getRawConfiguration();
- conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
- conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
- conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
- return (AzureBlobFileSystem) FileSystem.newInstance(conf);
- }
- protected byte[] getRandomBytesArray(int length) {
- final byte[] b = new byte[length];
- new Random().nextBytes(b);
- return b;
+ private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
+ AbfsInputStream inputStream) {
+ for (ReadBuffer buffer : list) {
+ Assertions.assertThat(buffer.getStream())
+ .describedAs("Buffers associated with closed input streams shouldn't be present")
+ .isNotEqualTo(inputStream);
}
-
- protected Path createFileWithContent(FileSystem fs, String fileName,
- byte[] fileContent) throws IOException {
- Path testFilePath = path(fileName);
- try (FSDataOutputStream oStream = fs.create(testFilePath)) {
- oStream.write(fileContent);
- oStream.flush();
- }
- return testFilePath;
+ }
+
+ private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
+ conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
+ conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) {
+ int blockSize = fs.getAbfsStore().getAbfsConfiguration().getReadAheadBlockSize();
+ if (getConfiguration().isReadAheadV2Enabled()) {
+ ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize,
+ getConfiguration());
+ return ReadBufferManagerV2.getBufferManager();
}
+ ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
+ return ReadBufferManagerV1.getBufferManager();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
new file mode 100644
index 0000000000000..780a7b0f2a2a1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+
+ private static final int LESS_NUM_FILES = 2;
+ private static final int MORE_NUM_FILES = 5;
+ private static final int SMALL_FILE_SIZE = 6 * ONE_MB;
+ private static final int LARGE_FILE_SIZE = 50 * ONE_MB;
+ private static final int BLOCK_SIZE = 4 * ONE_MB;
+
+ public ITestReadBufferManagerV2() throws Exception {
+ }
+
+ @Test
+ public void testReadDifferentFilesInParallel() throws Exception {
+ try (AzureBlobFileSystem fs = getConfiguredFileSystem()) {
+ int fileSize = LARGE_FILE_SIZE;
+ int numFiles = MORE_NUM_FILES;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+
+ Path[] testPaths = new Path[numFiles];
+ for (int i = 0; i < numFiles; i++) {
+ final String fileName = methodName.getMethodName() + i;
+ testPaths[i] = createFileWithContent(fs, fileName, fileContent);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+ Map<String, Long> metricMap = getInstrumentationMap(fs);
+ long requestsMadeBeforeTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ final int fileIdx = i; // each task reads its own pre-created file
+ executorService.submit((Callable<Void>) () -> {
+ try (FSDataInputStream iStream = fs.open(testPaths[fileIdx])) {
+ byte[] buffer = new byte[fileSize];
+ int bytesRead = iStream.read(buffer, 0, fileSize);
+ Assertions.assertThat(bytesRead).isEqualTo(fileSize);
+ Assertions.assertThat(buffer).isEqualTo(fileContent);
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ metricMap = getInstrumentationMap(fs);
+ long requestsMadeAfterTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ int expectedRequests = numFiles // Get Path Status for each file
+ + ((int) Math.ceil((double) fileSize / BLOCK_SIZE))
+ * numFiles; // Read requests for each file
+ assertEquals(expectedRequests,
+ requestsMadeAfterTest - requestsMadeBeforeTest);
+ }
+ }
+
+ @Test
+ public void testReadSameFileInParallel() throws Exception {
+ try (AzureBlobFileSystem fs = getConfiguredFileSystem()) {
+ int fileSize = SMALL_FILE_SIZE;
+ int numFiles = LESS_NUM_FILES;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+
+ final String fileName = methodName.getMethodName();
+ Path testPath = createFileWithContent(fs, fileName, fileContent);
+ ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+ Map<String, Long> metricMap = getInstrumentationMap(fs);
+ long requestsMadeBeforeTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ executorService.submit((Callable<Void>) () -> {
+ try (FSDataInputStream iStream = fs.open(testPath)) {
+ byte[] buffer = new byte[fileSize];
+ int bytesRead = iStream.read(buffer, 0, fileSize);
+ Assertions.assertThat(bytesRead).isEqualTo(fileSize);
+ Assertions.assertThat(buffer).isEqualTo(fileContent);
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ metricMap = getInstrumentationMap(fs);
+ long requestsMadeAfterTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ int expectedRequests = numFiles // Get Path Status for each open
+ + ((int) Math.ceil(
+ (double) fileSize / BLOCK_SIZE)); // Read requests for the shared file
+ assertEquals(expectedRequests,
+ requestsMadeAfterTest - requestsMadeBeforeTest);
+ }
+ }
+
+ private AzureBlobFileSystem getConfiguredFileSystem() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ config.set(FS_AZURE_ENABLE_READAHEAD_V2, TRUE);
+ config.set(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, TRUE);
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(config);
+ return fs;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index e64178f0a52ac..7290ad0238548 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -110,7 +110,7 @@ public void teardown() throws Exception {
getBufferManager().testResetReadBufferManager();
}
- private AbfsRestOperation getMockRestOp() {
+ AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
when(httpOp.getBytesReceived()).thenReturn(1024L);
@@ -119,7 +119,7 @@ private AbfsRestOperation getMockRestOp() {
return op;
}
- private AbfsClient getMockAbfsClient() throws URISyntaxException {
+ AbfsClient getMockAbfsClient() throws URISyntaxException {
// Mock failure for client.read()
AbfsClient client = mock(AbfsClient.class);
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
@@ -133,7 +133,7 @@ private AbfsClient getMockAbfsClient() throws URISyntaxException {
return client;
}
- private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
+ AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
@@ -142,7 +142,10 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
null,
FORWARD_SLASH + fileName,
THREE_KB,
- inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB),
+ inputStreamContext.withReadBufferSize(ONE_KB)
+ .withReadAheadQueueDepth(10)
+ .withReadAheadBlockSize(ONE_KB)
+ .isReadAheadV2Enabled(getConfiguration().isReadAheadV2Enabled()),
"eTag",
getTestTracingContext(null, false));
@@ -180,7 +183,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
return inputStream;
}
- private void queueReadAheads(AbfsInputStream inputStream) {
+ void queueReadAheads(AbfsInputStream inputStream) {
// Mimic AbfsInputStream readAhead queue requests
getBufferManager()
.queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext());
@@ -564,7 +567,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
//Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
Thread.sleep(readBufferTransferToInProgressProbableTime);
- Assertions.assertThat(readBufferManager.getInProgressCopiedList())
+ Assertions.assertThat(readBufferManager.getInProgressListCopy())
.describedAs(String.format("InProgressList should have %d elements",
readBufferQueuedCount))
.hasSize(readBufferQueuedCount);
@@ -577,7 +580,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
.hasSize(0);
}
- Assertions.assertThat(readBufferManager.getInProgressCopiedList())
+ Assertions.assertThat(readBufferManager.getInProgressListCopy())
.describedAs(String.format("InProgressList should have %d elements",
readBufferQueuedCount))
.hasSize(readBufferQueuedCount);
@@ -824,6 +827,7 @@ public void testReadTypeInTracingContextHeader() throws Exception {
*/
fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
totalReadCalls += 3; // 3 blocks of 1MB each.
+ doReturn(false).when(spiedConfig).isReadAheadV2Enabled();
doReturn(false).when(spiedConfig).isReadAheadEnabled();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, NORMAL_READ, 3, totalReadCalls);
@@ -1124,6 +1128,11 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
}
private ReadBufferManager getBufferManager() {
+ if (getConfiguration().isReadAheadV2Enabled()) {
+ ReadBufferManagerV2.setReadBufferManagerConfigs(
+ getConfiguration().getReadAheadBlockSize(), getConfiguration());
+ return ReadBufferManagerV2.getBufferManager();
+ }
return ReadBufferManagerV1.getBufferManager();
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
new file mode 100644
index 0000000000000..b8551e42cb218
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests around different components of {@link ReadBufferManagerV2}.
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+ private volatile boolean running = true;
+ private final List<byte[]> allocations = new ArrayList<>();
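+ // Heap-usage fraction (0.0 - 1.0, i.e. 80%) at which the memory load thread stops allocating.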
+ private static final double HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 0.8;
+
+ public TestReadBufferManagerV2() throws Exception {
+ super();
+ }
+
+ /**
+ * Test to verify initialization and singleton behavior of ReadBufferManagerV2.
+ * @throws Exception if test fails
+ */
+ @Test
+ public void testReadBufferManagerV2Init() throws Exception {
+ ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ Assertions.assertThat(ReadBufferManagerV2.getInstance())
+ .as("ReadBufferManager should be uninitialized").isNull();
+ intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> {
+ ReadBufferManagerV2.getBufferManager();
+ });
+ // Verify that multiple invocations of getBufferManager() return the same instance.
+ ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
+ ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+ Assertions.assertThat(bufferManager).isNotNull();
+ Assertions.assertThat(bufferManager2).isNotNull();
+ Assertions.assertThat(bufferManager).isSameAs(bufferManager2);
+ Assertions.assertThat(bufferManager3).isNotNull();
+ Assertions.assertThat(bufferManager3).isSameAs(bufferManager);
+
+ // Verify that the default pool size limits are positive.
+ Assertions.assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+ Assertions.assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+ }
+
+ /**
+ * Test to verify that the CPU monitoring thread is started only when dynamic scaling is enabled.
+ * @throws Exception if test fails
+ */
+ @Test
+ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+ Configuration conf = new Configuration(getRawConfiguration());
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ Assertions.assertThat(bufferManagerV2.getCpuMonitoringThread())
+ .as("CPU Monitor thread should be initialized").isNotNull();
+ bufferManagerV2.resetBufferManager();
+ }
+
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ Assertions.assertThat(bufferManagerV2.getCpuMonitoringThread())
+ .as("CPU Monitor thread should not be initialized").isNull();
+ bufferManagerV2.resetBufferManager();
+ }
+ }
+
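+ /**
+ * Test to verify that the worker thread pool is upscaled under a sustained
+ * read-ahead queue and shrinks below the configured maximum once the queue drains.
+ * @throws Exception if test fails
+ */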
+ @Test
+ public void testThreadPoolDynamicScaling() throws Exception {
+ running = true;
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
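+ // Pool starts at the configured minimum of two threads (see getReadAheadV2Configuration()).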
+ Assertions.assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+ int[] reqOffset = {0};
+ int reqLength = 1;
+ Thread t = new Thread(() -> {
+ while (running) {
+ bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+ inputStream.getTracingContext());
+ reqOffset[0] += reqLength;
+ }
+ });
+ t.start();
+ Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+ Assertions.assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThan(2);
+ running = false;
+ t.join();
+ Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+ Assertions.assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+ }
+
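+ /**
+ * Test to verify that the thread pool is not upscaled when CPU usage is above
+ * the configured threshold (set to 0% here so any load blocks upscaling).
+ * @throws Exception if test fails
+ */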
+ @Test
+ public void testCpuUpscaleNotAllowedIfCpuAboveThreshold() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, "0"); // set low threshold
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ Assertions.assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+ int[] reqOffset = {0};
+ int reqLength = 1;
+ running = true;
+ Thread t = new Thread(() -> {
+ while (running) {
+ bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+ inputStream.getTracingContext());
+ reqOffset[0] += reqLength;
+ }
+ });
+ t.start();
+ Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+ Assertions.assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+ running = false;
+ t.join();
+ }
+
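+ /**
+ * Test to verify that buffers of failed reads sitting in the completed list
+ * are evicted by the scheduled memory monitor.
+ * @throws Exception if test fails
+ */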
+ @Test
+ public void testScheduledEviction() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ // Mimic a fully used buffer pool and add failed buffers to the completed list.
+ ReadBuffer buff = new ReadBuffer();
+ buff.setStatus(ReadBufferStatus.READ_FAILED);
+ buff.setStream(inputStream);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+ Assertions.assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(2);
+ Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+ Assertions.assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(0);
+ }
+
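+ /**
+ * Test to verify that the buffer pool is not grown past the minimum when
+ * memory usage is above the configured threshold (set to 0% here).
+ * @throws Exception if test fails
+ */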
+ @Test
+ public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "0"); // set low threshold
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ // Mimic a fully used buffer pool and add a failed buffer to the completed list.
+ ReadBuffer buff = new ReadBuffer();
+ buff.setStatus(ReadBufferStatus.READ_FAILED);
+ buff.setStream(inputStream);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+ Assertions.assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+ bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+ inputStream.getTracingContext());
+ Assertions.assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+ }
+
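+ /**
+ * Test to verify that the buffer pool grows beyond the minimum on demand
+ * when memory usage is below the configured threshold (set to 100% here).
+ * @throws Exception if test fails
+ */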
+ @Test
+ public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "100");
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ // Mimic a fully used buffer pool and add a failed buffer to the completed list.
+ ReadBuffer buff = new ReadBuffer();
+ buff.setStatus(ReadBufferStatus.READ_FAILED);
+ buff.setStream(inputStream);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+ Assertions.assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+ bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+ inputStream.getTracingContext());
+ Assertions.assertThat(bufferManagerV2.getNumBuffers()).isGreaterThan(bufferManagerV2.getMinBufferPoolSize());
+ }
+
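+ /**
+ * Test to verify that the buffer pool is shrunk below its starting size when
+ * heap usage is driven above the configured memory threshold.
+ * @throws Exception if test fails
+ */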
+ @Test
+ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
+ Assertions.assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+ running = true;
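+ // Daemon thread that inflates heap usage toward 80% of max heap so the memory monitor downscales.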
+ Thread t = new Thread(() -> {
+ while (running) {
+ long maxMemory = Runtime.getRuntime().maxMemory();
+ long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ double usage = (double) usedMemory / maxMemory;
+
+ if (usage < HIGH_MEMORY_USAGE_THRESHOLD_PERCENT) {
+ // Allocate more memory
+ allocations.add(new byte[10 * 1024 * 1024]); // 10MB
+ }
+ }
+ }, "MemoryLoadThread");
+ t.setDaemon(true);
+ t.start();
+ Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+ Assertions.assertThat(bufferManagerV2.getNumBuffers()).isLessThan(initialBuffers);
+ running = false;
+ t.join();
+ allocations.clear(); // free the load allocations so later tests are not affected
+ }
+
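+ /**
+ * Returns a configuration with read-ahead v2 and dynamic scaling enabled,
+ * using small thread pool limits and short monitoring intervals so tests run quickly.
+ */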
+ private Configuration getReadAheadV2Configuration() {
+ Configuration conf = new Configuration(getRawConfiguration());
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+ conf.setInt(FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, 2);
+ conf.setInt(FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, 4);
+ conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, HUNDRED);
+ conf.setInt(FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
+ conf.setInt(FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS, 1_000);
+ conf.setInt(FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS, 1_000);
+ return conf;
+ }
+}