From 97735de56df81bec9bb1f14994c6e7ad2346c936 Mon Sep 17 00:00:00 2001 From: sumangala-patki <70206833+sumangala-patki@users.noreply.github.com> Date: Wed, 18 Aug 2021 19:14:10 +0530 Subject: [PATCH 1/5] HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (#2975) --- .../fs/azurebfs/AzureBlobFileSystem.java | 20 ++- .../fs/azurebfs/AzureBlobFileSystemStore.java | 71 +++++++---- .../azurebfs/AbstractAbfsIntegrationTest.java | 10 ++ .../services/TestAbfsInputStream.java | 115 +++++++++++++++++- 4 files changed, 183 insertions(+), 33 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 82e1072db99b3..46141e7c4a838 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -262,16 +262,15 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx } private FSDataInputStream open(final Path path, - final Optional options) throws IOException { + final Optional parameters) throws IOException { statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); try { TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, - listener); - InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, - options, statistics, tracingContext); + fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener); + InputStream inputStream = abfsStore + .openFileForRead(qualifiedPath, parameters, statistics, tracingContext); return new FSDataInputStream(inputStream); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); @@ -279,6 +278,15 @@ private FSDataInputStream open(final Path path, } } + /** + * Takes config and other options through + * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. 
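+   * A sketch of a typical call, for illustration only: the variable
+   * names are hypothetical, while the builder methods are the ones
+   * exercised by the tests in this patch.
+   * <pre>
+   *   FutureDataInputStreamBuilder builder = fs.openFile(path);
+   *   builder.withFileStatus(status);
+   *   FSDataInputStream in = builder.build().get();
+   * </pre>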
+   * Ensure that the FileStatus supplied is up-to-date, as it will be used
+   * to create the InputStream (with info such as contentLength and eTag).
+   * @param path The location of file to be opened
+   * @param parameters OpenFileParameters instance; can hold FileStatus,
+   *                   Configuration, bufferSize and mandatoryKeys
+   */
   @Override
   protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final Path path, final OpenFileParameters parameters) throws IOException {
@@ -289,7 +297,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
-            open(path, Optional.of(parameters.getOptions())));
+            open(path, Optional.of(parameters)));
   }

   @Override
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 95035a1e45c87..74084a99defd8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -119,6 +119,7 @@
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -137,6 +138,8 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@@ -733,44 +736,64 @@ public void createDirectory(final Path path, final FsPermission permission,
   public AbfsInputStream openFileForRead(final Path path,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    return openFileForRead(path, Optional.empty(), statistics, tracingContext);
+      throws IOException {
+    return openFileForRead(path, Optional.empty(), statistics,
+        tracingContext);
   }

-  public AbfsInputStream openFileForRead(final Path path,
-      final Optional<Configuration> options,
+  public AbfsInputStream openFileForRead(Path path,
+      final Optional<OpenFileParameters> parameters,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
+      throws IOException {
+    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
+        "getPathStatus")) {
       LOG.debug("openFileForRead filesystem: {} path: {}",
-              client.getFileSystem(),
-              path);
+          client.getFileSystem(), path);
+      FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
+          .orElse(null);
       String relativePath = getRelativePath(path);
-
-      final AbfsRestOperation op = client
-          .getPathStatus(relativePath, false, tracingContext);
-
perfInfo.registerResult(op.getResult()); - - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + String resourceType, eTag; + long contentLength; + if (fileStatus instanceof VersionedFileStatus) { + path = path.makeQualified(this.uri, path); + Preconditions.checkArgument(fileStatus.getPath().equals(path), + String.format( + "Filestatus path [%s] does not match with given path [%s]", + fileStatus.getPath(), path)); + resourceType = fileStatus.isFile() ? FILE : DIRECTORY; + contentLength = fileStatus.getLen(); + eTag = ((VersionedFileStatus) fileStatus).getVersion(); + } else { + if (fileStatus != null) { + LOG.warn( + "Fallback to getPathStatus REST call as provided filestatus " + + "is not of type VersionedFileStatus"); + } + AbfsHttpOperation op = client.getPathStatus(relativePath, false, + tracingContext).getResult(); + resourceType = op.getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + contentLength = Long.parseLong( + op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); + } if (parseIsDirectory(resourceType)) { throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); } perfInfo.registerSuccess(true); // Add statistics for InputStream - return new AbfsInputStream(client, statistics, - relativePath, contentLength, - populateAbfsInputStreamContext(options), - eTag, tracingContext); + return new AbfsInputStream(client, statistics, relativePath, + contentLength, populateAbfsInputStreamContext( + parameters.map(OpenFileParameters::getOptions)), + eTag, tracingContext); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 3bc83385d1e4c..4a5507526c3a1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; @@ -430,6 +431,15 @@ public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) { return fs.getAbfsStore(); } + public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) { + return abfsStore.getClient(); + } + + public void setAbfsClient(AzureBlobFileSystemStore abfsStore, + AbfsClient client) { + abfsStore.setClient(client); + } + public Path 
makeQualified(Path path) throws java.io.IOException { return getFileSystem().makeQualified(path); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 62326e0dbb353..b5ae9b737842d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,31 +19,40 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; - -import org.junit.Assert; -import org.junit.Test; import java.util.Arrays; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -192,6 +201,106 @@ public TestAbfsInputStream() throws Exception { ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } + private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(testFile); + FSDataOutputStream out = fs.append(testFile); + out.write(buffer); + out.close(); + } + + private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, + byte[] buf, AbfsRestOperationType source) + throws IOException, ExecutionException, InterruptedException { + byte[] readBuf = new byte[buf.length]; + AzureBlobFileSystem fs = getFileSystem(); + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.withFileStatus(fileStatus); + FSDataInputStream in = builder.build().get(); + assertEquals(String.format( + "Open with fileStatus [from %s result]: Incorrect number of bytes read", + source), buf.length, in.read(readBuf)); + assertArrayEquals(String + .format("Open with fileStatus [from %s result]: Incorrect read data", + source), readBuf, buf); + } + + private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + 
AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, + AbfsRestOperationType source, TracingContext tracingContext) + throws IOException { + + // verify GetPathStatus not invoked when FileStatus is provided + abfsStore.openFileForRead(testFile, Optional + .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext); + verify(mockClient, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", + source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + // verify GetPathStatus invoked when FileStatus not provided + abfsStore.openFileForRead(testFile, + Optional.empty(), null, + tracingContext); + verify(mockClient, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + Mockito.reset(mockClient); //clears invocation count for next test case + } + + @Test + public void testOpenFileWithOptions() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String testFolder = "/testFolder"; + Path smallTestFile = new Path(testFolder + "/testFile0"); + Path largeTestFile = new Path(testFolder + "/testFile1"); + fs.mkdirs(new Path(testFolder)); + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] smallBuffer = new byte[5]; + byte[] largeBuffer = new byte[readBufferSize + 5]; + new Random().nextBytes(smallBuffer); + new Random().nextBytes(largeBuffer); + writeBufferToNewFile(smallTestFile, smallBuffer); + writeBufferToNewFile(largeTestFile, largeBuffer); + + FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile), + fs.getFileStatus(largeTestFile)}; + FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder)); + + // open with fileStatus from GetPathStatus + verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], + smallBuffer, AbfsRestOperationType.GetPathStatus); + verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], + largeBuffer, AbfsRestOperationType.GetPathStatus); + + // open with fileStatus from ListStatus + verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, + AbfsRestOperationType.ListPaths); + verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, + AbfsRestOperationType.ListPaths); + + // verify number of GetPathStatus invocations + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getAbfsClient(abfsStore)); + setAbfsClient(abfsStore, mockClient); + TracingContext tracingContext = getTestTracingContext(fs, false); + checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(smallTestFile, listStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + checkGetPathStatusCalls(largeTestFile, listStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + + // Verify with incorrect filestatus + getFileStatusResults[0].setPath(new Path("wrongPath")); + intercept(ExecutionException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, + getFileStatusResults[0], smallBuffer, + AbfsRestOperationType.GetPathStatus)); + } + /** * This test expects AbfsInputStream to throw the exception that 
readAhead
   * thread received on read. The readAhead thread must be initiated from the

From 5433d568c3671717cf05d20a6b7332d0c5215799 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Sun, 24 Apr 2022 17:03:59 +0100
Subject: [PATCH 2/5] HADOOP-16202. Enhanced openFile(): hadoop-common changes. (#2584/1)

This defines standard options and values for the openFile() builder API
for opening a file:

fs.option.openfile.read.policy
  A list of the desired read policies, in preferred order.
  Standard values are adaptive, default, random, sequential,
  vector, whole-file.

fs.option.openfile.length
  The length of the file.

fs.option.openfile.split.start
  Start of a task's split.

fs.option.openfile.split.end
  End of a task's split.

These can be used by filesystem connectors to optimize their reading
of the source file, including but not limited to:

* skipping existence/length probes when opening a file
* choosing a policy for prefetching/caching data

The hadoop shell commands which read files all declare "whole-file"
and "sequential", as appropriate.

Contributed by Steve Loughran.

Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
---
 .../org/apache/hadoop/fs/AvroFSInput.java     |  11 +-
 .../apache/hadoop/fs/ChecksumFileSystem.java  |   4 +-
 .../java/org/apache/hadoop/fs/FSBuilder.java  |  14 +
 .../org/apache/hadoop/fs/FileContext.java     |  18 +-
 .../java/org/apache/hadoop/fs/FileSystem.java |  13 +-
 .../java/org/apache/hadoop/fs/FileUtil.java   |  61 +-
 .../fs/FutureDataInputStreamBuilder.java      |   8 +-
 .../java/org/apache/hadoop/fs/Options.java    | 119 ++++
 .../hadoop/fs/impl/AbstractFSBuilderImpl.java |  38 +-
 .../fs/impl/FileSystemMultipartUploader.java  |   9 +-
 .../FutureDataInputStreamBuilderImpl.java     |   8 +-
 .../hadoop/fs/impl/FutureIOSupport.java       |  83 +--
 .../hadoop/fs/impl/OpenFileParameters.java    |  13 +
 .../hadoop/fs/impl/WrappedIOException.java    |  11 +-
 .../fs/shell/CommandWithDestination.java      |   9 +-
 .../apache/hadoop/fs/shell/CopyCommands.java  |   3 +-
 .../org/apache/hadoop/fs/shell/Display.java   |   3 +-
 .../java/org/apache/hadoop/fs/shell/Head.java |   8 +-
 .../org/apache/hadoop/fs/shell/PathData.java  |  35 ++
 .../java/org/apache/hadoop/fs/shell/Tail.java |  11 +-
 .../fs/statistics/StoreStatisticNames.java    |   9 +
 .../fs/statistics/StreamStatisticNames.java   |  19 +-
 .../statistics/impl/IOStatisticsBinding.java  |  44 +-
 .../org/apache/hadoop/io/SequenceFile.java    |  14 +-
 .../apache/hadoop/util/JsonSerialization.java |   6 +-
 .../functional/CommonCallableSupplier.java    |   2 +-
 .../hadoop/util/functional/FutureIO.java      |  90 +++
 .../site/markdown/filesystem/filesystem.md    |  90 +-
 .../filesystem/fsdatainputstreambuilder.md    | 588 +++++++++++++++++-
 .../src/site/markdown/filesystem/index.md     |   1 +
 .../src/site/markdown/filesystem/openfile.md  | 122 ++++
 ...AbstractContractMultipartUploaderTest.java |   2 +-
 .../fs/contract/AbstractContractOpenTest.java |  80 ++-
 .../hadoop/fs/contract/ContractTestUtils.java |  13 +-
 .../fs/statistics/IOStatisticAssertions.java  |  20 +
 .../fs/statistics/TestDurationTracking.java   |   3 +-
 36 files changed, 1321 insertions(+), 261 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
index b4a4a85674dfa..213fbc24c4db0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
@@ -25,6 +25,10 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -42,7 +46,11 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
   public AvroFSInput(final FileContext fc, final Path p) throws IOException {
     FileStatus status = fc.getFileStatus(p);
     this.len = status.getLen();
-    this.stream = fc.open(p);
+    this.stream = awaitFuture(fc.openFile(p)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+        .withFileStatus(status)
+        .build());
   }

   @Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 0256a58f46368..89a399ec32a92 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -24,7 +24,6 @@
 import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;

+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;

@@ -889,7 +889,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
     final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        FS_OPTION_OPENFILE_STANDARD_OPTIONS,
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(),
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
index b7757a62e28ad..a4c7254cfeb3c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B opt(@Nonnull String key, float value);

+  /**
+   * Set optional long parameter for the Builder.
+   *
+   * @see #opt(String, String)
+   */
+  B opt(@Nonnull String key, long value);
+
   /**
    * Set optional double parameter for the Builder.
    *
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B must(@Nonnull String key, float value);

+  /**
+   * Set mandatory long option.
+   *
+   * @see #must(String, String)
+   */
+  B must(@Nonnull String key, long value);
+
   /**
    * Set mandatory double option.
* diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index c7c1428b4486e..ff4debeaf196a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -71,7 +71,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** * The FileContext class provides an interface for users of the Hadoop @@ -2204,7 +2209,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource, EnumSet createFlag = overwrite ? EnumSet.of( CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE); - InputStream in = open(qSrc); + InputStream in = awaitFuture(openFile(qSrc) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .opt(FS_OPTION_OPENFILE_LENGTH, + fs.getLen()) // file length hint for object stores + .build()); try (OutputStream out = create(qDst, createFlag)) { IOUtils.copyBytes(in, out, conf, true); } finally { @@ -2936,9 +2946,11 @@ public CompletableFuture build() throws IOException { final Path absF = fixRelativePart(getPath()); OpenFileParameters parameters = new OpenFileParameters() .withMandatoryKeys(getMandatoryKeys()) + .withOptionalKeys(getOptionalKeys()) .withOptions(getOptions()) - .withBufferSize(getBufferSize()) - .withStatus(getStatus()); + .withStatus(getStatus()) + .withBufferSize( + getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize())); return new FSLinkResolver>() { @Override public CompletableFuture next( diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index f0c55f5428040..963c3a1acc778 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -91,7 +91,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -4626,7 +4627,7 @@ protected CompletableFuture openFileWithOptions( final OpenFileParameters parameters) throws IOException { AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), + Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> @@ -4654,7 +4655,7 @@ protected 
CompletableFuture openFileWithOptions( final OpenFileParameters parameters) throws IOException { AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), ""); + Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, ""); CompletableFuture result = new CompletableFuture<>(); try { result.complete(open(pathHandle, parameters.getBufferSize())); @@ -4761,9 +4762,11 @@ public CompletableFuture build() throws IOException { Optional optionalPath = getOptionalPath(); OpenFileParameters parameters = new OpenFileParameters() .withMandatoryKeys(getMandatoryKeys()) + .withOptionalKeys(getOptionalKeys()) .withOptions(getOptions()) - .withBufferSize(getBufferSize()) - .withStatus(super.getStatus()); // explicit to avoid IDE warnings + .withStatus(super.getStatus()) + .withBufferSize( + getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize())); if(optionalPath.isPresent()) { return getFS().openFileWithOptions(optionalPath.get(), parameters); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index b130ba824304e..ccb3ac8ca9045 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -76,6 +76,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + /** * A collection of file-processing util methods */ @@ -395,7 +400,32 @@ public static boolean copy(FileSystem srcFS, Path src, return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf); } - /** Copy files between FileSystems. */ + /** + * Copy a file/directory tree within/between filesystems. + *
+   * returns true if the operation succeeded. When deleteSource is true,
+   * this means "after the copy, delete(source) returned true".
+   * If the destination is a directory, and mkdirs(dest) fails,
+   * the operation will return false rather than raise any exception.
+   *
+   * The overwrite flag is about overwriting files; it has no effect on
+   * handling an attempt to copy a file atop a directory (expect an
+   * IOException), or a directory over a path which contains a file (mkdir
+   * will fail, so "false").
+   *
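+   * For example, an illustrative sketch of a copy which deletes the
+   * source afterwards (the variable names here are hypothetical):
+   * <pre>
+   *   FileStatus st = srcFS.getFileStatus(src);
+   *   boolean copied = FileUtil.copy(srcFS, st, dstFS, dst,
+   *       true,   // deleteSource
+   *       false,  // overwrite
+   *       conf);
+   * </pre>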
+   * The operation is recursive, and the deleteSource operation takes place
+   * as each subdirectory is copied. Therefore, if an operation fails partway
+   * through, the source tree may be partially deleted.
+   * @param srcFS source filesystem
+   * @param srcStatus status of source
+   * @param dstFS destination filesystem
+   * @param dst path of destination
+   * @param deleteSource delete the source?
+   * @param overwrite overwrite files at destination?
+   * @param conf configuration to use when opening files
+   * @return true if the operation succeeded.
+   * @throws IOException failure
+   */
   public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                              FileSystem dstFS, Path dst,
                              boolean deleteSource,
@@ -408,22 +438,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
       if (!dstFS.mkdirs(dst)) {
         return false;
       }
-      FileStatus contents[] = srcFS.listStatus(src);
-      for (int i = 0; i < contents.length; i++) {
-        copy(srcFS, contents[i], dstFS,
-             new Path(dst, contents[i].getPath().getName()),
-             deleteSource, overwrite, conf);
+      RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
+      while (contents.hasNext()) {
+        FileStatus next = contents.next();
+        copy(srcFS, next, dstFS,
+            new Path(dst, next.getPath().getName()),
+            deleteSource, overwrite, conf);
       }
     } else {
-      InputStream in=null;
+      InputStream in = null;
       OutputStream out = null;
       try {
-        in = srcFS.open(src);
+        in = awaitFuture(srcFS.openFile(src)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .opt(FS_OPTION_OPENFILE_LENGTH,
+                srcStatus.getLen())  // file length hint for object stores
+            .build());
         out = dstFS.create(dst, overwrite);
         IOUtils.copyBytes(in, out, conf, true);
       } catch (IOException e) {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
+        IOUtils.cleanupWithLogger(LOG, in, out);
         throw e;
       }
     }
@@ -502,7 +537,11 @@ private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                               deleteSource, conf);
       }
     } else {
-      InputStream in = srcFS.open(src);
+      InputStream in = awaitFuture(srcFS.openFile(src)
+          .withFileStatus(srcStatus)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+          .build());
       IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf);
     }
     if (deleteSource) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
index 27a522e593001..e7f441a75d3c8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;

+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;

@@ -34,7 +35,7 @@
  * options accordingly, for example:
  *
  * If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
  * {@link IllegalArgumentException} will be thrown.
  *
  */
@@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
   /**
    * A FileStatus may be provided to the open request.
    * It is up to the implementation whether to use this or not.
-   * @param status status.
+   * @param status status: may be null
    * @return the builder.
   */
-  default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
+  default FutureDataInputStreamBuilder withFileStatus(
+      @Nullable FileStatus status) {
     return this;
   }

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index 75bc12df8fdcf..9b457272fcb50 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.fs;

+import java.util.Collections;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -518,4 +522,119 @@ public enum ChecksumCombineMode {
     MD5MD5CRC,  // MD5 of block checksums, which are MD5 over chunk CRCs
     COMPOSITE_CRC  // Block/chunk-independent composite CRC
   }
+
+  /**
+   * The standard {@code openFile()} options.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static final class OpenFileOptions {
+
+    private OpenFileOptions() {
+    }
+
+    /**
+     * Prefix for all standard filesystem options: {@value}.
+     */
+    private static final String FILESYSTEM_OPTION = "fs.option.";
+
+    /**
+     * Prefix for all openFile options: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE =
+        FILESYSTEM_OPTION + "openfile.";
+
+    /**
+     * OpenFile option for file length: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_LENGTH =
+        FS_OPTION_OPENFILE + "length";
+
+    /**
+     * OpenFile option for split start: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_SPLIT_START =
+        FS_OPTION_OPENFILE + "split.start";
+
+    /**
+     * OpenFile option for split end: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_SPLIT_END =
+        FS_OPTION_OPENFILE + "split.end";
+
+    /**
+     * OpenFile option for buffer size: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
+        FS_OPTION_OPENFILE + "buffer.size";
+
+    /**
+     * OpenFile option for read policies: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY =
+        FS_OPTION_OPENFILE + "read.policy";
+
+    /**
+     * Set of standard options which openFile implementations
+     * MUST recognize, even if they ignore the actual values.
+     */
+    public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS =
+        Collections.unmodifiableSet(Stream.of(
+            FS_OPTION_OPENFILE_BUFFER_SIZE,
+            FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_LENGTH,
+            FS_OPTION_OPENFILE_SPLIT_START,
+            FS_OPTION_OPENFILE_SPLIT_END)
+            .collect(Collectors.toSet()));
+
+    /**
+     * Read policy for adaptive IO: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE =
+        "adaptive";
+
+    /**
+     * Read policy {@value} - whatever the implementation does by default.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
+        "default";
+
+    /**
+     * Read policy for random IO: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_RANDOM =
+        "random";
+
+    /**
+     * Read policy for sequential IO: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL =
+        "sequential";
+
+    /**
+     * Vectored IO API to be used: {@value}.
+ */ + public static final String FS_OPTION_OPENFILE_READ_POLICY_VECTOR = + "vector"; + + /** + * Whole file to be read, end-to-end: {@value}. + */ + public static final String FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE = + "whole-file"; + + /** + * All the current read policies as a set. + */ + public static final Set FS_OPTION_OPENFILE_READ_POLICIES = + Collections.unmodifiableSet(Stream.of( + FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, + FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, + FS_OPTION_OPENFILE_READ_POLICY_RANDOM, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, + FS_OPTION_OPENFILE_READ_POLICY_VECTOR, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .collect(Collectors.toSet())); + + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java index 9cf8b3dc4d203..2308e38a1c9e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java @@ -46,7 +46,7 @@ * * .opt("foofs:option.a", true) * .opt("foofs:option.b", "value") - * .opt("barfs:cache", true) + * .opt("fs.s3a.open.option.etag", "9fe4c37c25b") * .must("foofs:cache", true) * .must("barfs:cache-size", 256 * 1024 * 1024) * .build(); @@ -88,6 +88,9 @@ /** Keep track of the keys for mandatory options. */ private final Set mandatoryKeys = new HashSet<>(); + /** Keep track of the optional keys. */ + private final Set optionalKeys = new HashSet<>(); + /** * Constructor with both optional path and path handle. * Either or both argument may be empty, but it is an error for @@ -163,6 +166,7 @@ public PathHandle getPathHandle() { @Override public B opt(@Nonnull final String key, @Nonnull final String value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.set(key, value); return getThisBuilder(); } @@ -175,6 +179,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) { @Override public B opt(@Nonnull final String key, boolean value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setBoolean(key, value); return getThisBuilder(); } @@ -187,10 +192,19 @@ public B opt(@Nonnull final String key, boolean value) { @Override public B opt(@Nonnull final String key, int value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setInt(key, value); return getThisBuilder(); } + @Override + public B opt(@Nonnull final String key, final long value) { + mandatoryKeys.remove(key); + optionalKeys.add(key); + options.setLong(key, value); + return getThisBuilder(); + } + /** * Set optional float parameter for the Builder. * @@ -199,6 +213,7 @@ public B opt(@Nonnull final String key, int value) { @Override public B opt(@Nonnull final String key, float value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setFloat(key, value); return getThisBuilder(); } @@ -211,6 +226,7 @@ public B opt(@Nonnull final String key, float value) { @Override public B opt(@Nonnull final String key, double value) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setDouble(key, value); return getThisBuilder(); } @@ -223,6 +239,7 @@ public B opt(@Nonnull final String key, double value) { @Override public B opt(@Nonnull final String key, @Nonnull final String... 
values) { mandatoryKeys.remove(key); + optionalKeys.add(key); options.setStrings(key, values); return getThisBuilder(); } @@ -248,6 +265,7 @@ public B must(@Nonnull final String key, @Nonnull final String value) { @Override public B must(@Nonnull final String key, boolean value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setBoolean(key, value); return getThisBuilder(); } @@ -260,10 +278,19 @@ public B must(@Nonnull final String key, boolean value) { @Override public B must(@Nonnull final String key, int value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setInt(key, value); return getThisBuilder(); } + @Override + public B must(@Nonnull final String key, final long value) { + mandatoryKeys.add(key); + optionalKeys.remove(key); + options.setLong(key, value); + return getThisBuilder(); + } + /** * Set mandatory float option. * @@ -272,6 +299,7 @@ public B must(@Nonnull final String key, int value) { @Override public B must(@Nonnull final String key, float value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setFloat(key, value); return getThisBuilder(); } @@ -284,6 +312,7 @@ public B must(@Nonnull final String key, float value) { @Override public B must(@Nonnull final String key, double value) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setDouble(key, value); return getThisBuilder(); } @@ -296,6 +325,7 @@ public B must(@Nonnull final String key, double value) { @Override public B must(@Nonnull final String key, @Nonnull final String... values) { mandatoryKeys.add(key); + optionalKeys.remove(key); options.setStrings(key, values); return getThisBuilder(); } @@ -314,6 +344,12 @@ public Configuration getOptions() { public Set getMandatoryKeys() { return Collections.unmodifiableSet(mandatoryKeys); } + /** + * Get all the keys that are set as optional keys. 
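+   * A key is tracked as optional or mandatory according to whichever
+   * of {@code opt()} and {@code must()} was called with it last; as an
+   * illustrative sketch (the key and values here are arbitrary):
+   * <pre>
+   *   builder.opt("fs.option.openfile.length", 1024L);   // optional
+   *   builder.must("fs.option.openfile.length", 2048L);  // now mandatory
+   * </pre>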
+ */ + public Set getOptionalKeys() { + return Collections.unmodifiableSet(optionalKeys); + } /** * Reject a configuration if one or more mandatory keys are diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java index 7c5a5d949a072..1fafd41b054b9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java @@ -51,6 +51,7 @@ import org.apache.hadoop.fs.PathHandle; import org.apache.hadoop.fs.UploadHandle; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.functional.FutureIO; import static org.apache.hadoop.fs.Path.mergePaths; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; @@ -98,7 +99,7 @@ public FileSystemMultipartUploader( public CompletableFuture startUpload(Path filePath) throws IOException { checkPath(filePath); - return FutureIOSupport.eval(() -> { + return FutureIO.eval(() -> { Path collectorPath = createCollectorPath(filePath); fs.mkdirs(collectorPath, FsPermission.getDirDefault()); @@ -116,7 +117,7 @@ public CompletableFuture putPart(UploadHandle uploadId, throws IOException { checkPutArguments(filePath, inputStream, partNumber, uploadId, lengthInBytes); - return FutureIOSupport.eval(() -> innerPutPart(filePath, + return FutureIO.eval(() -> innerPutPart(filePath, inputStream, partNumber, uploadId, lengthInBytes)); } @@ -179,7 +180,7 @@ public CompletableFuture complete( Map handleMap) throws IOException { checkPath(filePath); - return FutureIOSupport.eval(() -> + return FutureIO.eval(() -> innerComplete(uploadId, filePath, handleMap)); } @@ -251,7 +252,7 @@ public CompletableFuture abort(UploadHandle uploadId, Path collectorPath = new Path(new String(uploadIdByteArray, 0, uploadIdByteArray.length, Charsets.UTF_8)); - return FutureIOSupport.eval(() -> { + return FutureIO.eval(() -> { // force a check for a file existing; raises FNFE if not found fs.getFileStatus(collectorPath); fs.delete(collectorPath, true); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 24a8d49747fe6..70e39de7388c3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.impl; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -47,7 +48,7 @@ * options accordingly, for example: * * If the option is not related to the file system, the option will be ignored. - * If the option is must, but not supported by the file system, a + * If the option is must, but not supported/known by the file system, an * {@link IllegalArgumentException} will be thrown. 
 *
 */
@@ -147,8 +148,9 @@ public FutureDataInputStreamBuilder getThisBuilder() {
   }

   @Override
-  public FutureDataInputStreamBuilder withFileStatus(FileStatus st) {
-    this.status = requireNonNull(st, "status");
+  public FutureDataInputStreamBuilder withFileStatus(
+      @Nullable FileStatus st) {
+    this.status = st;
     return this;
   }

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
index fe112d59352f5..f47e5f4fbfbd6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -37,14 +36,16 @@
 /**
  * Support for future IO and the FS Builder subclasses.
- * If methods in here are needed for applications, promote
- * to {@link FutureIO} for public use -with the original
- * method relaying to it. This is to ensure that external
- * filesystem implementations can safely use these methods
+ * All methods in this class have been superseded by those in
+ * {@link FutureIO}.
+ * The methods here are retained but all marked as deprecated.
+ * This is to ensure that any external
+ * filesystem implementations can still use these methods
  * without linkage problems surfacing.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@Deprecated
 public final class FutureIOSupport {

   private FutureIOSupport() {
@@ -53,6 +54,7 @@ private FutureIOSupport() {
   /**
    * Given a future, evaluate it. Raised exceptions are
    * extracted and handled.
+   * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
    * @param future future to evaluate
    * @param <T> type of the result.
    * @return the result, if all went well.
    * @throws InterruptedIOException future was interrupted
    * @throws IOException if something went wrong
    * @throws RuntimeException any nested RTE thrown
    */
-  public static <T> T awaitFuture(final Future<T> future)
+  @Deprecated
+  public static <T> T awaitFuture(final Future<T> future)
       throws InterruptedIOException, IOException, RuntimeException {
     return FutureIO.awaitFuture(future);
   }

   /**
    * Given a future, evaluate it. Raised exceptions are
    * extracted and handled.
+   * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
    * @param future future to evaluate
    * @param <T> type of the result.
    * @return the result, if all went well.
    * @throws InterruptedIOException future was interrupted
    * @throws IOException if something went wrong
    * @throws RuntimeException any nested RTE thrown
    * @throws TimeoutException the future timed out.
    */
+  @Deprecated
   public static <T> T awaitFuture(final Future<T> future,
       final long timeout,
       final TimeUnit unit)
@@ -88,10 +93,7 @@ public static <T> T awaitFuture(final Future<T> future,
   /**
    * From the inner cause of an execution exception, extract the inner cause
    * if it is an IOE or RTE.
-   * This will always raise an exception, either the inner IOException,
-   * an inner RuntimeException, or a new IOException wrapping the raised
-   * exception.
-   *
+   * See {@link FutureIO#raiseInnerCause(ExecutionException)}.
    * @param e exception.
    * @param <T> type of return value.
    * @return nothing, ever.
@@ -99,6 +101,7 @@ public static T awaitFuture(final Future future, * any non-Runtime-Exception * @throws RuntimeException if that is the inner cause. */ + @Deprecated public static T raiseInnerCause(final ExecutionException e) throws IOException { return FutureIO.raiseInnerCause(e); @@ -107,6 +110,7 @@ public static T raiseInnerCause(final ExecutionException e) /** * Extract the cause of a completion failure and rethrow it if an IOE * or RTE. + * See {@link FutureIO#raiseInnerCause(CompletionException)}. * @param e exception. * @param type of return value. * @return nothing, ever. @@ -114,20 +118,15 @@ public static T raiseInnerCause(final ExecutionException e) * any non-Runtime-Exception * @throws RuntimeException if that is the inner cause. */ + @Deprecated public static T raiseInnerCause(final CompletionException e) throws IOException { return FutureIO.raiseInnerCause(e); } /** - * Propagate options to any builder, converting everything with the - * prefix to an option where, if there were 2+ dot-separated elements, - * it is converted to a schema. - *
-   *   fs.example.s3a.option => s3a:option
-   *   fs.example.fs.io.policy => s3a.io.policy
-   *   fs.example.something => something
-   *
+ * Propagate options to any builder. + * {@link FutureIO#propagateOptions(FSBuilder, Configuration, String, String)} * @param builder builder to modify * @param conf configuration to read * @param optionalPrefix prefix for optional settings @@ -136,56 +135,39 @@ public static T raiseInnerCause(final CompletionException e) * @param type of builder * @return the builder passed in. */ + @Deprecated public static > FSBuilder propagateOptions( final FSBuilder builder, final Configuration conf, final String optionalPrefix, final String mandatoryPrefix) { - propagateOptions(builder, conf, - optionalPrefix, false); - propagateOptions(builder, conf, - mandatoryPrefix, true); - return builder; + return FutureIO.propagateOptions(builder, + conf, optionalPrefix, mandatoryPrefix); } /** - * Propagate options to any builder, converting everything with the - * prefix to an option where, if there were 2+ dot-separated elements, - * it is converted to a schema. - *
-   *   fs.example.s3a.option => s3a:option
-   *   fs.example.fs.io.policy => s3a.io.policy
-   *   fs.example.something => something
-   *
+ * Propagate options to any builder. + * {@link FutureIO#propagateOptions(FSBuilder, Configuration, String, boolean)} * @param builder builder to modify * @param conf configuration to read * @param prefix prefix to scan/strip * @param mandatory are the options to be mandatory or optional? */ + @Deprecated public static void propagateOptions( final FSBuilder builder, final Configuration conf, final String prefix, final boolean mandatory) { - - final String p = prefix.endsWith(".") ? prefix : (prefix + "."); - final Map propsWithPrefix = conf.getPropsWithPrefix(p); - for (Map.Entry entry : propsWithPrefix.entrySet()) { - // change the schema off each entry - String key = entry.getKey(); - String val = entry.getValue(); - if (mandatory) { - builder.must(key, val); - } else { - builder.opt(key, val); - } - } + FutureIO.propagateOptions(builder, conf, prefix, mandatory); } /** * Evaluate a CallableRaisingIOE in the current thread, * converting IOEs to RTEs and propagating. + * See {@link FutureIO#eval(CallableRaisingIOE)}. + * * @param callable callable to invoke * @param Return type. * @return the evaluated result. @@ -194,17 +176,6 @@ public static void propagateOptions( */ public static CompletableFuture eval( CallableRaisingIOE callable) { - CompletableFuture result = new CompletableFuture<>(); - try { - result.complete(callable.apply()); - } catch (UnsupportedOperationException | IllegalArgumentException tx) { - // fail fast here - throw tx; - } catch (Throwable tx) { - // fail lazily here to ensure callers expect all File IO operations to - // surface later - result.completeExceptionally(tx); - } - return result; + return FutureIO.eval(callable); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java index 77b4ff52696a3..a19c5faff4d90 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -38,6 +38,9 @@ public class OpenFileParameters { */ private Set mandatoryKeys; + /** The optional keys. */ + private Set optionalKeys; + /** * Options set during the build sequence. 
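+   * The parameters are typically assembled through the builder chain,
+   * as in this illustrative sketch (the argument values are hypothetical):
+   * <pre>
+   *   OpenFileParameters params = new OpenFileParameters()
+   *       .withMandatoryKeys(mandatoryKeys)
+   *       .withOptionalKeys(optionalKeys)
+   *       .withOptions(options)
+   *       .withStatus(fileStatus)
+   *       .withBufferSize(bufferSize);
+   * </pre>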
   */
@@ -61,6 +64,11 @@ public OpenFileParameters withMandatoryKeys(final Set<String> keys) {
     return this;
   }

+  public OpenFileParameters withOptionalKeys(final Set<String> keys) {
+    this.optionalKeys = requireNonNull(keys);
+    return this;
+  }
+
   public OpenFileParameters withOptions(final Configuration opts) {
     this.options = requireNonNull(opts);
     return this;
   }
@@ -80,6 +88,10 @@ public Set<String> getMandatoryKeys() {
     return mandatoryKeys;
   }

+  public Set<String> getOptionalKeys() {
+    return optionalKeys;
+  }
+
   public Configuration getOptions() {
     return options;
   }
@@ -91,4 +103,5 @@ public int getBufferSize() {
     return bufferSize;
   }

   public FileStatus getStatus() {
     return status;
   }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
index 2fcdee915ede9..3f828897b1d6c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.concurrent.ExecutionException;

 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;

@@ -28,13 +27,11 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

 /**
- * A wrapper for an IOException which
- * {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to
- * always extract the exception.
+ * A wrapper for an IOException.
  *
  * The constructor signature guarantees the cause will be an IOException,
  * and as it checks for a null-argument, non-null.
- * @deprecated use the {@code UncheckedIOException}.
+ * @deprecated use the {@code UncheckedIOException} directly.
*/ @Deprecated @InterfaceAudience.Private @@ -52,8 +49,4 @@ public WrappedIOException(final IOException cause) { super(Preconditions.checkNotNull(cause)); } - @Override - public synchronized IOException getCause() { - return (IOException) super.getCause(); - } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java index f6f4247489ff3..678225f81e0e3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java @@ -55,6 +55,9 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** * Provides: argument processing to ensure the destination is valid @@ -348,7 +351,11 @@ protected void copyFileToTarget(PathData src, PathData target) src.fs.setVerifyChecksum(verifyChecksum); InputStream in = null; try { - in = src.fs.open(src.path); + in = awaitFuture(src.fs.openFile(src.path) + .withFileStatus(src.stat) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .build()); copyStreamToTarget(in, target); preserveAttributes(src, target, preserveRawXattrs); } finally { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index b03d7de8a1c72..0643a2e983dcc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -98,7 +98,8 @@ protected void processArguments(LinkedList items) try { for (PathData src : srcs) { if (src.stat.getLen() != 0) { - try (FSDataInputStream in = src.fs.open(src.path)) { + // Always do sequential reads. 
+        try (FSDataInputStream in = src.openForSequentialIO()) {
           IOUtils.copyBytes(in, out, getConf(), false);
           writeDelimiter(out);
         }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
index 670fa152f72ed..d3ca013a3f251 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
@@ -105,7 +105,8 @@ private void printToStdout(InputStream in) throws IOException {
   }
 
   protected InputStream getInputStream(PathData item) throws IOException {
-    return item.fs.open(item.path);
+    // Always do sequential reads.
+    return item.openForSequentialIO();
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
index 2280225b5ae32..7242f261801d6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
@@ -28,6 +28,8 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+
 /**
 * Show the first 1KB of the file.
 */
@@ -68,11 +70,9 @@ protected void processPath(PathData item) throws IOException {
   }
 
   private void dumpToOffset(PathData item) throws IOException {
-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    try (FSDataInputStream in = item.openFile(
+        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
       IOUtils.copyBytes(in, System.out, endingOffset, false);
-    } finally {
-      in.close();
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index 1ff8d8f0494a1..2071a16799a5c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -39,6 +40,10 @@
 import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.fs.RemoteIterator;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
 
 /**
@@ -601,4 +606,34 @@ public boolean equals(Object o) {
   public int hashCode() {
     return path.hashCode();
   }
+
+
+  /**
+   * Open a file for sequential IO.
+   *

+   * This uses FileSystem.openFile() to request sequential IO;
+   * the file status is also passed in.
+   * Filesystems may use this to optimize their IO.
+   * @return an input stream
+   * @throws IOException failure
+   */
+  protected FSDataInputStream openForSequentialIO()
+      throws IOException {
+    return openFile(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+  }
+
+  /**
+   * Open a file.
+   * @param policy fadvise policy.
+   * @return an input stream
+   * @throws IOException failure
+   */
+  protected FSDataInputStream openFile(final String policy) throws IOException {
+    return awaitFuture(fs.openFile(path)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            policy)
+        .opt(FS_OPTION_OPENFILE_LENGTH,
+            stat.getLen())   // file length hint for object stores
+        .build());
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
index 22dd32bce8512..22b135f064ca5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
@@ -30,6 +30,8 @@
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+
 /**
 * Get a listing of all files in that match the file patterns.
 */
@@ -107,16 +109,15 @@ private long dumpFromOffset(PathData item, long offset) throws IOException {
     if (offset < 0) {
       offset = Math.max(fileSize + offset, 0);
     }
-
-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    // Always do sequential reads.
+    try (FSDataInputStream in = item.openFile(
+        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
       in.seek(offset);
       // use conf so the system configured io block size is used
       IOUtils.copyBytes(in, System.out, getConf(), false);
       offset = in.getPos();
-    } finally {
-      in.close();
     }
     return offset;
   }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 166007f5c9a42..c458269c3510d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -118,6 +118,9 @@ public final class StoreStatisticNames {
   /** {@value}. */
   public static final String OP_OPEN = "op_open";
 
+  /** Call to openFile() {@value}. */
+  public static final String OP_OPENFILE = "op_openfile";
+
   /** {@value}. */
   public static final String OP_REMOVE_ACL = "op_remove_acl";
 
@@ -323,6 +326,12 @@ public final class StoreStatisticNames {
   public static final String ACTION_EXECUTOR_ACQUIRED =
       "action_executor_acquired";
 
+  /**
+   * A file was opened: {@value}.
+   */
+  public static final String ACTION_FILE_OPENED
+      = "action_file_opened";
+
   /**
   * An HTTP HEAD request was made: {@value}.
 */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 7e9137294c1ef..ca755f0841914 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -76,7 +76,7 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_CLOSED = "stream_read_closed";
 
   /**
-   * Total count of times an attempt to close an input stream was made
+   * Total count of times an attempt to close an input stream was made.
    * Value: {@value}.
    */
   public static final String STREAM_READ_CLOSE_OPERATIONS
@@ -118,6 +118,23 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_OPERATIONS_INCOMPLETE
       = "stream_read_operations_incomplete";
 
+  /**
+   * count/duration of aborting a remote stream during stream
+   * IO.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_REMOTE_STREAM_ABORTED
+      = "stream_read_remote_stream_aborted";
+
+  /**
+   * count/duration of closing a remote stream,
+   * possibly including draining the stream to recycle
+   * the HTTP connection.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_REMOTE_STREAM_DRAINED
+      = "stream_read_remote_stream_drain";
+
   /**
   * Count of version mismatches encountered while reading an input stream.
   * Value: {@value}.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
index 37ca37a187d7e..e06ccb54eb46b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
@@ -521,23 +521,39 @@ public static CallableRaisingIOE trackDurationOfOperation(
       // create the tracker outside try-with-resources so
       // that failures can be set in the catcher.
       DurationTracker tracker = createTracker(factory, statistic);
-      try {
-        // exec the input function and return its value
-        return input.apply();
-      } catch (IOException | RuntimeException e) {
-        // input function failed: note it
-        tracker.failed();
-        // and rethrow
-        throw e;
-      } finally {
-        // update the tracker.
-        // this is called after the catch() call would have
-        // set the failed flag.
-        tracker.close();
-      }
+      return invokeTrackingDuration(tracker, input);
     };
   }
 
+  /**
+   * Given an IOException raising callable/lambda expression,
+   * execute it, updating the tracker on success/failure.
+   * @param tracker duration tracker.
+   * @param input input callable.
+   * @param <B> return type.
+   * @return the result of the invocation
+   * @throws IOException on failure.
+   */
+  public static <B> B invokeTrackingDuration(
+      final DurationTracker tracker,
+      final CallableRaisingIOE<B> input)
+      throws IOException {
+    try {
+      // exec the input function and return its value
+      return input.apply();
+    } catch (IOException | RuntimeException e) {
+      // input function failed: note it
+      tracker.failed();
+      // and rethrow
+      throw e;
+    } finally {
+      // update the tracker.
+      // this is called after the catch() call would have
+      // set the failed flag.
+ tracker.close(); + } + } + /** * Given an IOException raising Consumer, * return a new one which wraps the inner and tracks diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 0581fb3f577a0..0699259bae960 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -60,6 +60,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** * SequenceFiles are flat files consisting of binary key/value @@ -1959,7 +1964,14 @@ private void initialize(Path filename, FSDataInputStream in, */ protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize, long length) throws IOException { - return fs.open(file, bufferSize); + FutureDataInputStreamBuilder builder = fs.openFile(file) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) + .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize); + if (length >= 0) { + builder.opt(FS_OPTION_OPENFILE_LENGTH, length); + } + return awaitFuture(builder.build()); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java index 111ce8f6201b5..bb61306b2b84a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java @@ -50,6 +50,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** @@ -266,7 +268,9 @@ public T load(FileSystem fs, Path path, @Nullable FileStatus status) if (status != null && status.getLen() == 0) { throw new EOFException("No data in " + path); } - FutureDataInputStreamBuilder builder = fs.openFile(path); + FutureDataInputStreamBuilder builder = fs.openFile(path) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE); if (status != null) { builder.withFileStatus(status); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java index e2cdc0fd41472..32e299b4d45b1 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java @@ -34,7 +34,7 @@ import org.apache.hadoop.util.DurationInfo; -import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause; +import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause; /** * A bridge from Callable to Supplier; catching exceptions diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java index 3f7218baa759f..c3fda19d8d73b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.UncheckedIOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -29,6 +31,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSBuilder; /** * Future IO Helper methods. @@ -86,6 +90,8 @@ public static T awaitFuture(final Future future) * extracted and rethrown. *

 * @param future future to evaluate
+ * @param timeout timeout to wait
+ * @param unit time unit.
 * @param <T> type of the result.
 * @return the result, if all went well.
 * @throws InterruptedIOException future was interrupted
@@ -185,4 +191,88 @@ public static IOException unwrapInnerException(final Throwable e) {
     }
   }
 
+  /**
+   * Propagate options to any builder, converting everything with the
+   * prefix to an option where, if there were 2+ dot-separated elements,
+   * it is converted to a schema.
+   * See {@link #propagateOptions(FSBuilder, Configuration, String, boolean)}.
+   * @param builder builder to modify
+   * @param conf configuration to read
+   * @param optionalPrefix prefix for optional settings
+   * @param mandatoryPrefix prefix for mandatory settings
+   * @param <T> type of result
+   * @param <U> type of builder
+   * @return the builder passed in.
+   */
+  public static <T, U extends FSBuilder<T, U>>
+      FSBuilder<T, U> propagateOptions(
+      final FSBuilder<T, U> builder,
+      final Configuration conf,
+      final String optionalPrefix,
+      final String mandatoryPrefix) {
+    propagateOptions(builder, conf,
+        optionalPrefix, false);
+    propagateOptions(builder, conf,
+        mandatoryPrefix, true);
+    return builder;
+  }
+
+  /**
+   * Propagate options to any builder, converting everything with the
+   * prefix to an option where, if there were 2+ dot-separated elements,
+   * it is converted to a schema.
+   * <pre>
+   *   fs.example.s3a.option becomes "s3a.option"
+   *   fs.example.fs.io.policy becomes "fs.io.policy"
+   *   fs.example.something becomes "something"
+   * </pre>
+   * @param builder builder to modify
+   * @param conf configuration to read
+   * @param prefix prefix to scan/strip
+   * @param mandatory are the options to be mandatory or optional?
+   */
+  public static void propagateOptions(
+      final FSBuilder<?, ?> builder,
+      final Configuration conf,
+      final String prefix,
+      final boolean mandatory) {
+
+    final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
+    final Map<String, String> propsWithPrefix = conf.getPropsWithPrefix(p);
+    for (Map.Entry<String, String> entry : propsWithPrefix.entrySet()) {
+      // change the schema off each entry
+      String key = entry.getKey();
+      String val = entry.getValue();
+      if (mandatory) {
+        builder.must(key, val);
+      } else {
+        builder.opt(key, val);
+      }
+    }
+  }
+
+  /**
+   * Evaluate a CallableRaisingIOE in the current thread,
+   * converting IOEs to RTEs and propagating.
+   * @param callable callable to invoke
+   * @param <T> Return type.
+   * @return the evaluated result.
+   * @throws UnsupportedOperationException fail fast if unsupported
+   * @throws IllegalArgumentException invalid argument
+   */
+  public static <T> CompletableFuture<T> eval(
+      CallableRaisingIOE<T> callable) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    try {
+      result.complete(callable.apply());
+    } catch (UnsupportedOperationException | IllegalArgumentException tx) {
+      // fail fast here
+      throw tx;
+    } catch (Throwable tx) {
+      // fail lazily here to ensure callers expect all File IO operations to
+      // surface later
+      result.completeExceptionally(tx);
+    }
+    return result;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 4517bd8ff4a15..004220c4bedaa 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -814,97 +814,11 @@ exists in the metadata, but no copies of any its blocks can be located;
 
 ### `FSDataInputStreamBuilder openFile(Path path)`
 
-Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html)
-to construct a operation to open the file at `path` for reading.
+See [openFile()](openfile.html).
 
-When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
-the builder parameters are verified and
-`openFileWithOptions(Path, OpenFileParameters)` invoked.
-
-This (protected) operation returns a `CompletableFuture`
-which, when its `get()` method is called, either returns an input
-stream of the contents of opened file, or raises an exception.
-
-The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)`
-ultimately invokes `open(Path, int)`.
-
-Thus the chain `openFile(path).build().get()` has the same preconditions
-and postconditions as `open(Path p, int bufferSize)`
-
-However, there is one difference which implementations are free to
-take advantage of:
-
-The returned stream MAY implement a lazy open where file non-existence or
-access permission failures may not surface until the first `read()` of the
-actual data.
-
-The `openFile()` operation may check the state of the filesystem during its
-invocation, but as the state of the filesystem may change betwen this call and
-the actual `build()` and `get()` operations, this file-specific
-preconditions (file exists, file is readable, etc) MUST NOT be checked here.
- -FileSystem implementations which do not implement `open(Path, int)` -MAY postpone raising an `UnsupportedOperationException` until either the -`FSDataInputStreamBuilder.build()` or the subsequent `get()` call, -else they MAY fail fast in the `openFile()` call. - -### Implementors notes - -The base implementation of `openFileWithOptions()` actually executes -the `open(path)` operation synchronously, yet still returns the result -or any failures in the `CompletableFuture<>`, so as to ensure that users -code expecting this. - -Any filesystem where the time to open a file may be significant SHOULD -execute it asynchronously by submitting the operation in some executor/thread -pool. This is particularly recommended for object stores and other filesystems -likely to be accessed over long-haul connections. - -Arbitrary filesystem-specific options MAY be supported; these MUST -be prefixed with either the filesystem schema, e.g. `hdfs.` -or in the "fs.SCHEMA" format as normal configuration settings `fs.hdfs`). The -latter style allows the same configuration option to be used for both -filesystem configuration and file-specific configuration. - -It SHOULD be possible to always open a file without specifying any options, -so as to present a consistent model to users. However, an implementation MAY -opt to require one or more mandatory options to be set. - -The returned stream may perform "lazy" evaluation of file access. This is -relevant for object stores where the probes for existence are expensive, and, -even with an asynchronous open, may be considered needless. - ### `FSDataInputStreamBuilder openFile(PathHandle)` -Creates a `FSDataInputStreamBuilder` to build an operation to open a file. -Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html) -to construct a operation to open the file identified by the given `PathHandle` for reading. - -When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance, -the builder parameters are verified and -`openFileWithOptions(PathHandle, OpenFileParameters)` invoked. - -This (protected) operation returns a `CompletableFuture` -which, when its `get()` method is called, either returns an input -stream of the contents of opened file, or raises an exception. - -The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` method -returns a future which invokes `open(Path, int)`. - -Thus the chain `openFile(pathhandle).build().get()` has the same preconditions -and postconditions as `open(Pathhandle, int)` - -As with `FSDataInputStreamBuilder openFile(PathHandle)`, the `openFile()` -call must not be where path-specific preconditions are checked -that -is postponed to the `build()` and `get()` calls. - -FileSystem implementations which do not implement `open(PathHandle handle, int bufferSize)` -MAY postpone raising an `UnsupportedOperationException` until either the -`FSDataInputStreamBuilder.build()` or the subsequent `get()` call, -else they MAY fail fast in the `openFile()` call. - -The base implementation raises this exception in the `build()` operation; -other implementations SHOULD copy this. +See [openFile()](openfile.html). ### `PathHandle getPathHandle(FileStatus stat, HandleOpt... 
options)` diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index eadba174fc1a6..db630e05c22d4 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -13,10 +13,10 @@ --> - + -# class `org.apache.hadoop.fs.FSDataInputStreamBuilder` +# class `org.apache.hadoop.fs.FutureDataInputStreamBuilder` @@ -27,7 +27,7 @@ file for reading. ## Invariants -The `FSDataInputStreamBuilder` interface does not require parameters or +The `FutureDataInputStreamBuilder` interface does not require parameters or or the state of `FileSystem` until [`build()`](#build) is invoked and/or during the asynchronous open operation itself. @@ -39,11 +39,11 @@ path validation. ## Implementation-agnostic parameters. -### `FSDataInputStreamBuilder bufferSize(int bufSize)` +### `FutureDataInputStreamBuilder bufferSize(int bufSize)` Set the size of the buffer to be used. -### `FSDataInputStreamBuilder withFileStatus(FileStatus status)` +### `FutureDataInputStreamBuilder withFileStatus(FileStatus status)` A `FileStatus` instance which refers to the file being opened. @@ -53,7 +53,7 @@ So potentially saving on remote calls especially to object stores. Requirements: * `status != null` -* `status.getPath()` == the resolved path of the file being opened. +* `status.getPath().getName()` == the name of the file being opened. The path validation MUST take place if the store uses the `FileStatus` when it opens files, and MAY be performed otherwise. The validation @@ -65,27 +65,85 @@ If a filesystem implementation extends the `FileStatus` returned in its implementation MAY use this information when opening the file. This is relevant with those stores which return version/etag information, -including the S3A and ABFS connectors -they MAY use this to guarantee that -the file they opened is exactly the one returned in the listing. +-they MAY use this to guarantee that the file they opened +is exactly the one returned in the listing. + + +The final `status.getPath().getName()` element of the supplied status MUST equal +the name value of the path supplied to the `openFile(path)` call. + +Filesystems MUST NOT validate the rest of the path. +This is needed to support viewfs and other mount-point wrapper filesystems +where schemas and paths are different. These often create their own FileStatus results + +Preconditions + +```python +status == null or status.getPath().getName() == path.getName() + +``` + +Filesystems MUST NOT require the class of `status` to equal +that of any specific subclass their implementation returns in filestatus/list +operations. This is to support wrapper filesystems and serialization/deserialization +of the status. + ### Set optional or mandatory parameters - FSDataInputStreamBuilder opt(String key, ...) - FSDataInputStreamBuilder must(String key, ...) + FutureDataInputStreamBuilder opt(String key, ...) + FutureDataInputStreamBuilder must(String key, ...) Set optional or mandatory parameters to the builder. Using `opt()` or `must()`, client can specify FS-specific parameters without inspecting the concrete type of `FileSystem`. 
+Example:
+
 ```java
 out = fs.openFile(path)
-    .opt("fs.s3a.experimental.input.fadvise", "random")
-    .must("fs.s3a.readahead.range", 256 * 1024)
+    .must("fs.option.openfile.read.policy", "random")
+    .opt("fs.http.connection.timeout", 30_000L)
     .withFileStatus(statusFromListing)
     .build()
     .get();
 ```
 
+Here the read policy of `random` has been specified,
+with the requirement that the filesystem implementation must understand the option.
+An http-specific option has been supplied which may be interpreted by any store;
+if the filesystem opening the file does not recognize the option, it can safely be
+ignored.
+
+### When to use `opt()` versus `must()`
+
+The difference between `opt()` versus `must()` is how the FileSystem opening
+the file must react to an option which it does not recognize.
+
+```python
+
+def must(name, value):
+    if not name in known_keys:
+        raise IllegalArgumentException
+    if not name in supported_keys:
+        raise UnsupportedException
+
+
+def opt(name, value):
+    if not name in known_keys:
+        # ignore option
+
+```
+
+For any known key, the validation of the `value` argument MUST be the same
+irrespective of how the (key, value) pair was declared.
+
+1. For a filesystem-specific option, it is the choice of the implementation
+   how to validate the entry.
+1. For standard options, the specification of what is a valid `value` is
+   defined in this filesystem specification, validated through contract
+   tests.
+
 #### Implementation Notes
 
 Checking for supported options must be performed in the `build()` operation.
 
 1. If a mandatory parameter declared via `must(key, value)`) is not recognized,
 `IllegalArgumentException` MUST be thrown.
 
-1. If a mandatory parameter declared via `must(key, value)`) relies on
+1. If a mandatory parameter declared via `must(key, value)` relies on
 a feature which is recognized but not supported in the specific
-Filesystem/FileContext instance `UnsupportedException` MUST be thrown.
+`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
 
 The behavior of resolving the conflicts between the parameters set by builder
 methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
@@ -110,13 +168,18 @@ custom subclasses.
 
 This is critical to ensure safe use of the feature: directory listing/
 status serialization/deserialization can result in the `withFileStatus()`
-argumennt not being the custom subclass returned by the Filesystem instance's
+argument not being the custom subclass returned by the Filesystem instance's
 own `getFileStatus()`, `listFiles()`, `listLocatedStatus()` calls, etc.
 
 In such a situation the implementations must:
 
-1. Validate the path (always).
-1. Use the status/convert to the custom type, *or* simply discard it.
+1. Verify that `status.getPath().getName()` matches the current `path.getName()`
+   value. The rest of the path MUST NOT be validated.
+1. Use any status fields as desired -for example the file length.
+
+Even if no values of the status are used, the presence of the argument
+can be interpreted as the caller declaring that they believe the file
+to be present and of the given size.
 
 ## Builder interface
 
@@ -128,26 +191,499 @@
 completed, returns an input stream which can read data from the filesystem.
 
 The `build()` operation MAY perform the validation of the file's existence,
 its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the returned
+  `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+  any of the read operations, such as `read()` or `PositionedRead`.
 
 That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is successful.
+only guaranteed to have been met after `get()` is called on the returned future
+and an attempt has been made to read the stream.
 
-Thus, if even a file does not exist, the following call will still succeed, returning
-a future to be evaluated.
+Thus, even when the file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be evaluated.
 
 ```java
 Path p = new Path("file://tmp/file-which-does-not-exist");
 
 CompletableFuture future = p.getFileSystem(conf)
       .openFile(p)
-      .build;
+      .build();
 ```
 
-The preconditions for opening the file are checked during the asynchronous
-evaluation, and so will surface when the future is completed:
+The inability to access/read a file MUST raise an `IOException` or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
 
 ```java
-FSDataInputStream in = future.get();
+  future.get().read();
 ```
+
+Access permission checks have the same visibility requirements: permission failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent operations.
+
+Note: some operations on the input stream, such as `seek()`, may not attempt any IO
+at all. Such operations MAY NOT raise exceptions when interacting with
+nonexistent/unreadable files.
+
+## Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementations
+MUST recognise and MAY support by changing the behavior of
+their input streams as appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident that
+the application will only be executed against releases of Hadoop which know of
+the options -applications SHOULD set the options via `opt()` calls rather than `must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored;
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.Options.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the read
+policy or file length values and use them when opening files.
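+
+To illustrate these rules, here is a minimal sketch (not from the Hadoop codebase;
+it assumes a `FileSystem` instance `fs`, a `Path` `path` and a statically imported
+`FutureIO.awaitFuture()`). The standard read policy key is declared via `must()`,
+which all implementations recognize even if they ignore the value, while the
+store-specific readahead key is declared via `opt()` so other stores can skip it:
+
+```java
+// standard option: the fs.option.openfile. prefix is recognized everywhere,
+// so a must() declaration will not fail even where the value is unused.
+FSDataInputStream in = awaitFuture(
+    fs.openFile(path)
+        .must("fs.option.openfile.read.policy", "random")
+        // store-specific option: opt() lets other filesystems ignore it.
+        .opt("fs.s3a.readahead.range", 1024 * 1024)
+        .build());
+```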
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that it would be an error if set
+but not supported, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations.
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for sequential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+   filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+   behavior -and it may change with Hadoop releases and/or specific storage
+   subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy       | Meaning                                                  |
+|--------------|----------------------------------------------------------|
+| `adaptive`   | Any adaptive policy implemented by the store.            |
+| `default`    | The default policy for this store. Generally "adaptive". |
+| `random`     | Optimize for random access.                              |
+| `sequential` | Optimize for sequential access.                          |
+| `vector`     | The Vectored IO API is intended to be used.              |
+| `whole-file` | The whole file will be read.                             |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether reads
+are being performed efficiently or not.
+
+_Further reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies have proven effective in the absence of the ability to
+declare the seek policy in the `open()` API, which forces it to be declared, if
+configurable, in the cluster/application configuration. However, the switch from
+sequential to random seek policies may be expensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option, if
+they know their read plan, they SHOULD declare which policy is most appropriate.
+
+#### Read Policy `default`
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implementations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedRead` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system client is free to enable
+whatever strategies maximise performance for this. In particular, larger ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in the `openFile()` operation.
+* Prefetch data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened stream SHOULD declare this
+read policy.
+
+### Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+   this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position across/beyond
+   this length but below the actual length of the file. Implementations MAY
+   raise `EOFExceptions` in such cases, or they MAY return data.
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.option.openfile.length`,
+  the file status values take precedence.
+
+### Options: `fs.option.openfile.split.start` and `fs.option.openfile.split.end`
+
+Declare the start and end of the split when a file has been split for processing
+in pieces.
+
+1. If a value is negative, the option SHALL be considered unset.
+1. Filesystems MAY assume that the length of the file is greater than or equal
+   to the value of `fs.option.openfile.split.end`.
+1. And that they MAY raise an exception if the client application reads past the
+   value set in `fs.option.openfile.split.end`.
+1. The pair of options MAY be used to optimise the read plan, such as setting
+   the content range for GET requests, or using the split end as an implicit
+   declaration of the guaranteed minimum length of the file.
+1. If both options are set, and the split start is declared as greater than the
+   split end, then the split start SHOULD just be reset to zero, rather than
+   rejecting the operation.
+
+The split end value can provide a hint as to the end of the input stream. The
+split start can be used to optimize any initial read offset for filesystem
+clients.
+
+*Note for implementors:* applications will read past the end of a split when they
+need to read to the end of a record/line which begins before the end of the
+split.
+
+Therefore clients MUST be allowed to `seek()`/`read()` past the length
+set in `fs.option.openfile.split.end` if the file is actually longer
+than that value.
+
+## S3A-specific options
+
+The S3A Connector supports custom options for readahead and seek policy.
+
+| Name                                 | Type     | Meaning                                                     |
+|--------------------------------------|----------|-------------------------------------------------------------|
+| `fs.s3a.readahead.range`             | `long`   | readahead range in bytes                                    |
+| `fs.s3a.input.async.drain.threshold` | `long`   | threshold to switch to asynchronous draining of the stream  |
+| `fs.s3a.experimental.input.fadvise`  | `String` | seek policy. Superseded by `fs.option.openfile.read.policy` |
+
+If the option set contains a SQL statement in the `fs.s3a.select.sql` option,
+then the file is opened as an S3 Select query.
+Consult the S3A documentation for more details.
+
+## ABFS-specific options
+
+The ABFS Connector supports custom input stream options.
+
+| Name                              | Type      | Meaning                                            |
+|-----------------------------------|-----------|----------------------------------------------------|
+| `fs.azure.buffered.pread.disable` | `boolean` | disable caching on the positioned read operations. |
+
+This disables caching on data read through the [PositionedReadable](fsdatainputstream.html#PositionedReadable)
+APIs.
+
+Consult the ABFS Documentation for more details.
+
+## Examples
+
+#### Declaring seek policy and split limits when opening a file.
+
+Here is an example from a proof of
+concept `org.apache.parquet.hadoop.util.HadoopInputFile`
+reader which uses a (nullable) file status and a split start/end.
+
+The `FileStatus` value is always passed in -but if it is null, then the split
+end is used to declare the length of the file.
+
+```java
+protected SeekableInputStream newStream(Path path, FileStatus stat,
+    long splitStart, long splitEnd)
+    throws IOException {
+
+  FutureDataInputStreamBuilder builder = fs.openFile(path)
+      .opt("fs.option.openfile.read.policy", "vector, random")
+      .withFileStatus(stat);
+
+  builder.opt("fs.option.openfile.split.start", splitStart);
+  builder.opt("fs.option.openfile.split.end", splitEnd);
+  CompletableFuture streamF = builder.build();
+  return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
+}
+```
+
+As a result, whether driven directly by a file listing, or when opening a file
+from a query plan of `(path, splitStart, splitEnd)`, there is no need to probe
+the remote store for the length of the file. When working with remote object
+stores, this can save tens to hundreds of milliseconds, even if such a probe is
+done asynchronously.
+
+If both the file length and the split end are set, then the file length MUST be
+considered "more" authoritative, that is, it really SHOULD define the file
+length. If the split end is set, the caller MAY still read past it.
+
+The `CompressedSplitLineReader` can read past the end of a split if it is
+partway through processing a compressed record. That is: it assumes an
+incomplete record read means that the file length is greater than the split
+length, and that it MUST read the entirety of the partially read record. Other
+readers may behave similarly.
+
+Therefore:
+
+1. File length as supplied in a `FileStatus` or in `fs.option.openfile.length`
+   SHALL set the strict upper limit on the length of a file.
+2. The split end as set in `fs.option.openfile.split.end` MUST be viewed as a
+   hint, rather than the strict end of the file.
+
+### Opening a file with both standard and non-standard options
+
+Standard and non-standard options MAY be combined in the same `openFile()`
+operation.
+
+```java
+Future f = openFile(path)
+    .must("fs.option.openfile.read.policy", "random, adaptive")
+    .opt("fs.s3a.readahead.range", 1024 * 1024)
+    .build();
+
+FSDataInputStream is = f.get();
+```
+
+The option set in `must()` MUST be understood, or at least recognized and
+ignored by all filesystems. In this example, the S3A-specific option MAY be
+ignored by all other filesystem clients.
+
+### Opening a file with older releases
+
+Not all Hadoop releases recognize the `fs.option.openfile.read.policy` option.
+
+The option can be safely used in application code if it is added via the `opt()`
+builder argument, as it will be treated as an unknown optional key which can
+then be discarded.
+
+```java
+Future f = openFile(path)
+    .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
+    .build();
+
+FSDataInputStream is = f.get();
+```
+
+*Note 1* if the option name is set by a reference to a constant in
+`org.apache.hadoop.fs.Options.OpenFileOptions`, then the program will not link
+against versions of Hadoop without the specific option. Therefore for resilient
+linking against older releases -use a copy of the value.
+
+*Note 2* as option validation is performed in the FileSystem connector,
+a third-party connector designed to work with multiple Hadoop versions
+MAY NOT support the option.
+
+### Passing options in to MapReduce
+
+Hadoop MapReduce will automatically read MR Job Options with the
+`mapreduce.job.input.file.option.` and `mapreduce.job.input.file.must.`
+prefixes, and apply these values as `.opt()` and `must()` respectively, after
+removing the mapreduce-specific prefixes.
+
+This makes passing options in to MR jobs straightforward.
For example, to
+declare that a job should read its data using random IO:
+
+```java
+JobConf jobConf = (JobConf) job.getConfiguration();
+jobConf.set(
+    "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
+    "random");
+```
+
+### MapReduce input format propagating options
+
+An example of a record reader passing in options to the file it opens.
+
+```java
+  public void initialize(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit)genericSplit;
+    Configuration job = context.getConfiguration();
+    start = split.getStart();
+    end = start + split.getLength();
+    Path file = split.getPath();
+
+    // open the file and seek to the start of the split
+    FutureDataInputStreamBuilder builder =
+        file.getFileSystem(job).openFile(file);
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt("fs.option.openfile.split.start", start);
+    builder.opt("fs.option.openfile.split.end", end);
+    FutureIO.propagateOptions(builder, job,
+        "mapreduce.job.input.file.option",
+        "mapreduce.job.input.file.must");
+
+    fileIn = FutureIO.awaitFuture(builder.build());
+    fileIn.seek(start);
+    /* Rest of the operation on the opened stream */
+  }
+```
+
+### `FileContext.openFile`
+
+From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
+Because the file length has already been probed for, the length is passed down.
+
+```java
+  public AvroFSInput(FileContext fc, Path p) throws IOException {
+    FileStatus status = fc.getFileStatus(p);
+    this.len = status.getLen();
+    this.stream = awaitFuture(fc.openFile(p)
+        .opt("fs.option.openfile.read.policy",
+            "sequential")
+        .opt("fs.option.openfile.length",
+            Long.toString(status.getLen()))
+        .build());
+  }
+```
+
+In this example, the length is passed down as a string (via `Long.toString()`)
+rather than directly as a long. This is to ensure that the input format will
+link against versions of Hadoop which do not have the
+`opt(String, long)` and `must(String, long)` builder parameters. Similarly, the
+values are passed as optional, so that if unrecognized the application will
+still succeed.
+
+### Example: reading a whole file
+
+This is from `org.apache.hadoop.util.JsonSerialization`.
+
+Its `load(FileSystem, Path, FileStatus)` method
+* declares the whole file is to be read end to end.
+* passes down the file status.
+
+```java
+public T load(FileSystem fs,
+    Path path,
+    FileStatus status)
+    throws IOException {
+
+  try (FSDataInputStream dataInputStream =
+      awaitFuture(fs.openFile(path)
+          .opt("fs.option.openfile.read.policy", "whole-file")
+          .withFileStatus(status)
+          .build())) {
+    return fromJsonStream(dataInputStream);
+  } catch (JsonProcessingException e) {
+    throw new PathIOException(path.toString(),
+        "Failed to read JSON file " + e, e);
+  }
+}
+```
+
+*Note:* in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
+required a non-null parameter; this has since been relaxed.
+For maximum compatibility across versions, only invoke the method
+when the file status is known to be non-null.
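+
+A defensive sketch of that advice (assuming `fs`, `path` and a possibly null
+`status` are in scope) is to add the status only when it is known:
+
+```java
+// guard withFileStatus() so the same code runs against releases
+// where a null status argument was rejected.
+FutureDataInputStreamBuilder builder = fs.openFile(path)
+    .opt("fs.option.openfile.read.policy", "whole-file");
+if (status != null) {
+  builder.withFileStatus(status);
+}
+FSDataInputStream in = awaitFuture(builder.build());
+```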
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index a4aa136033a0c..e18f4c3bf4ab2 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -41,3 +41,4 @@ HDFS as these are commonly expected by Hadoop client applications.
 2. [Extending the specification and its tests](extending.html)
 1. [Uploading a file using Multiple Parts](multipartuploader.html)
 1. [IOStatistics](iostatistics.html)
+1. [openFile()](openfile.html).
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
new file mode 100644
index 0000000000000..afb3245c5105f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
@@ -0,0 +1,122 @@
+
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchronous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+  the filename of the path, not the full path.
+  This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+### `FutureDataInputStreamBuilder openFile(Path path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods return a `CompletableFuture`
+which, when its `get()` method is called, either returns an input
+stream of the contents of the opened file, or raises an exception.
+
+The base implementation of `FileSystem.openFileWithOptions(Path, OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`.
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change between this call and
+the actual `build()` and `get()` operations, these file-specific
+preconditions (file exists, file is readable, etc) MUST NOT be checked here.
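+
+A sketch of that timing (the path is hypothetical; `fs` is assumed to be in scope):
+
+```java
+// openFile() and build() MUST NOT fail on a missing file;
+// the FileNotFoundException MAY surface in get() or, for
+// late-binding streams, only on the first read.
+CompletableFuture<FSDataInputStream> future =
+    fs.openFile(new Path("/no/such/file"))
+        .build();                      // MUST succeed
+FSDataInputStream in = future.get();   // MAY raise the failure here...
+in.read();                             // ...or here
+```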
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+else they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for details
+on how to use the builder, and for standard options which may be passed in.
+
+### `FutureDataInputStreamBuilder openFile(PathHandle)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file identified by the given `PathHandle` for reading.
+
+If implemented by a filesystem, the semantics of [`openFile(Path)`](#openfile_path_) apply here too.
+Thus the chain `openFile(pathhandle).build().get()` has the same preconditions and postconditions
+as `open(PathHandle, int)`.
+
+FileSystem implementations which do not implement `open(PathHandle handle, int bufferSize)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call, else they MAY fail fast in
+the `openFile(PathHandle)` call.
+
+The base implementation raises this exception in the `build()` operation; other implementations
+SHOULD copy this.
+
+### Implementors notes
+
+The base implementation of `openFileWithOptions()` actually executes
+the `open(path)` operation synchronously, yet still returns the result
+or any failures in the `CompletableFuture<>`, so as to provide a consistent
+lifecycle across all filesystems.
+
+Any filesystem client where the time to open a file may be significant SHOULD
+execute it asynchronously by submitting the operation in some executor/thread
+pool. This is particularly recommended for object stores and other filesystems
+likely to be accessed over long-haul connections.
+
+Arbitrary filesystem-specific options MAY be supported; these MUST
+be prefixed with either the filesystem schema, e.g. `hdfs.`,
+or in the `fs.SCHEMA` format used for normal configuration settings, e.g. `fs.hdfs`. The
+latter style allows the same configuration option to be used for both
+filesystem configuration and file-specific configuration.
+
+It SHOULD be possible to always open a file without specifying any options,
+so as to present a consistent model to users. However, an implementation MAY
+opt to require one or more mandatory options to be set.
+
+The returned stream may perform "lazy" evaluation of file access. This is
+relevant for object stores where the probes for existence are expensive, and,
+even with an asynchronous open, may be considered needless.
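+
+As a sketch of the two naming styles (`fs.s3a.readahead.range` is an option cited
+elsewhere in this specification; the `hdfs.`-prefixed key is purely illustrative):
+
+```java
+FutureDataInputStreamBuilder builder = fs.openFile(path)
+    .opt("fs.s3a.readahead.range", 1024 * 1024)  // "fs.SCHEMA" style
+    .opt("hdfs.read.policy", "sequential");      // schema-prefixed style (illustrative key)
+FSDataInputStream in = awaitFuture(builder.build());
+```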
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java index 3e754e4578de8..c395afdb3779b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -50,11 +50,11 @@ import org.apache.hadoop.util.DurationInfo; import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; -import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.test.LambdaTestUtils.eventually; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** * Tests of multipart uploads. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index a43053180fbf8..25bfe082b01f6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -30,14 +30,18 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.IOUtils; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; import org.junit.Test; @@ -232,7 +236,7 @@ public void testAwaitFutureFailToFNFE() throws Throwable { getFileSystem().openFile(path("testAwaitFutureFailToFNFE")) .opt("fs.test.something", true); intercept(FileNotFoundException.class, - () -> FutureIOSupport.awaitFuture(builder.build())); + () -> awaitFuture(builder.build())); } @Test @@ -242,7 +246,7 @@ public void testAwaitFutureTimeoutFailToFNFE() throws Throwable { getFileSystem().openFile(path("testAwaitFutureFailToFNFE")) .opt("fs.test.something", true); intercept(FileNotFoundException.class, - () -> FutureIOSupport.awaitFuture(builder.build(), + () -> awaitFuture(builder.build(), 10, TimeUnit.DAYS)); } @@ -250,7 +254,7 @@ public void testAwaitFutureTimeoutFailToFNFE() throws Throwable { public void testOpenFileExceptionallyTranslating() throws Throwable { describe("openFile missing file chains into 
exceptionally()"); CompletableFuture f = getFileSystem() - .openFile(path("testOpenFileUnknownOption")).build(); + .openFile(path("testOpenFileExceptionallyTranslating")).build(); interceptFuture(RuntimeException.class, "exceptionally", f.exceptionally(ex -> { @@ -262,11 +266,12 @@ public void testOpenFileExceptionallyTranslating() throws Throwable { public void testChainedFailureAwaitFuture() throws Throwable { describe("await Future handles chained failures"); CompletableFuture f = getFileSystem() - .openFile(path("testOpenFileUnknownOption")) + .openFile(path("testChainedFailureAwaitFuture")) + .withFileStatus(null) .build(); intercept(RuntimeException.class, "exceptionally", - () -> FutureIOSupport.awaitFuture( + () -> awaitFuture( f.exceptionally(ex -> { throw new RuntimeException("exceptionally", ex); }))); @@ -280,13 +285,34 @@ public void testOpenFileApplyRead() throws Throwable { int len = 4096; createFile(fs, path, true, dataset(len, 0x40, 0x80)); + FileStatus st = fs.getFileStatus(path); CompletableFuture readAllBytes = fs.openFile(path) - .withFileStatus(fs.getFileStatus(path)) + .withFileStatus(st) .build() .thenApply(ContractTestUtils::readStream); assertEquals("Wrong number of bytes read value", len, (long) readAllBytes.get()); + // now reattempt with a new FileStatus and a different path + // other than the final name element + // implementations MUST use path in openFile() call + FileStatus st2 = new FileStatus( + len, false, + st.getReplication(), + st.getBlockSize(), + st.getModificationTime(), + st.getAccessTime(), + st.getPermission(), + st.getOwner(), + st.getGroup(), + new Path("gopher:///localhost:/" + path.getName())); + assertEquals("Wrong number of bytes read value", + len, + (long) fs.openFile(path) + .withFileStatus(st2) + .build() + .thenApply(ContractTestUtils::readStream) + .get()); } @Test @@ -298,17 +324,47 @@ public void testOpenFileApplyAsyncRead() throws Throwable { dataset(4, 0x40, 0x80)); CompletableFuture future = fs.openFile(path).build(); AtomicBoolean accepted = new AtomicBoolean(false); - future.thenAcceptAsync(i -> accepted.set(true)).get(); + future.thenApply(stream -> { + accepted.set(true); + return stream; + }).get().close(); assertTrue("async accept operation not invoked", accepted.get()); } + /** + * Open a file with a null status, and the length + * passed in as an opt() option (along with sequential IO). + * The file is opened, the data read, and it must match + * the source data. + * opt() is used so that integration testing with external + * filesystem connectors will downgrade if the option is not + * recognized. + */ @Test - public void testOpenFileNullStatus() throws Throwable { - describe("use openFile() with a null status"); + public void testOpenFileNullStatusButFileLength() throws Throwable { + describe("use openFile() with a null status and expect the status to be" + + " ignored. 
block size, fadvise and length are passed in as" + + " opt() options"); Path path = path("testOpenFileNullStatus"); - intercept(NullPointerException.class, - () -> getFileSystem().openFile(path).withFileStatus(null)); + FileSystem fs = getFileSystem(); + int len = 4; + byte[] result = new byte[len]; + byte[] dataset = dataset(len, 0x40, 0x80); + createFile(fs, path, true, + dataset); + CompletableFuture future = fs.openFile(path) + .withFileStatus(null) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + "unknown, sequential, random") + .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768) + .opt(FS_OPTION_OPENFILE_LENGTH, len) + .build(); + + try (FSDataInputStream in = future.get()) { + in.readFully(result); + } + compareByteArrays(dataset, result, len); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index e13a49ca10e70..eb56d957d9a1a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1642,17 +1642,22 @@ public static int read(InputStream in) { /** * Read a whole stream; downgrades an IOE to a runtime exception. + * Closes the stream afterwards. * @param in input * @return the number of bytes read. * @throws AssertionError on any IOException */ public static long readStream(InputStream in) { - long count = 0; + try { + long count = 0; - while (read(in) >= 0) { - count++; + while (read(in) >= 0) { + count++; + } + return count; + } finally { + IOUtils.cleanupWithLogger(LOG, in); } - return count; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java index 22f6c33d2e260..755599f0c390c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java @@ -36,6 +36,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN; import static org.assertj.core.api.Assertions.assertThat; /** @@ -347,6 +349,24 @@ public static AbstractLongAssert assertThatStatisticMaximum( verifyStatisticsNotNull(stats).maximums()); } + /** + * Assert that a duration is within a given minimum/maximum range. + * @param stats statistics source + * @param key statistic key without any suffix + * @param min minimum statistic must be equal to or greater than this. + * @param max maximum statistic must be equal to or less than this. + */ + public static void assertDurationRange( + final IOStatistics stats, + final String key, + final long min, + final long max) { + assertThatStatisticMinimum(stats, key + SUFFIX_MIN) + .isGreaterThanOrEqualTo(min); + assertThatStatisticMaximum(stats, key + SUFFIX_MAX) + .isLessThanOrEqualTo(max); + } + /** * Start an assertion chain on * a required mean statistic. 
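The `assertDurationRange` helper added to `IOStatisticAssertions` above pairs the `.min` and `.max` duration statistics in a single assertion. A hedged usage sketch, assuming a stream which is an `IOStatisticsSource` tracking the (hypothetical here) duration key `action_http_get_request`:

```java
// assert that all tracked GET requests completed within 0..10000 ms
assertDurationRange(in.getIOStatistics(), "action_http_get_request", 0, 10_000);
```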
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java index 8258b62c1f759..cfde1583e2c21 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.test.AbstractHadoopTestBase; import org.apache.hadoop.util.functional.FunctionRaisingIOE; @@ -276,7 +275,7 @@ public void testCallableIOEFailureDuration() throws Throwable { */ @Test public void testDurationThroughEval() throws Throwable { - CompletableFuture eval = FutureIOSupport.eval( + CompletableFuture eval = FutureIO.eval( trackDurationOfOperation(stats, REQUESTS, () -> { sleepf(100); throw new FileNotFoundException("oops"); From 513e99852e3ecf99945df0792242f98f9c252611 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sun, 24 Apr 2022 17:10:34 +0100 Subject: [PATCH 3/5] HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2) These changes ensure that sequential files are opened with the right read policy, and split start/end is passed in. As well as offering opportunities for filesystem clients to choose fetch/cache/seek policies, the settings ensure that processing text files on an s3 bucket where the default policy is "random" will still be processed efficiently. This commit depends on the associated hadoop-common patch, which must be committed first. Contributed by Steve Loughran. 
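The pattern this patch applies in the record readers below can be summarized with the following hedged sketch; `file`, `start`, `end` and `job` stand in for each reader's own split state:

```java
final FutureDataInputStreamBuilder builder =
    file.getFileSystem(job).openFile(file);
// the split boundaries let the store choose a fetch/seek policy
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
    .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
// forward any job-level per-input-file options to the builder
FutureIO.propagateOptions(builder, job,
    MRJobConfig.INPUT_FILE_OPTION_PREFIX,
    MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
FSDataInputStream fileIn = FutureIO.awaitFuture(builder.build());
```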
Change-Id: Ic6713fd752441cf42ebe8739d05c2293a5db9f94 --- .../jobhistory/JobHistoryCopyService.java | 10 +++++++- .../hadoop/mapred/LineRecordReader.java | 13 +++++++--- .../lib/input/FixedLengthRecordReader.java | 7 +++--- .../mapreduce/lib/input/LineRecordReader.java | 14 ++++++++--- .../mapreduce/lib/input/NLineInputFormat.java | 6 ++--- .../examples/terasort/TeraInputFormat.java | 16 +++++++++++-- .../mapred/RetriableFileCopyCommand.java | 12 ++++++++-- .../mapreduce/StreamInputFormat.java | 6 ++--- .../logaggregation/AggregatedLogFormat.java | 17 +++++++++++-- .../apache/hadoop/yarn/util/FSDownload.java | 24 +++++++++++++------ 10 files changed, 96 insertions(+), 29 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java index ee4ec2c86a1c4..ecae4f2fc061a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java @@ -35,6 +35,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + /** * Reads in history events from the JobHistoryFile and sends them out again * to be recorded. 
@@ -118,7 +122,11 @@ public static FSDataInputStream getPreviousJobHistoryFileStream( fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath, jobId, (applicationAttemptId.getAttemptId() - 1))); LOG.info("History file is at " + historyFile); - in = fc.open(historyFile); + in = awaitFuture( + fc.openFile(historyFile) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .build()); return in; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java index 1fcb118a100fc..5724e72931085 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CodecPool; @@ -41,9 +40,13 @@ import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader; import org.apache.hadoop.mapreduce.lib.input.SplitLineReader; import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader; +import org.apache.hadoop.util.functional.FutureIO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; + /** * Treats keys as offset in file and value as line. */ @@ -109,10 +112,14 @@ public LineRecordReader(Configuration job, FileSplit split, // open the file and seek to the start of the split final FutureDataInputStreamBuilder builder = file.getFileSystem(job).openFile(file); - FutureIOSupport.propagateOptions(builder, job, + // the start and end of the split may be used to build + // an input strategy. 
+ builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start) + .opt(FS_OPTION_OPENFILE_SPLIT_END, end); + FutureIO.propagateOptions(builder, job, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); - fileIn = FutureIOSupport.awaitFuture(builder.build()); + fileIn = FutureIO.awaitFuture(builder.build()); if (isCompressedInput()) { decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java index c0ae9a5cdac61..6969f61836fbc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.compress.CodecPool; @@ -40,6 +39,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.functional.FutureIO; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,10 +95,10 @@ public void initialize(Configuration job, long splitStart, long splitLength, // open the file final FutureDataInputStreamBuilder builder = file.getFileSystem(job).openFile(file); - FutureIOSupport.propagateOptions(builder, job, + FutureIO.propagateOptions(builder, job, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); - fileIn = FutureIOSupport.awaitFuture(builder.build()); + fileIn = FutureIO.awaitFuture(builder.build()); CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null != codec) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java index 160c7635658a4..617abaacae065 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CodecPool; @@ -40,9 +39,14 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import 
org.apache.hadoop.util.functional.FutureIO; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; + /** * Treats keys as offset in file and value as line. */ @@ -86,10 +90,14 @@ public void initialize(InputSplit genericSplit, // open the file and seek to the start of the split final FutureDataInputStreamBuilder builder = file.getFileSystem(job).openFile(file); - FutureIOSupport.propagateOptions(builder, job, + // the start and end of the split may be used to build + // an input strategy. + builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start); + builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end); + FutureIO.propagateOptions(builder, job, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); - fileIn = FutureIOSupport.awaitFuture(builder.build()); + fileIn = FutureIO.awaitFuture(builder.build()); CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java index dfff9ad0d2b73..5161a96c3459d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; @@ -39,6 +38,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.LineReader; +import org.apache.hadoop.util.functional.FutureIO; /** * NLineInputFormat which splits N lines of input as one split. 
@@ -99,10 +99,10 @@ public static List getSplitsForFile(FileStatus status, try { final FutureDataInputStreamBuilder builder = fileName.getFileSystem(conf).openFile(fileName); - FutureIOSupport.propagateOptions(builder, conf, + FutureIO.propagateOptions(builder, conf, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); - FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build()); + FSDataInputStream in = FutureIO.awaitFuture(builder.build()); lr = new LineReader(in, conf); Text line = new Text(); int numLines = 0; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java index 20ce8ef2b60de..f284a9c380756 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; @@ -41,6 +42,12 @@ import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.functional.FutureIO; + +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; /** * An input format that reads the first 10 characters of each line as the key @@ -224,12 +231,17 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { Path p = ((FileSplit)split).getPath(); FileSystem fs = p.getFileSystem(context.getConfiguration()); - in = fs.open(p); long start = ((FileSplit)split).getStart(); // find the offset to start at a record boundary offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH; - in.seek(start + offset); length = ((FileSplit)split).getLength(); + final FutureDataInputStreamBuilder builder = fs.openFile(p) + .opt(FS_OPTION_OPENFILE_SPLIT_START, start) + .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL); + in = FutureIO.awaitFuture(builder.build()); + in.seek(start + offset); } public void close() throws IOException { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 6404e85661228..d6825f75d8c71 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -45,7 +45,11 @@ import org.apache.hadoop.tools.util.RetriableCommand; import org.apache.hadoop.tools.util.ThrottledInputStream; 
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.VisibleForTesting; + +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** * This class extends RetriableCommand to implement the copy of files, @@ -328,7 +332,11 @@ private static ThrottledInputStream getInputStream(Path path, FileSystem fs = path.getFileSystem(conf); float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, DistCpConstants.DEFAULT_BANDWIDTH_MB); - FSDataInputStream in = fs.open(path); + // open with sequential read, but not whole-file + FSDataInputStream in = awaitFuture(fs.openFile(path) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) + .build()); return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024); } catch (IOException e) { diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java index 77f4e041d5f09..f44488c7c0202 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.streaming.StreamUtil; +import org.apache.hadoop.util.functional.FutureIO; /** * An input format that selects a RecordReader based on a JobConf property. This @@ -66,10 +66,10 @@ public RecordReader createRecordReader(InputSplit genericSplit, FileSystem fs = path.getFileSystem(conf); // open the file final FutureDataInputStreamBuilder builder = fs.openFile(path); - FutureIOSupport.propagateOptions(builder, conf, + FutureIO.propagateOptions(builder, conf, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); - FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build()); + FSDataInputStream in = FutureIO.awaitFuture(builder.build()); // Factory dispatch based on available params.. 
Class readerClass; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 0fa9764b7bb1f..0122b873aabf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -77,6 +78,11 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + @Public @Evolving public class AggregatedLogFormat { @@ -576,9 +582,16 @@ public LogReader(Configuration conf, Path remoteAppLogFile) try { FileContext fileContext = FileContext.getFileContext(remoteAppLogFile.toUri(), conf); - this.fsDataIStream = fileContext.open(remoteAppLogFile); + FileStatus status = fileContext.getFileStatus(remoteAppLogFile); + this.fsDataIStream = awaitFuture( + fileContext.openFile(remoteAppLogFile) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) + .opt(FS_OPTION_OPENFILE_LENGTH, + status.getLen()) // file length hint for object stores + .build()); reader = new TFile.Reader(this.fsDataIStream, - fileContext.getFileStatus(remoteAppLogFile).getLen(), conf); + status.getLen(), conf); this.scanner = reader.createScanner(); } catch (IOException ioe) { close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index e5fb417561179..640cc82f539d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -60,7 +60,11 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.yarn.exceptions.YarnException; -/** +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + + /** * Download a single URL to the local disk. * */ @@ -285,23 +289,25 @@ private void verifyAndCopy(Path destination) } } - downloadAndUnpack(sCopy, destination); + downloadAndUnpack(sCopy, sStat, destination); } /** * Copy source path to destination with localization rules. - * @param source source path to copy. 
Typically HDFS + * @param source source path to copy. Typically HDFS or an object store. + * @param sourceStatus status of source * @param destination destination path. Typically local filesystem * @exception YarnException Any error has occurred */ - private void downloadAndUnpack(Path source, Path destination) + private void downloadAndUnpack(Path source, + FileStatus sourceStatus, Path destination) throws YarnException { try { FileSystem sourceFileSystem = source.getFileSystem(conf); FileSystem destinationFileSystem = destination.getFileSystem(conf); - if (sourceFileSystem.getFileStatus(source).isDirectory()) { + if (sourceStatus.isDirectory()) { FileUtil.copy( - sourceFileSystem, source, + sourceFileSystem, sourceStatus, destinationFileSystem, destination, false, true, conf); } else { @@ -329,7 +335,11 @@ private void unpack(Path source, Path destination, FileSystem sourceFileSystem, FileSystem destinationFileSystem) throws IOException, InterruptedException, ExecutionException { - try (InputStream inputStream = sourceFileSystem.open(source)) { + try (InputStream inputStream = awaitFuture( + sourceFileSystem.openFile(source) + .opt(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .build())) { File dst = new File(destination.toUri()); String lowerDst = StringUtils.toLowerCase(dst.getName()); switch (resource.getType()) { From 03753376d49f2f7caaf547d62664469913f69bfd Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sun, 24 Apr 2022 17:23:19 +0100 Subject: [PATCH 4/5] HADOOP-16202. Enhanced openFile(): hadoop-aws changes. (#2584/3) S3A input stream support for the few fs.option.openfile settings. As well as supporting the read policy option and values, if the file length is declared in fs.option.openfile.length then no HEAD request will be issued when opening a file. This can cut a few tens of milliseconds off the operation. The patch adds a new openfile parameter/FS configuration option fs.s3a.input.async.drain.threshold (default: 16000). It declares the number of bytes remaining in the http input stream above which any operation to read and discard the rest of the stream, "draining", is executed asynchronously. This asynchronous draining offers some performance benefit on seek-heavy file IO. Contributed by Steve Loughran. 
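To illustrate the commit message, a hedged sketch of an `openFile()` call which declares the file length so the S3A connector can skip the HEAD probe; `knownLength` is assumed to come from an earlier listing:

```java
FSDataInputStream in = FutureIO.awaitFuture(
    fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
        // length hint: no HEAD request is needed before the first read
        .opt(FS_OPTION_OPENFILE_LENGTH, knownLength)
        .build());
```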
Change-Id: I9b0626bbe635e9fd97ac0f463f5e7167e0111e39 --- .../dev-support/findbugs-exclude.xml | 5 + .../org/apache/hadoop/fs/s3a/Constants.java | 45 +- .../org/apache/hadoop/fs/s3a/Invoker.java | 28 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 283 ++++----- .../apache/hadoop/fs/s3a/S3AInputPolicy.java | 93 ++- .../apache/hadoop/fs/s3a/S3AInputStream.java | 267 +++++--- .../hadoop/fs/s3a/S3AInstrumentation.java | 12 +- .../hadoop/fs/s3a/S3AReadOpContext.java | 101 ++- .../hadoop/fs/s3a/S3ObjectAttributes.java | 15 +- .../org/apache/hadoop/fs/s3a/Statistic.java | 17 + .../fs/s3a/commit/CommitOperations.java | 8 +- .../fs/s3a/commit/files/PendingSet.java | 26 +- .../s3a/commit/files/SinglePendingCommit.java | 20 +- .../fs/s3a/impl/AbstractStoreOperation.java | 26 +- .../hadoop/fs/s3a/impl/CallableSupplier.java | 2 +- .../hadoop/fs/s3a/impl/InternalConstants.java | 21 +- .../hadoop/fs/s3a/impl/OpenFileSupport.java | 600 ++++++++++++++++++ .../hadoop/fs/s3a/s3guard/S3GuardTool.java | 4 +- .../s3a/select/InternalSelectConstants.java | 2 +- .../hadoop/fs/s3a/select/SelectTool.java | 4 +- .../statistics/S3AInputStreamStatistics.java | 7 + .../impl/EmptyS3AStatisticsContext.java | 4 + .../site/markdown/tools/hadoop-aws/index.md | 8 + .../fs/contract/s3a/ITestS3AContractOpen.java | 67 ++ .../fs/contract/s3a/ITestS3AContractSeek.java | 17 +- .../hadoop/fs/s3a/AbstractS3AMockTest.java | 4 + .../hadoop/fs/s3a/ITestS3AConfiguration.java | 11 - .../apache/hadoop/fs/s3a/S3ATestUtils.java | 4 +- .../fs/s3a/TestS3AInputStreamRetry.java | 17 +- .../apache/hadoop/fs/s3a/TestS3AUnbuffer.java | 5 +- .../fs/s3a/TestStreamChangeTracker.java | 2 +- .../fs/s3a/impl/TestOpenFileSupport.java | 429 +++++++++++++ .../fs/s3a/performance/ITestS3AOpenCost.java | 209 ++++++ .../fs/s3a/performance/OperationCost.java | 6 + .../scale/ITestS3AInputStreamPerformance.java | 58 +- .../fs/s3a/select/AbstractS3SelectTest.java | 2 +- .../hadoop/fs/s3a/select/ITestS3Select.java | 7 +- .../fs/s3a/select/ITestS3SelectMRJob.java | 4 +- 38 files changed, 2065 insertions(+), 375 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index d0496de538e44..e823840fd7129 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -28,6 +28,11 @@ + + + + +