diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index d5a6024b4e59c..1a135ded88098 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -1121,11 +1121,6 @@
      <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>bundle</artifactId>
      </dependency>
-      <dependency>
-        <groupId>software.amazon.eventstream</groupId>
-        <artifactId>eventstream</artifactId>
-        <version>${aws.eventstream.version}</version>
-      </dependency>
      <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 5ff4223e2d60b..5a0f2356b5e4a 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -508,11 +508,6 @@
      <artifactId>bundle</artifactId>
      <scope>compile</scope>
    </dependency>
-    <dependency>
-      <groupId>software.amazon.eventstream</groupId>
-      <artifactId>eventstream</artifactId>
-      <scope>test</scope>
-    </dependency>
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c5e6e09a835eb..de48c2df15698 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -83,8 +83,6 @@
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Object;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -194,8 +192,6 @@
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
-import org.apache.hadoop.fs.s3a.select.SelectBinding;
-import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -299,7 +295,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private S3Client s3Client;
- /** Async client is used for transfer manager and s3 select. */
+ /** Async client is used for transfer manager. */
private S3AsyncClient s3AsyncClient;
// initial callback policy is fail-once; it's there just to assist
@@ -1725,8 +1721,7 @@ public FSDataInputStream open(Path f, int bufferSize)
/**
* Opens an FSDataInputStream at the indicated Path.
* The {@code fileInformation} parameter controls how the file
- * is opened, whether it is normal vs. an S3 select call,
- * can a HEAD be skipped, etc.
+ * is opened, can a HEAD be skipped, etc.
* @param path the file to open
* @param fileInformation information about the file to open
* @throws IOException IO failure.
@@ -1853,13 +1848,6 @@ public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
private final class WriteOperationHelperCallbacksImpl
implements WriteOperationHelper.WriteOperationHelperCallbacks {
- @Override
- public CompletableFuture<Void> selectObjectContent(
- SelectObjectContentRequest request,
- SelectObjectContentResponseHandler responseHandler) {
- return getS3AsyncClient().selectObjectContent(request, responseHandler);
- }
-
@Override
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
@@ -1872,7 +1860,7 @@ public CompleteMultipartUploadResponse completeMultipartUpload(
* using FS state as well as the status.
* @param fileStatus file status.
* @param auditSpan audit span.
- * @return a context for read and select operations.
+ * @return a context for read operations.
*/
@VisibleForTesting
protected S3AReadOpContext createReadContext(
@@ -5452,13 +5440,6 @@ public boolean hasPathCapability(final Path path, final String capability)
// capability depends on FS configuration
return isMagicCommitEnabled();
- case SelectConstants.S3_SELECT_CAPABILITY:
- // select is only supported if enabled and client side encryption is
- // disabled.
- return !isCSEEnabled
- && SelectBinding.isSelectEnabled(getConf())
- && !s3ExpressStore;
-
case CommonPathCapabilities.FS_CHECKSUMS:
// capability depends on FS configuration
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
@@ -5572,85 +5553,6 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
return credentials.share();
}
- /**
- * This is a proof of concept of a select API.
- * @param source path to source data
- * @param options request configuration from the builder.
- * @param fileInformation any passed in information.
- * @return the stream of the results
- * @throws IOException IO failure
- */
- @Retries.RetryTranslated
- @AuditEntryPoint
- private FSDataInputStream select(final Path source,
- final Configuration options,
- final OpenFileSupport.OpenFileInformation fileInformation)
- throws IOException {
- requireSelectSupport(source);
- final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
- final Path path = makeQualified(source);
- String expression = fileInformation.getSql();
- final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
- fileInformation);
-
- // readahead range can be dynamically set
- S3ObjectAttributes objectAttributes = createObjectAttributes(
- path, fileStatus);
- ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
- S3AReadOpContext readContext = createReadContext(
- fileStatus,
- auditSpan);
- fileInformation.applyOptions(readContext);
-
- if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
- && fileStatus.getEtag() != null) {
- // if there is change detection, and the status includes at least an
- // etag,
- // check that the object metadata lines up with what is expected
- // based on the object attributes (which may contain an eTag or
- // versionId).
- // This is because the select API doesn't offer this.
- // (note: this is trouble for version checking as cannot force the old
- // version in the final read; nor can we check the etag match)
- ChangeTracker changeTracker =
- new ChangeTracker(uri.toString(),
- changePolicy,
- readContext.getS3AStatisticsContext()
- .newInputStreamStatistics()
- .getChangeTrackerStatistics(),
- objectAttributes);
-
- // will retry internally if wrong version detected
- Invoker readInvoker = readContext.getReadInvoker();
- getObjectMetadata(path, changeTracker, readInvoker, "select");
- }
- // instantiate S3 Select support using the current span
- // as the active span for operations.
- SelectBinding selectBinding = new SelectBinding(
- createWriteOperationHelper(auditSpan));
-
- // build and execute the request
- return selectBinding.select(
- readContext,
- expression,
- options,
- objectAttributes);
- }
-
- /**
- * Verify the FS supports S3 Select.
- * @param source source file.
- * @throws UnsupportedOperationException if not.
- */
- private void requireSelectSupport(final Path source) throws
- UnsupportedOperationException {
- if (!isCSEEnabled && !SelectBinding.isSelectEnabled(getConf())) {
-
- throw new UnsupportedOperationException(
- SelectConstants.SELECT_UNSUPPORTED);
- }
- }
-
/**
* Get the file status of the source file.
* If in the fileInformation parameter return that
@@ -5681,16 +5583,14 @@ private S3AFileStatus extractOrFetchSimpleFileStatus(
}
/**
- * Initiate the open() or select() operation.
+ * Initiate the open() operation.
* This is invoked from both the FileSystem and FileContext APIs.
* It's declared as an audit entry point but the span creation is pushed
- * down into the open/select methods it ultimately calls.
+ * down into the open operations it ultimately calls.
* @param rawPath path to the file
* @param parameters open file parameters from the builder.
- * @return a future which will evaluate to the opened/selected file.
+ * @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
- * @throws PathIOException operation is a select request but S3 select is
- * disabled
* @throws IllegalArgumentException unknown mandatory key
*/
@Override
@@ -5706,20 +5606,9 @@ public CompletableFuture<FSDataInputStream> openFileWithOptions(
parameters,
getDefaultBlockSize());
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
- if (!fileInformation.isS3Select()) {
- // normal path.
- unboundedThreadPool.submit(() ->
- LambdaUtils.eval(result,
- () -> executeOpen(path, fileInformation)));
- } else {
- // it is a select statement.
- // fail fast if the operation is not available
- requireSelectSupport(path);
- // submit the query
- unboundedThreadPool.submit(() ->
- LambdaUtils.eval(result,
- () -> select(path, parameters.getOptions(), fileInformation)));
- }
+ unboundedThreadPool.submit(() ->
+ LambdaUtils.eval(result,
+ () -> executeOpen(path, fileInformation)));
return result;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
index 4fc5b8658b605..18912d5d3caef 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -25,7 +25,7 @@
/**
* This class holds attributes of an object independent of the
* file status type.
- * It is used in {@link S3AInputStream} and the select equivalent.
+ * It is used in {@link S3AInputStream} and elsewhere
* as a way to reduce parameters being passed
* to the constructor of such class,
* and elsewhere to be a source-neutral representation of a file status.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 72fc75b642415..ce3af3de803a4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -265,10 +265,6 @@ public enum Statistic {
StoreStatisticNames.OBJECT_PUT_BYTES_PENDING,
"number of bytes queued for upload/being actively uploaded",
TYPE_GAUGE),
- OBJECT_SELECT_REQUESTS(
- StoreStatisticNames.OBJECT_SELECT_REQUESTS,
- "Count of S3 Select requests issued",
- TYPE_COUNTER),
STREAM_READ_ABORTED(
StreamStatisticNames.STREAM_READ_ABORTED,
"Count of times the TCP stream was aborted",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index f2ece63a854fa..3bbe000bf5b6e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -22,7 +22,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.core.sync.RequestBody;
@@ -33,8 +32,6 @@
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import org.slf4j.Logger;
@@ -49,16 +46,11 @@
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
-import org.apache.hadoop.fs.s3a.select.SelectEventStreamPublisher;
-import org.apache.hadoop.fs.s3a.select.SelectObjectContentHelper;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
-import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
-import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
-import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.Invoker.*;
@@ -82,7 +74,6 @@
*   <li>Other low-level access to S3 functions, for private use.</li>
*   <li>Failure handling, including converting exceptions to IOEs.</li>
*   <li>Integration with instrumentation.</li>
- *   <li>Evolution to add more low-level operations, such as S3 select.</li>
* </ul>
*
* This API is for internal use only.
@@ -615,63 +606,6 @@ public Configuration getConf() {
return conf;
}
- public SelectObjectContentRequest.Builder newSelectRequestBuilder(Path path) {
- try (AuditSpan span = getAuditSpan()) {
- return getRequestFactory().newSelectRequestBuilder(
- storeContext.pathToKey(path));
- }
- }
-
- /**
- * Execute an S3 Select operation.
- * On a failure, the request is only logged at debug to avoid the
- * select exception being printed.
- *
- * @param source source for selection
- * @param request Select request to issue.
- * @param action the action for use in exception creation
- * @return response
- * @throws IOException failure
- */
- @Retries.RetryTranslated
- public SelectEventStreamPublisher select(
- final Path source,
- final SelectObjectContentRequest request,
- final String action)
- throws IOException {
- // no setting of span here as the select binding is (statically) created
- // without any span.
- String bucketName = request.bucket();
- Preconditions.checkArgument(bucket.equals(bucketName),
- "wrong bucket: %s", bucketName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initiating select call {} {}",
- source, request.expression());
- LOG.debug(SelectBinding.toString(request));
- }
- return invoker.retry(
- action,
- source.toString(),
- true,
- withinAuditSpan(getAuditSpan(), () -> {
- try (DurationInfo ignored =
- new DurationInfo(LOG, "S3 Select operation")) {
- try {
- return SelectObjectContentHelper.select(
- writeOperationHelperCallbacks, source, request, action);
- } catch (Throwable e) {
- LOG.error("Failure of S3 Select request against {}",
- source);
- LOG.debug("S3 Select request against {}:\n{}",
- source,
- SelectBinding.toString(request),
- e);
- throw e;
- }
- }
- }));
- }
-
@Override
public AuditSpan createSpan(final String operation,
@Nullable final String path1,
@@ -705,15 +639,6 @@ public RequestFactory getRequestFactory() {
*/
public interface WriteOperationHelperCallbacks {
- /**
- * Initiates a select request.
- * @param request selectObjectContent request
- * @param t selectObjectContent request handler
- * @return selectObjectContentResult
- */
- CompletableFuture<Void> selectObjectContent(SelectObjectContentRequest request,
- SelectObjectContentResponseHandler t);
-
/**
* Initiates a complete multi-part upload request.
* @param request Complete multi-part upload request
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 0fda4921a30da..5ad9c9f9b6482 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -31,16 +31,13 @@
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
-import org.apache.hadoop.fs.s3a.select.SelectEventStreamPublisher;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -274,32 +271,6 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
*/
Configuration getConf();
- /**
- * Create a S3 Select request builder for the destination path.
- * This does not build the query.
- * @param path pre-qualified path for query
- * @return the request builder
- */
- SelectObjectContentRequest.Builder newSelectRequestBuilder(Path path);
-
- /**
- * Execute an S3 Select operation.
- * On a failure, the request is only logged at debug to avoid the
- * select exception being printed.
- *
- * @param source source for selection
- * @param request Select request to issue.
- * @param action the action for use in exception creation
- * @return response
- * @throws IOException failure
- */
- @Retries.RetryTranslated
- SelectEventStreamPublisher select(
- Path source,
- SelectObjectContentRequest request,
- String action)
- throws IOException;
-
/**
* Increment the write operation counter
* of the filesystem.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
index 99a898f728166..73ad137a86d3c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
@@ -37,7 +37,6 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@@ -214,14 +213,6 @@ UploadPartRequest.Builder newUploadPartRequestBuilder(
int partNumber,
long size) throws PathIOException;
- /**
- * Create a S3 Select request builder for the destination object.
- * This does not build the query.
- * @param key object key
- * @return the request builder
- */
- SelectObjectContentRequest.Builder newSelectRequestBuilder(String key);
-
/**
* Create the (legacy) V1 list request builder.
* @param key key to list under
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
index 3df862055d197..e91710a0af3a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
@@ -35,7 +35,6 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@@ -50,7 +49,6 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_DELETE_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
-import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_SELECT_REQUESTS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
/**
@@ -132,12 +130,6 @@ public RequestInfo analyze(SdkRequest request) {
return writing(OBJECT_PUT_REQUEST,
r.key(),
0);
- } else if (request instanceof SelectObjectContentRequest) {
- SelectObjectContentRequest r =
- (SelectObjectContentRequest) request;
- return reading(OBJECT_SELECT_REQUESTS,
- r.key(),
- 1);
} else if (request instanceof UploadPartRequest) {
UploadPartRequest r = (UploadPartRequest) request;
return writing(MULTIPART_UPLOAD_PART_PUT,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
index 2c9d6857b46a2..0c56ca1f308bb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
@@ -223,7 +223,7 @@ public void processResponse(final CopyObjectResponse copyObjectResponse)
* cause.
* @param e the exception
* @param operation the operation performed when the exception was
- * generated (e.g. "copy", "read", "select").
+ * generated (e.g. "copy", "read").
* @throws RemoteFileChangedException if the remote file has changed.
*/
public void processException(SdkException e, String operation) throws
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 8ebf8c013d10a..1d12a41008b6b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -113,8 +113,6 @@ private InternalConstants() {
/**
* The known keys used in a standard openFile call.
- * if there's a select marker in there then the keyset
- * used becomes that of the select operation.
*/
@InterfaceStability.Unstable
public static final Set<String> S3A_OPENFILE_KEYS;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
index 4703d63567245..b841e8f786dc4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
@@ -35,8 +35,8 @@
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
-import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
@@ -68,6 +68,7 @@ public class OpenFileSupport {
private static final Logger LOG =
LoggerFactory.getLogger(OpenFileSupport.class);
+ public static final LogExactlyOnce LOG_NO_SQL_SELECT = new LogExactlyOnce(LOG);
/**
* For use when a value of an split/file length is unknown.
*/
@@ -153,12 +154,14 @@ public S3AReadOpContext applyDefaultOptions(S3AReadOpContext roc) {
/**
* Prepare to open a file from the openFile parameters.
+ * An S3 Select SQL option is rejected if set as a mandatory option, and ignored if optional.
* @param path path to the file
* @param parameters open file parameters from the builder.
* @param blockSize for fileStatus
* @return open file options
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
+ * @throws UnsupportedOperationException for S3 Select options.
*/
@SuppressWarnings("ChainOfInstanceofChecks")
public OpenFileInformation prepareToOpenFile(
@@ -167,21 +170,21 @@ public OpenFileInformation prepareToOpenFile(
final long blockSize) throws IOException {
Configuration options = parameters.getOptions();
Set<String> mandatoryKeys = parameters.getMandatoryKeys();
- String sql = options.get(SelectConstants.SELECT_SQL, null);
- boolean isSelect = sql != null;
- // choice of keys depends on open type
- if (isSelect) {
- // S3 Select call adds a large set of supported mandatory keys
- rejectUnknownMandatoryKeys(
- mandatoryKeys,
- InternalSelectConstants.SELECT_OPTIONS,
- "for " + path + " in S3 Select operation");
- } else {
- rejectUnknownMandatoryKeys(
- mandatoryKeys,
- InternalConstants.S3A_OPENFILE_KEYS,
- "for " + path + " in non-select file I/O");
+ // S3 Select is not supported in this release
+ if (options.get(SelectConstants.SELECT_SQL, null) != null) {
+ if (mandatoryKeys.contains(SelectConstants.SELECT_SQL)) {
+ // mandatory option: fail with a specific message.
+ throw new UnsupportedOperationException(SelectConstants.SELECT_UNSUPPORTED);
+ } else {
+ // optional; log once and continue
+ LOG_NO_SQL_SELECT.warn(SelectConstants.SELECT_UNSUPPORTED);
+ }
}
+ // choice of keys depends on open type
+ rejectUnknownMandatoryKeys(
+ mandatoryKeys,
+ InternalConstants.S3A_OPENFILE_KEYS,
+ "for " + path + " in file I/O");
// where does a read end?
long fileLength = LENGTH_UNKNOWN;
@@ -281,8 +284,6 @@ public OpenFileInformation prepareToOpenFile(
}
return new OpenFileInformation()
- .withS3Select(isSelect)
- .withSql(sql)
.withAsyncDrainThreshold(
builderSupport.getPositiveLong(ASYNC_DRAIN_THRESHOLD,
defaultReadAhead))
@@ -329,7 +330,6 @@ private S3AFileStatus createStatus(Path path, long length, long blockSize) {
*/
public OpenFileInformation openSimpleFile(final int bufferSize) {
return new OpenFileInformation()
- .withS3Select(false)
.withAsyncDrainThreshold(defaultAsyncDrainThreshold)
.withBufferSize(bufferSize)
.withChangePolicy(changePolicy)
@@ -357,15 +357,9 @@ public String toString() {
*/
public static final class OpenFileInformation {
- /** Is this SQL? */
- private boolean isS3Select;
-
/** File status; may be null. */
private S3AFileStatus status;
- /** SQL string if this is a SQL select file. */
- private String sql;
-
/** Active input policy. */
private S3AInputPolicy inputPolicy;
@@ -415,18 +409,10 @@ public OpenFileInformation build() {
return this;
}
- public boolean isS3Select() {
- return isS3Select;
- }
-
public S3AFileStatus getStatus() {
return status;
}
- public String getSql() {
- return sql;
- }
-
public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}
@@ -454,9 +440,7 @@ public long getSplitEnd() {
@Override
public String toString() {
return "OpenFileInformation{" +
- "isSql=" + isS3Select +
- ", status=" + status +
- ", sql='" + sql + '\'' +
+ "status=" + status +
", inputPolicy=" + inputPolicy +
", changePolicy=" + changePolicy +
", readAheadRange=" + readAheadRange +
@@ -475,16 +459,6 @@ public long getFileLength() {
return fileLength;
}
- /**
- * Set builder value.
- * @param value new value
- * @return the builder
- */
- public OpenFileInformation withS3Select(final boolean value) {
- isS3Select = value;
- return this;
- }
-
/**
* Set builder value.
* @param value new value
@@ -495,16 +469,6 @@ public OpenFileInformation withStatus(final S3AFileStatus value) {
return this;
}
- /**
- * Set builder value.
- * @param value new value
- * @return the builder
- */
- public OpenFileInformation withSql(final String value) {
- sql = value;
- return this;
- }
-
/**
* Set builder value.
* @param value new value
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
index 9c88870633a35..5a5d537d7a65d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
@@ -69,7 +69,7 @@ S3ObjectAttributes createObjectAttributes(
* Create the read context for reading from the referenced file,
* using FS state as well as the status.
* @param fileStatus file status.
- * @return a context for read and select operations.
+ * @return a context for read operations.
*/
S3AReadOpContext createReadContext(
FileStatus fileStatus);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index 17a7189ae220d..c91324da7cb15 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -43,7 +43,6 @@
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@@ -585,20 +584,6 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder(
return prepareRequest(builder);
}
- @Override
- public SelectObjectContentRequest.Builder newSelectRequestBuilder(String key) {
- SelectObjectContentRequest.Builder requestBuilder =
- SelectObjectContentRequest.builder().bucket(bucket).key(key);
-
- EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets).ifPresent(base64customerKey -> {
- requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
- .sseCustomerKey(base64customerKey)
- .sseCustomerKeyMD5(Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
- });
-
- return prepareRequest(requestBuilder);
- }
-
@Override
public ListObjectsRequest.Builder newListObjectsV1RequestBuilder(
final String key,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 26b6acda30906..51bff4228be0f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -57,7 +57,7 @@
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
-import org.apache.hadoop.fs.s3a.select.SelectTool;
+import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.tools.BucketTool;
import org.apache.hadoop.fs.s3a.tools.MarkerTool;
import org.apache.hadoop.fs.shell.CommandFormat;
@@ -76,6 +76,7 @@
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.FILESYSTEM_TEMP_PATH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.S3A_DYNAMIC_CAPABILITIES;
+import static org.apache.hadoop.fs.s3a.select.SelectConstants.SELECT_UNSUPPORTED;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_ABORTED;
@@ -121,7 +122,6 @@ public abstract class S3GuardTool extends Configured implements Tool,
"\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
"\t" + BucketTool.NAME + " - " + BucketTool.PURPOSE + "\n" +
"\t" + MarkerTool.MARKERS + " - " + MarkerTool.PURPOSE + "\n" +
- "\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n" +
"\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n";
private static final String E_UNSUPPORTED = "This command is no longer supported";
@@ -1004,11 +1004,9 @@ public static int run(Configuration conf, String... args) throws
case Uploads.NAME:
command = new Uploads(conf);
break;
- case SelectTool.NAME:
- // the select tool is not technically a S3Guard tool, but it's on the CLI
- // because this is the defacto S3 CLI.
- command = new SelectTool(conf);
- break;
+ case SelectConstants.NAME:
+ throw new ExitUtil.ExitException(
+ EXIT_UNSUPPORTED_VERSION, SELECT_UNSUPPORTED);
default:
printHelp();
throw new ExitUtil.ExitException(E_USAGE,
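
With the new case arm, the `select` subcommand now fails fast instead of instantiating SelectTool. A sketch of what a caller or test would observe (bucket name illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
import org.apache.hadoop.util.ExitUtil;

public final class SelectToolRemovedCheck {
  public static void main(String[] args) throws Exception {
    try {
      // "select" matches SelectConstants.NAME and is rejected up front.
      S3GuardTool.run(new Configuration(),
          "select", "s3a://example-bucket/data.csv");
    } catch (ExitUtil.ExitException e) {
      // exit code EXIT_UNSUPPORTED_VERSION, message SELECT_UNSUPPORTED
      System.out.println(e.getExitCode() + ": " + e.getMessage());
    }
  }
}
```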
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/BlockingEnumeration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/BlockingEnumeration.java
deleted file mode 100644
index 42000f1017259..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/BlockingEnumeration.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.util.Enumeration;
-import java.util.NoSuchElementException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import software.amazon.awssdk.core.async.SdkPublisher;
-import software.amazon.awssdk.core.exception.SdkException;
-
-/**
- * Implements the {@link Enumeration} interface by subscribing to a
- * {@link SdkPublisher} instance. The enumeration will buffer a fixed
- * number of elements and only request new ones from the publisher
- * when they are consumed. Calls to {@link #hasMoreElements()} and
- * {@link #nextElement()} may block while waiting for new elements.
- * @param <T> the type of element.
- */
-public final class BlockingEnumeration<T> implements Enumeration<T> {
- private static final class Signal<T> {
- private final T element;
- private final Throwable error;
-
- Signal(T element) {
- this.element = element;
- this.error = null;
- }
-
- Signal(Throwable error) {
- this.element = null;
- this.error = error;
- }
- }
-
- private final Signal<T> endSignal = new Signal<>((Throwable)null);
- private final CompletableFuture<Subscription> subscription = new CompletableFuture<>();
- private final BlockingQueue<Signal<T>> signalQueue;
- private final int bufferSize;
- private Signal<T> current = null;
-
- /**
- * Create an enumeration with a fixed buffer size and an
- * optional injected first element.
- * @param publisher the publisher feeding the enumeration.
- * @param bufferSize the buffer size.
- * @param firstElement (optional) first element the enumeration will return.
- */
- public BlockingEnumeration(SdkPublisher<T> publisher,
- final int bufferSize,
- final T firstElement) {
- this.signalQueue = new LinkedBlockingQueue<>();
- this.bufferSize = bufferSize;
- if (firstElement != null) {
- this.current = new Signal<>(firstElement);
- }
- publisher.subscribe(new EnumerationSubscriber());
- }
-
- /**
- * Create an enumeration with a fixed buffer size.
- * @param publisher the publisher feeding the enumeration.
- * @param bufferSize the buffer size.
- */
- public BlockingEnumeration(SdkPublisher<T> publisher,
- final int bufferSize) {
- this(publisher, bufferSize, null);
- }
-
- @Override
- public boolean hasMoreElements() {
- if (current == null) {
- try {
- current = signalQueue.take();
- } catch (InterruptedException e) {
- current = new Signal<>(e);
- subscription.thenAccept(Subscription::cancel);
- Thread.currentThread().interrupt();
- }
- }
- if (current.error != null) {
- Throwable error = current.error;
- current = endSignal;
- if (error instanceof Error) {
- throw (Error)error;
- } else if (error instanceof SdkException) {
- throw (SdkException)error;
- } else {
- throw SdkException.create("Unexpected error", error);
- }
- }
- return current != endSignal;
- }
-
- @Override
- public T nextElement() {
- if (!hasMoreElements()) {
- throw new NoSuchElementException();
- }
- T element = current.element;
- current = null;
- subscription.thenAccept(s -> s.request(1));
- return element;
- }
-
- private final class EnumerationSubscriber implements Subscriber<T> {
-
- @Override
- public void onSubscribe(Subscription s) {
- long request = bufferSize;
- if (current != null) {
- request--;
- }
- if (request > 0) {
- s.request(request);
- }
- subscription.complete(s);
- }
-
- @Override
- public void onNext(T t) {
- signalQueue.add(new Signal<>(t));
- }
-
- @Override
- public void onError(Throwable t) {
- signalQueue.add(new Signal<>(t));
- }
-
- @Override
- public void onComplete() {
- signalQueue.add(endSignal);
- }
- }
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
deleted file mode 100644
index fbf5226afb82f..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.s3a.impl.InternalConstants;
-
-import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
-
-/**
- * Constants for internal use in the org.apache.hadoop.fs.s3a module itself.
- * Please don't refer to these outside of this module & its tests.
- * If you find you need to then either the code is doing something it
- * should not, or these constants need to be uprated to being
- * public and stable entries.
- */
-@InterfaceAudience.Private
-public final class InternalSelectConstants {
-
- private InternalSelectConstants() {
- }
-
- /**
- * An unmodifiable set listing the options
- * supported in {@code openFile()}.
- */
- public static final Set<String> SELECT_OPTIONS;
-
- /*
- * Build up the options, pulling in the standard set too.
- */
- static {
- // when adding to this, please keep in alphabetical order after the
- // common options and the SQL.
- HashSet<String> options = new HashSet<>(Arrays.asList(
- SELECT_SQL,
- SELECT_ERRORS_INCLUDE_SQL,
- SELECT_INPUT_COMPRESSION,
- SELECT_INPUT_FORMAT,
- SELECT_OUTPUT_FORMAT,
- CSV_INPUT_COMMENT_MARKER,
- CSV_INPUT_HEADER,
- CSV_INPUT_INPUT_FIELD_DELIMITER,
- CSV_INPUT_QUOTE_CHARACTER,
- CSV_INPUT_QUOTE_ESCAPE_CHARACTER,
- CSV_INPUT_RECORD_DELIMITER,
- CSV_OUTPUT_FIELD_DELIMITER,
- CSV_OUTPUT_QUOTE_CHARACTER,
- CSV_OUTPUT_QUOTE_ESCAPE_CHARACTER,
- CSV_OUTPUT_QUOTE_FIELDS,
- CSV_OUTPUT_RECORD_DELIMITER
- ));
- options.addAll(InternalConstants.S3A_OPENFILE_KEYS);
- SELECT_OPTIONS = Collections.unmodifiableSet(options);
- }
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectBinding.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectBinding.java
deleted file mode 100644
index c3b8abbc2ea88..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectBinding.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.io.IOException;
-import java.util.Locale;
-
-import software.amazon.awssdk.services.s3.model.CSVInput;
-import software.amazon.awssdk.services.s3.model.CSVOutput;
-import software.amazon.awssdk.services.s3.model.ExpressionType;
-import software.amazon.awssdk.services.s3.model.InputSerialization;
-import software.amazon.awssdk.services.s3.model.OutputSerialization;
-import software.amazon.awssdk.services.s3.model.QuoteFields;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
-import org.apache.hadoop.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.s3a.Retries;
-import org.apache.hadoop.fs.s3a.S3AReadOpContext;
-import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
-import org.apache.hadoop.fs.s3a.WriteOperationHelper;
-
-import static org.apache.hadoop.util.Preconditions.checkNotNull;
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
-
-/**
- * Class to do the S3 select binding and build a select request from the
- * supplied arguments/configuration.
- *
- * This class is intended to be instantiated by the owning S3AFileSystem
- * instance to handle the construction of requests: IO is still done exclusively
- * in the filesystem.
- *
- */
-public class SelectBinding {
-
- static final Logger LOG =
- LoggerFactory.getLogger(SelectBinding.class);
-
- /** Operations on the store. */
- private final WriteOperationHelper operations;
-
- /** Is S3 Select enabled? */
- private final boolean enabled;
- private final boolean errorsIncludeSql;
-
- /**
- * Constructor.
- * @param operations callback to owner FS, with associated span.
- */
- public SelectBinding(final WriteOperationHelper operations) {
- this.operations = checkNotNull(operations);
- Configuration conf = getConf();
- this.enabled = isSelectEnabled(conf);
- this.errorsIncludeSql = conf.getBoolean(SELECT_ERRORS_INCLUDE_SQL, false);
- }
-
- Configuration getConf() {
- return operations.getConf();
- }
-
- /**
- * Is the service supported?
- * @return true iff select is enabled.
- */
- public boolean isEnabled() {
- return enabled;
- }
-
- /**
- * Static probe for select being enabled.
- * @param conf configuration
- * @return true iff select is enabled.
- */
- public static boolean isSelectEnabled(Configuration conf) {
- return conf.getBoolean(FS_S3A_SELECT_ENABLED, true);
- }
-
- /**
- * Build and execute a select request.
- * @param readContext the read context, which includes the source path.
- * @param expression the SQL expression.
- * @param builderOptions query options
- * @param objectAttributes object attributes from a HEAD request
- * @return an FSDataInputStream whose wrapped stream is a SelectInputStream
- * @throws IllegalArgumentException argument failure
- * @throws IOException failure building, validating or executing the request.
- * @throws PathIOException source path is a directory.
- */
- @Retries.RetryTranslated
- public FSDataInputStream select(
- final S3AReadOpContext readContext,
- final String expression,
- final Configuration builderOptions,
- final S3ObjectAttributes objectAttributes) throws IOException {
-
- return new FSDataInputStream(
- executeSelect(readContext,
- objectAttributes,
- builderOptions,
- buildSelectRequest(
- readContext.getPath(),
- expression,
- builderOptions
- )));
- }
-
- /**
- * Build a select request.
- * @param path source path.
- * @param expression the SQL expression.
- * @param builderOptions config to extract other query options from
- * @return the request to serve
- * @throws IllegalArgumentException argument failure
- * @throws IOException problem building/validating the request
- */
- public SelectObjectContentRequest buildSelectRequest(
- final Path path,
- final String expression,
- final Configuration builderOptions)
- throws IOException {
- Preconditions.checkState(isEnabled(),
- "S3 Select is not enabled for %s", path);
-
- SelectObjectContentRequest.Builder request = operations.newSelectRequestBuilder(path);
- buildRequest(request, expression, builderOptions);
- return request.build();
- }
-
- /**
- * Execute the select request.
- * @param readContext read context
- * @param objectAttributes object attributes from a HEAD request
- * @param builderOptions the options which came in from the openFile builder.
- * @param request the built up select request.
- * @return a SelectInputStream
- * @throws IOException failure
- * @throws PathIOException source path is a directory.
- */
- @Retries.RetryTranslated
- private SelectInputStream executeSelect(
- final S3AReadOpContext readContext,
- final S3ObjectAttributes objectAttributes,
- final Configuration builderOptions,
- final SelectObjectContentRequest request) throws IOException {
-
- Path path = readContext.getPath();
- if (readContext.getDstFileStatus().isDirectory()) {
- throw new PathIOException(path.toString(),
- "Can't select " + path
- + " because it is a directory");
- }
- boolean sqlInErrors = builderOptions.getBoolean(SELECT_ERRORS_INCLUDE_SQL,
- errorsIncludeSql);
- String expression = request.expression();
- final String errorText = sqlInErrors ? expression : "Select";
- if (sqlInErrors) {
- LOG.info("Issuing SQL request {}", expression);
- }
- SelectEventStreamPublisher selectPublisher = operations.select(path, request, errorText);
- return new SelectInputStream(readContext,
- objectAttributes, selectPublisher);
- }
-
- /**
- * Build the select request from the configuration built up
- * in {@code S3AFileSystem.openFile(Path)} and the default
- * options in the cluster configuration.
- *
- * Options are picked up in the following order.
- *
- * <ol>
- *   <li> Options in {@code openFileOptions}.</li>
- *   <li> Options in the owning filesystem configuration.</li>
- *   <li> The default values in {@link SelectConstants}</li>
- * </ol>
- *
- * @param requestBuilder request to build up
- * @param expression SQL expression
- * @param builderOptions the options which came in from the openFile builder.
- * @throws IllegalArgumentException if an option is somehow invalid.
- * @throws IOException if an option is somehow invalid.
- */
- void buildRequest(
- final SelectObjectContentRequest.Builder requestBuilder,
- final String expression,
- final Configuration builderOptions)
- throws IllegalArgumentException, IOException {
- Preconditions.checkArgument(StringUtils.isNotEmpty(expression),
- "No expression provided in parameter " + SELECT_SQL);
-
- final Configuration ownerConf = operations.getConf();
-
- String inputFormat = builderOptions.get(SELECT_INPUT_FORMAT,
- SELECT_FORMAT_CSV).toLowerCase(Locale.ENGLISH);
- Preconditions.checkArgument(SELECT_FORMAT_CSV.equals(inputFormat),
- "Unsupported input format %s", inputFormat);
- String outputFormat = builderOptions.get(SELECT_OUTPUT_FORMAT,
- SELECT_FORMAT_CSV)
- .toLowerCase(Locale.ENGLISH);
- Preconditions.checkArgument(SELECT_FORMAT_CSV.equals(outputFormat),
- "Unsupported output format %s", outputFormat);
-
- requestBuilder.expressionType(ExpressionType.SQL);
- requestBuilder.expression(expandBackslashChars(expression));
-
- requestBuilder.inputSerialization(
- buildCsvInput(ownerConf, builderOptions));
- requestBuilder.outputSerialization(
- buildCSVOutput(ownerConf, builderOptions));
- }
-
- /**
- * Build the CSV input format for a request.
- * @param ownerConf FS owner configuration
- * @param builderOptions options on the specific request
- * @return the input format
- * @throws IllegalArgumentException argument failure
- * @throws IOException validation failure
- */
- public InputSerialization buildCsvInput(
- final Configuration ownerConf,
- final Configuration builderOptions)
- throws IllegalArgumentException, IOException {
-
- String headerInfo = opt(builderOptions,
- ownerConf,
- CSV_INPUT_HEADER,
- CSV_INPUT_HEADER_OPT_DEFAULT,
- true).toUpperCase(Locale.ENGLISH);
- String commentMarker = xopt(builderOptions,
- ownerConf,
- CSV_INPUT_COMMENT_MARKER,
- CSV_INPUT_COMMENT_MARKER_DEFAULT);
- String fieldDelimiter = xopt(builderOptions,
- ownerConf,
- CSV_INPUT_INPUT_FIELD_DELIMITER,
- CSV_INPUT_FIELD_DELIMITER_DEFAULT);
- String recordDelimiter = xopt(builderOptions,
- ownerConf,
- CSV_INPUT_RECORD_DELIMITER,
- CSV_INPUT_RECORD_DELIMITER_DEFAULT);
- String quoteCharacter = xopt(builderOptions,
- ownerConf,
- CSV_INPUT_QUOTE_CHARACTER,
- CSV_INPUT_QUOTE_CHARACTER_DEFAULT);
- String quoteEscapeCharacter = xopt(builderOptions,
- ownerConf,
- CSV_INPUT_QUOTE_ESCAPE_CHARACTER,
- CSV_INPUT_QUOTE_ESCAPE_CHARACTER_DEFAULT);
-
- // CSV input
- CSVInput.Builder csvBuilder = CSVInput.builder()
- .fieldDelimiter(fieldDelimiter)
- .recordDelimiter(recordDelimiter)
- .comments(commentMarker)
- .quoteCharacter(quoteCharacter);
- if (StringUtils.isNotEmpty(quoteEscapeCharacter)) {
- csvBuilder.quoteEscapeCharacter(quoteEscapeCharacter);
- }
- csvBuilder.fileHeaderInfo(headerInfo);
-
- InputSerialization.Builder inputSerialization =
- InputSerialization.builder()
- .csv(csvBuilder.build());
- String compression = opt(builderOptions,
- ownerConf,
- SELECT_INPUT_COMPRESSION,
- COMPRESSION_OPT_NONE,
- true).toUpperCase(Locale.ENGLISH);
- if (isNotEmpty(compression)) {
- inputSerialization.compressionType(compression);
- }
- return inputSerialization.build();
- }
-
- /**
- * Build CSV output format for a request.
- * @param ownerConf FS owner configuration
- * @param builderOptions options on the specific request
- * @return the output format
- * @throws IllegalArgumentException argument failure
- * @throws IOException validation failure
- */
- public OutputSerialization buildCSVOutput(
- final Configuration ownerConf,
- final Configuration builderOptions)
- throws IllegalArgumentException, IOException {
- String fieldDelimiter = xopt(builderOptions,
- ownerConf,
- CSV_OUTPUT_FIELD_DELIMITER,
- CSV_OUTPUT_FIELD_DELIMITER_DEFAULT);
- String recordDelimiter = xopt(builderOptions,
- ownerConf,
- CSV_OUTPUT_RECORD_DELIMITER,
- CSV_OUTPUT_RECORD_DELIMITER_DEFAULT);
- String quoteCharacter = xopt(builderOptions,
- ownerConf,
- CSV_OUTPUT_QUOTE_CHARACTER,
- CSV_OUTPUT_QUOTE_CHARACTER_DEFAULT);
- String quoteEscapeCharacter = xopt(builderOptions,
- ownerConf,
- CSV_OUTPUT_QUOTE_ESCAPE_CHARACTER,
- CSV_OUTPUT_QUOTE_ESCAPE_CHARACTER_DEFAULT);
- String quoteFields = xopt(builderOptions,
- ownerConf,
- CSV_OUTPUT_QUOTE_FIELDS,
- CSV_OUTPUT_QUOTE_FIELDS_ALWAYS).toUpperCase(Locale.ENGLISH);
-
- CSVOutput.Builder csvOutputBuilder = CSVOutput.builder()
- .quoteCharacter(quoteCharacter)
- .quoteFields(QuoteFields.fromValue(quoteFields))
- .fieldDelimiter(fieldDelimiter)
- .recordDelimiter(recordDelimiter);
- if (!quoteEscapeCharacter.isEmpty()) {
- csvOutputBuilder.quoteEscapeCharacter(quoteEscapeCharacter);
- }
-
- // output is CSV, always
- return OutputSerialization.builder()
- .csv(csvOutputBuilder.build())
- .build();
- }
-
- /**
- * Stringify the given SelectObjectContentRequest, as its
- * toString() operator doesn't.
- * @param request request to convert to a string
- * @return a string to print. Does not contain secrets.
- */
- public static String toString(final SelectObjectContentRequest request) {
- StringBuilder sb = new StringBuilder();
- sb.append("SelectObjectContentRequest{")
- .append("bucket name=").append(request.bucket())
- .append("; key=").append(request.key())
- .append("; expressionType=").append(request.expressionType())
- .append("; expression=").append(request.expression());
- InputSerialization input = request.inputSerialization();
- if (input != null) {
- sb.append("; Input")
- .append(input.toString());
- } else {
- sb.append("; Input Serialization: none");
- }
- OutputSerialization out = request.outputSerialization();
- if (out != null) {
- sb.append("; Output")
- .append(out.toString());
- } else {
- sb.append("; Output Serialization: none");
- }
- return sb.append("}").toString();
- }
-
- /**
- * Resolve an option.
- * @param builderOptions the options which came in from the openFile builder.
- * @param fsConf configuration of the owning FS.
- * @param base base option (no s3a: prefix)
- * @param defVal default value. Must not be null.
- * @param trim should the result be trimmed.
- * @return the possibly trimmed value.
- */
- static String opt(Configuration builderOptions,
- Configuration fsConf,
- String base,
- String defVal,
- boolean trim) {
- String r = builderOptions.get(base, fsConf.get(base, defVal));
- return trim ? r.trim() : r;
- }
-
- /**
- * Get an option with backslash arguments transformed.
- * These are not trimmed, so whitespace is significant.
- * @param selectOpts options in the select call
- * @param fsConf filesystem conf
- * @param base base option name
- * @param defVal default value
- * @return the transformed value
- */
- static String xopt(Configuration selectOpts,
- Configuration fsConf,
- String base,
- String defVal) {
- return expandBackslashChars(
- opt(selectOpts, fsConf, base, defVal, false));
- }
-
- /**
- * Perform escaping.
- * @param src source string.
- * @return the replaced value
- */
- static String expandBackslashChars(String src) {
- return src.replace("\\n", "\n")
- .replace("\\\"", "\"")
- .replace("\\t", "\t")
- .replace("\\r", "\r")
- .replace("\\\"", "\"")
- // backslash substitution must come last
- .replace("\\\\", "\\");
- }
-
-
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectConstants.java
index 0e2bf914f83c5..d1c977f92824d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectConstants.java
@@ -25,13 +25,19 @@
* Options related to S3 Select.
*
* These options are set for the entire filesystem unless overridden
- * as an option in the URI
+ * as an option in the URI.
+ *
+ * The S3 Select API is no longer supported; however, this class is retained
+ * so that any application which imports the dependencies will still link.
*/
@InterfaceAudience.Public
-@InterfaceStability.Unstable
+@InterfaceStability.Stable
+@Deprecated
public final class SelectConstants {
- public static final String SELECT_UNSUPPORTED = "S3 Select is not supported";
+ public static final String SELECT_UNSUPPORTED = "S3 Select is no longer supported";
+
+ public static final String NAME = "select";
private SelectConstants() {
}
@@ -41,13 +47,18 @@ private SelectConstants() {
/**
* This is the big SQL expression: {@value}.
- * When used in an open() call, switch to a select operation.
- * This is only used in the open call, never in a filesystem configuration.
+ * When used in an open() call:
+ * <ul>
+ *   <li>if the option is set in an {@code .opt()} clause: warn and continue.</li>
+ *   <li>if the option is set in a {@code .must()} clause: fail with
+ *   {@code UnsupportedOperationException}.</li>
+ * </ul>
*/
public static final String SELECT_SQL = FS_S3A_SELECT + "sql";
/**
* Does the FS Support S3 Select?
+ * This is false everywhere.
* Value: {@value}.
*/
public static final String S3_SELECT_CAPABILITY = "fs.s3a.capability.select.sql";
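
Because the constants remain public (now deprecated), downstream code that guards its select usage with a capability probe keeps compiling and linking, and simply takes its fallback branch. A sketch; the client-side-filtering fallback is hypothetical:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.select.SelectConstants;

public final class SelectCapabilityFallback {
  static boolean canPushDownSql(FileSystem fs, Path path) throws IOException {
    // Always false after this change, on every store and configuration.
    return fs.hasPathCapability(path, SelectConstants.S3_SELECT_CAPABILITY);
  }
  // Callers that get false fall back to reading the whole object
  // and filtering records client-side.
}
```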
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectEventStreamPublisher.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectEventStreamPublisher.java
deleted file mode 100644
index c71ea5f1623a1..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectEventStreamPublisher.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.SequenceInputStream;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-import org.reactivestreams.Subscriber;
-
-import software.amazon.awssdk.core.async.SdkPublisher;
-import software.amazon.awssdk.http.AbortableInputStream;
-import software.amazon.awssdk.services.s3.model.EndEvent;
-import software.amazon.awssdk.services.s3.model.RecordsEvent;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
-import software.amazon.awssdk.utils.ToString;
-
-/**
- * Async publisher of {@link SelectObjectContentEventStream}s returned
- * from a SelectObjectContent call.
- */
-public final class SelectEventStreamPublisher implements
- SdkPublisher<SelectObjectContentEventStream> {
-
- private final CompletableFuture<Void> selectOperationFuture;
- private final SelectObjectContentResponse response;
- private final SdkPublisher<SelectObjectContentEventStream> publisher;
-
- /**
- * Create the publisher.
- * @param selectOperationFuture SelectObjectContent future
- * @param response SelectObjectContent response
- * @param publisher SelectObjectContentEventStream publisher to wrap
- */
- public SelectEventStreamPublisher(
- CompletableFuture<Void> selectOperationFuture,
- SelectObjectContentResponse response,
- SdkPublisher<SelectObjectContentEventStream> publisher) {
- this.selectOperationFuture = selectOperationFuture;
- this.response = response;
- this.publisher = publisher;
- }
-
- /**
- * Retrieve an input stream to the subset of the S3 object that matched the select query.
- * This is equivalent to loading the content of all RecordsEvents into an InputStream.
- * This will lazily load the content from S3, minimizing the amount of memory used.
- * @param onEndEvent callback on the end event
- * @return the input stream
- */
- public AbortableInputStream toRecordsInputStream(Consumer<EndEvent> onEndEvent) {
- SdkPublisher<InputStream> recordInputStreams = this.publisher
- .filter(e -> {
- if (e instanceof RecordsEvent) {
- return true;
- } else if (e instanceof EndEvent) {
- onEndEvent.accept((EndEvent) e);
- }
- return false;
- })
- .map(e -> ((RecordsEvent) e).payload().asInputStream());
-
- // Subscribe to the async publisher using an enumeration that will
- // buffer a single chunk (RecordsEvent's payload) at a time and
- // block until it is consumed.
- // Also inject an empty stream as the first element that
- // SequenceInputStream will request on construction.
- BlockingEnumeration enumeration =
- new BlockingEnumeration(recordInputStreams, 1, EMPTY_STREAM);
- return AbortableInputStream.create(
- new SequenceInputStream(enumeration),
- this::cancel);
- }
-
- /**
- * The response from the SelectObjectContent call.
- * @return the response object
- */
- public SelectObjectContentResponse response() {
- return response;
- }
-
- @Override
- public void subscribe(Subscriber<? super SelectObjectContentEventStream> subscriber) {
- publisher.subscribe(subscriber);
- }
-
- /**
- * Cancel the operation.
- */
- public void cancel() {
- selectOperationFuture.cancel(true);
- }
-
- @Override
- public String toString() {
- return ToString.builder("SelectObjectContentEventStream")
- .add("response", response)
- .add("publisher", publisher)
- .build();
- }
-
- private static final InputStream EMPTY_STREAM =
- new ByteArrayInputStream(new byte[0]);
-}
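
The deleted publisher's toRecordsInputStream() relies on a general trick: a reactive stream of byte chunks can be presented as a single java.io.InputStream by wrapping a blocking Enumeration in a SequenceInputStream. The sketch below reproduces that idea with a plain BlockingQueue standing in for the removed BlockingEnumeration helper; the class and method names are illustrative only:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public final class ChunkStreamSketch {

  /** Enumeration that blocks for the next chunk; a sentinel marks end of stream. */
  static Enumeration<InputStream> blockingChunks(
      BlockingQueue<InputStream> queue, InputStream endMarker) {
    return new Enumeration<InputStream>() {
      private InputStream next;
      private boolean done;

      @Override
      public boolean hasMoreElements() {
        if (done) {
          return false;
        }
        if (next == null) {
          try {
            next = queue.take();   // block until the producer offers a chunk
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
          }
          if (next == endMarker) {
            done = true;
            next = null;
          }
        }
        return next != null;
      }

      @Override
      public InputStream nextElement() {
        InputStream r = next;
        next = null;
        return r;
      }
    };
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    InputStream end = new ByteArrayInputStream(new byte[0]);
    BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
    for (String chunk : new String[]{"hello ", "select ", "records"}) {
      queue.put(new ByteArrayInputStream(chunk.getBytes(StandardCharsets.UTF_8)));
    }
    queue.put(end);   // plays the role of the EndEvent in the deleted code
    try (InputStream merged = new SequenceInputStream(blockingChunks(queue, end))) {
      int c;
      StringBuilder sb = new StringBuilder();
      while ((c = merged.read()) >= 0) {
        sb.append((char) c);
      }
      System.out.println(sb);   // hello select records
    }
  }
}

As in the deleted code, only one chunk is buffered at a time, so memory use stays bounded regardless of how much data the producer emits.
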
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectInputStream.java
deleted file mode 100644
index 3586d83a0a434..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectInputStream.java
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import software.amazon.awssdk.core.exception.AbortedException;
-import software.amazon.awssdk.http.AbortableInputStream;
-import org.apache.hadoop.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.CanSetReadahead;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.s3a.Retries;
-import org.apache.hadoop.fs.s3a.S3AReadOpContext;
-import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-import org.apache.hadoop.io.IOUtils;
-
-
-import static org.apache.hadoop.util.Preconditions.checkNotNull;
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import static org.apache.hadoop.fs.s3a.Invoker.once;
-import static org.apache.hadoop.fs.s3a.S3AInputStream.validateReadahead;
-
-/**
- * An input stream for S3 Select return values.
- * This is simply an end-to-end GET request, without any
- * form of seek or recovery from connectivity failures.
- *
- * Currently only seek and positioned read operations on the current
- * location are supported.
- *
- * The normal S3 input counters are updated by this stream.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class SelectInputStream extends FSInputStream implements
- CanSetReadahead {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(SelectInputStream.class);
-
- public static final String SEEK_UNSUPPORTED = "seek()";
-
- /**
- * Same set of arguments as for an S3AInputStream.
- */
- private final S3ObjectAttributes objectAttributes;
-
- /**
- * Tracks the current position.
- */
- private AtomicLong pos = new AtomicLong(0);
-
- /**
- * Closed flag.
- */
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- /**
- * Did the read complete successfully?
- */
- private final AtomicBoolean completedSuccessfully = new AtomicBoolean(false);
-
- /**
- * Abortable response stream.
- * This is guaranteed to never be null.
- */
- private final AbortableInputStream wrappedStream;
-
- private final String bucket;
-
- private final String key;
-
- private final String uri;
-
- private final S3AReadOpContext readContext;
-
- private final S3AInputStreamStatistics streamStatistics;
-
- private long readahead;
-
- /**
- * Create the stream.
- * The read attempt is initiated immediately.
- * @param readContext read context
- * @param objectAttributes object attributes from a HEAD request
- * @param selectPublisher event stream publisher from the already executed call
- * @throws IOException failure
- */
- @Retries.OnceTranslated
- public SelectInputStream(
- final S3AReadOpContext readContext,
- final S3ObjectAttributes objectAttributes,
- final SelectEventStreamPublisher selectPublisher) throws IOException {
- Preconditions.checkArgument(isNotEmpty(objectAttributes.getBucket()),
- "No Bucket");
- Preconditions.checkArgument(isNotEmpty(objectAttributes.getKey()),
- "No Key");
- this.objectAttributes = objectAttributes;
- this.bucket = objectAttributes.getBucket();
- this.key = objectAttributes.getKey();
- this.uri = "s3a://" + this.bucket + "/" + this.key;
- this.readContext = readContext;
- this.readahead = readContext.getReadahead();
- this.streamStatistics = readContext.getS3AStatisticsContext()
- .newInputStreamStatistics();
-
- AbortableInputStream stream = once(
- "S3 Select",
- uri,
- () -> {
- return selectPublisher.toRecordsInputStream(e -> {
- LOG.debug("Completed successful S3 select read from {}", uri);
- completedSuccessfully.set(true);
- });
- });
-
- this.wrappedStream = checkNotNull(stream);
- // this stream is already opened, so mark as such in the statistics.
- streamStatistics.streamOpened();
- }
-
- @Override
- public void close() throws IOException {
- long skipped = 0;
- boolean aborted = false;
- if (!closed.getAndSet(true)) {
- try {
- // set up for aborts:
- // if the amount available exceeds the readahead, abort.
- //
- boolean shouldAbort = wrappedStream.available() > readahead;
- if (!shouldAbort) {
- // read our readahead range worth of data
- skipped = wrappedStream.skip(readahead);
- shouldAbort = wrappedStream.read() >= 0;
- }
- // now, either there is data left or not.
- if (shouldAbort) {
- // yes, more data. Abort and add this fact to the stream stats
- aborted = true;
- wrappedStream.abort();
- }
- } catch (IOException | AbortedException e) {
- LOG.debug("While closing stream", e);
- } finally {
- IOUtils.cleanupWithLogger(LOG, wrappedStream);
- streamStatistics.streamClose(aborted, skipped);
- streamStatistics.close();
- super.close();
- }
- }
- }
-
- /**
- * Verify that the input stream is open. Non-blocking; this gives
- * the last state of the atomic {@link #closed} field.
- * @throws PathIOException if the connection is closed.
- */
- private void checkNotClosed() throws IOException {
- if (closed.get()) {
- throw new PathIOException(uri, FSExceptionMessages.STREAM_IS_CLOSED);
- }
- }
-
- @Override
- public int available() throws IOException {
- checkNotClosed();
- return wrappedStream.available();
- }
-
- @Override
- @Retries.OnceTranslated
- public synchronized long skip(final long n) throws IOException {
- checkNotClosed();
- long skipped = once("skip", uri, () -> wrappedStream.skip(n));
- pos.addAndGet(skipped);
- // treat as a forward skip for stats
- streamStatistics.seekForwards(skipped, skipped);
- return skipped;
- }
-
- @Override
- public long getPos() {
- return pos.get();
- }
-
- /**
- * Set the readahead.
- * @param readahead The readahead to use. null means to use the default.
- */
- @Override
- public void setReadahead(Long readahead) {
- this.readahead = validateReadahead(readahead);
- }
-
- /**
- * Get the current readahead value.
- * @return the readahead
- */
- public long getReadahead() {
- return readahead;
- }
-
- /**
- * Read a byte. There's no attempt to recover, but AWS-SDK exceptions
- * such as {@code SelectObjectContentEventException} are translated into
- * IOExceptions.
- * @return a byte read or -1 for an end of file.
- * @throws IOException failure.
- */
- @Override
- @Retries.OnceTranslated
- public synchronized int read() throws IOException {
- checkNotClosed();
- int byteRead;
- try {
- byteRead = once("read()", uri, () -> wrappedStream.read());
- } catch (EOFException e) {
- // this could be one of: end of file, some IO failure
- if (completedSuccessfully.get()) {
- // read was successful
- return -1;
- } else {
- // the stream closed prematurely
- LOG.info("Reading of S3 Select data from {} failed before all results"
- + " were generated.", uri);
- streamStatistics.readException();
- throw new PathIOException(uri,
- "Read of S3 Select data did not complete");
- }
- }
-
- if (byteRead >= 0) {
- incrementBytesRead(1);
- }
- return byteRead;
- }
-
- @SuppressWarnings("NullableProblems")
- @Override
- @Retries.OnceTranslated
- public synchronized int read(final byte[] buf, final int off, final int len)
- throws IOException {
- checkNotClosed();
- validatePositionedReadArgs(pos.get(), buf, off, len);
- if (len == 0) {
- return 0;
- }
-
- int bytesRead;
- try {
- streamStatistics.readOperationStarted(pos.get(), len);
- bytesRead = wrappedStream.read(buf, off, len);
- } catch (EOFException e) {
- streamStatistics.readException();
- // the base implementation swallows EOFs.
- return -1;
- }
-
- incrementBytesRead(bytesRead);
- streamStatistics.readOperationCompleted(len, bytesRead);
- return bytesRead;
- }
-
- /**
- * Forward seeks are supported, but not backwards ones.
- * Forward seeks are implemented using read, which
- * means that long-distance seeks will be (literally) expensive.
- *
- * @param newPos new seek position.
- * @throws PathIOException Backwards seek attempted.
- * @throws EOFException attempt to seek past the end of the stream.
- * @throws IOException IO failure while skipping bytes
- */
- @Override
- @Retries.OnceTranslated
- public synchronized void seek(long newPos) throws IOException {
- long current = getPos();
- long distance = newPos - current;
- if (distance < 0) {
- throw unsupported(SEEK_UNSUPPORTED
- + " backwards from " + current + " to " + newPos);
- }
- if (distance == 0) {
- LOG.debug("ignoring seek to current position.");
- } else {
- // the complicated one: Forward seeking. Useful for split files.
- LOG.debug("Forward seek by reading {} bytes", distance);
- long bytesSkipped = 0;
- // read byte-by-byte, hoping that buffering will compensate for this.
- // doing it this way ensures that the seek stops at exactly the right
- // place. skip(len) can return a smaller value, at which point
- // it's not clear what to do.
- while (distance > 0) {
- int r = read();
- if (r == -1) {
- // reached an EOF too early
- throw new EOFException("Seek to " + newPos
- + " reached End of File at offset " + getPos());
- }
- distance--;
- bytesSkipped++;
- }
- // read has finished.
- streamStatistics.seekForwards(bytesSkipped, bytesSkipped);
- }
- }
-
- /**
- * Build an exception to raise when an operation is not supported here.
- * @param action action which is Unsupported.
- * @return an exception to throw.
- */
- protected PathIOException unsupported(final String action) {
- return new PathIOException(
- String.format("s3a://%s/%s", bucket, key),
- action + " not supported");
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
-
- // Not supported.
- @Override
- public boolean markSupported() {
- return false;
- }
-
- @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override
- public void mark(int readLimit) {
- // Do nothing
- }
-
- @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override
- public void reset() throws IOException {
- throw unsupported("Mark");
- }
-
- /**
- * Aborts the IO.
- */
- public void abort() {
- if (!closed.get()) {
- LOG.debug("Aborting");
- wrappedStream.abort();
- }
- }
-
- /**
- * Read at a specific position.
- * Reads at a position earlier than the current {@link #getPos()} position
- * will fail with a {@link PathIOException}. See {@link #seek(long)}.
- * Unlike the base implementation and the requirements of the filesystem
- * specification, this updates the stream position as returned in
- * {@link #getPos()}.
- * @param position offset in the stream.
- * @param buffer buffer to read in to.
- * @param offset offset within the buffer
- * @param length amount of data to read.
- * @return the result.
- * @throws PathIOException Backwards seek attempted.
- * @throws EOFException attempt to seek past the end of the stream.
- * @throws IOException IO failure while seeking in the stream or reading data.
- */
- @Override
- public int read(final long position,
- final byte[] buffer,
- final int offset,
- final int length)
- throws IOException {
- // maybe seek forwards to the position.
- seek(position);
- return read(buffer, offset, length);
- }
-
- /**
- * Increment the bytes read counter if there is a stats instance
- * and the number of bytes read is more than zero.
- * This also updates the {@link #pos} marker by the same value.
- * @param bytesRead number of bytes read
- */
- private void incrementBytesRead(long bytesRead) {
- if (bytesRead > 0) {
- pos.addAndGet(bytesRead);
- }
- streamStatistics.bytesRead(bytesRead);
- if (readContext.getStats() != null && bytesRead > 0) {
- readContext.getStats().incrementBytesRead(bytesRead);
- }
- }
-
- /**
- * Get the Stream statistics.
- * @return the statistics for this stream.
- */
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public S3AInputStreamStatistics getS3AStreamStatistics() {
- return streamStatistics;
- }
-
- /**
- * String value includes statistics as well as stream state.
- * Important: there are no guarantees as to the stability
- * of this value.
- * @return a string value for printing in logs/diagnostics
- */
- @Override
- @InterfaceStability.Unstable
- public String toString() {
- String s = streamStatistics.toString();
- synchronized (this) {
- final StringBuilder sb = new StringBuilder(
- "SelectInputStream{");
- sb.append(uri);
- sb.append("; state ").append(!closed.get() ? "open" : "closed");
- sb.append("; pos=").append(getPos());
- sb.append("; readahead=").append(readahead);
- sb.append('\n').append(s);
- sb.append('}');
- return sb.toString();
- }
- }
-}
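
The deleted SelectInputStream.seek() demonstrates the standard technique for one-way streams: reject backward seeks and implement forward seeks by reading and discarding bytes one at a time, so the position stops exactly where requested. Detached from the S3A internals, the core of it looks roughly like this (a sketch under those assumptions, not a retained API):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Sketch: forward-only "seek" over a non-seekable stream by reading and discarding. */
final class ForwardOnlySeek {
  private final InputStream in;
  private long pos;

  ForwardOnlySeek(InputStream in) {
    this.in = in;
  }

  long getPos() {
    return pos;
  }

  void seek(long newPos) throws IOException {
    long distance = newPos - pos;
    if (distance < 0) {
      throw new IOException("seek() backwards from " + pos + " to " + newPos
          + " not supported");
    }
    // Read byte-by-byte so the position stops exactly at newPos;
    // skip(n) may legally return less than requested.
    while (distance > 0) {
      if (in.read() == -1) {
        throw new EOFException("Seek to " + newPos
            + " reached end of file at offset " + pos);
      }
      pos++;
      distance--;
    }
  }
}

Byte-at-a-time reading trades speed for exactness, which is why the deleted javadoc warns that long-distance seeks are (literally) expensive.
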
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectObjectContentHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectObjectContentHelper.java
deleted file mode 100644
index 8233e67eea0a5..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectObjectContentHelper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-
-import software.amazon.awssdk.core.async.SdkPublisher;
-import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
-import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AUtils;
-
-import static org.apache.hadoop.fs.s3a.WriteOperationHelper.WriteOperationHelperCallbacks;
-
-/**
- * Helper for SelectObjectContent queries against an S3 Bucket.
- */
-public final class SelectObjectContentHelper {
-
- private SelectObjectContentHelper() {
- }
-
- /**
- * Execute an S3 Select operation.
- * @param writeOperationHelperCallbacks helper callbacks
- * @param source source for selection
- * @param request Select request to issue.
- * @param action the action for use in exception creation
- * @return the select response event stream publisher
- * @throws IOException on failure
- */
- public static SelectEventStreamPublisher select(
- WriteOperationHelperCallbacks writeOperationHelperCallbacks,
- Path source,
- SelectObjectContentRequest request,
- String action)
- throws IOException {
- try {
- Handler handler = new Handler();
- CompletableFuture<Void> selectOperationFuture =
- writeOperationHelperCallbacks.selectObjectContent(request, handler);
- return handler.eventPublisher(selectOperationFuture).join();
- } catch (Throwable e) {
- if (e instanceof CompletionException) {
- e = e.getCause();
- }
- IOException translated;
- if (e instanceof SdkException) {
- translated = S3AUtils.translateException(action, source,
- (SdkException)e);
- } else {
- translated = new IOException(e);
- }
- throw translated;
- }
- }
-
- private static class Handler implements SelectObjectContentResponseHandler {
- private volatile CompletableFuture<Pair<SelectObjectContentResponse,
- SdkPublisher<SelectObjectContentEventStream>>> responseAndPublisherFuture =
- new CompletableFuture<>();
-
- private volatile SelectObjectContentResponse response;
-
- public CompletableFuture<SelectEventStreamPublisher> eventPublisher(
- CompletableFuture<Void> selectOperationFuture) {
- return responseAndPublisherFuture.thenApply(p ->
- new SelectEventStreamPublisher(selectOperationFuture,
- p.getLeft(), p.getRight()));
- }
-
- @Override
- public void responseReceived(SelectObjectContentResponse selectObjectContentResponse) {
- this.response = selectObjectContentResponse;
- }
-
- @Override
- public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
- responseAndPublisherFuture.complete(Pair.of(response, publisher));
- }
-
- @Override
- public void exceptionOccurred(Throwable error) {
- responseAndPublisherFuture.completeExceptionally(error);
- }
-
- @Override
- public void complete() {
- }
- }
-}
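
The Handler removed here is an instance of a common pattern for callback-style response handlers: stash the first callback's payload, complete a CompletableFuture when the second arrives, and fail the future on error so join() propagates the cause. Stripped of the AWS types and reduced to generic parameters, the shape is roughly:

import java.util.concurrent.CompletableFuture;

/** Sketch: bridge a two-phase callback (response, then stream) into one future. */
final class CallbackBridge<R, S> {
  private final CompletableFuture<Result<R, S>> future = new CompletableFuture<>();
  private volatile R response;

  /** First callback: stash the response. */
  void onResponse(R r) {
    this.response = r;
  }

  /** Second callback: both parts are now known, complete the future. */
  void onStream(S stream) {
    future.complete(new Result<>(response, stream));
  }

  /** Error callback: fail the future so join() propagates the cause. */
  void onError(Throwable t) {
    future.completeExceptionally(t);
  }

  CompletableFuture<Result<R, S>> result() {
    return future;
  }

  static final class Result<R, S> {
    final R response;
    final S stream;

    Result(R response, S stream) {
      this.response = response;
      this.stream = stream;
    }
  }
}

A caller wires the bridge's methods into the asynchronous API's handler, then blocks on result().join() or composes further stages, exactly as the deleted select() method did before translating any SdkException into an IOException.
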
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java
deleted file mode 100644
index 7a6c1afdc1fc3..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.select;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Locale;
-import java.util.Optional;
-import java.util.Scanner;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
-import org.apache.hadoop.fs.shell.CommandFormat;
-import org.apache.hadoop.util.DurationInfo;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.OperationDuration;
-import org.apache.hadoop.util.functional.FutureIO;
-
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
-import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
-import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
-
-/**
- * This is a CLI tool for the select operation, which is available
- * through the S3Guard command.
- *
- * Usage:
- *