Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1121,11 +1121,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.eventstream</groupId>
<artifactId>eventstream</artifactId>
<version>${aws.eventstream.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,6 @@
<artifactId>bundle</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>software.amazon.eventstream</groupId>
<artifactId>eventstream</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
Expand Down Expand Up @@ -194,8 +192,6 @@
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
Expand Down Expand Up @@ -299,7 +295,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

private S3Client s3Client;

/** Async client is used for transfer manager and s3 select. */
/** Async client is used for transfer manager. */
private S3AsyncClient s3AsyncClient;

// initial callback policy is fail-once; it's there just to assist
Expand Down Expand Up @@ -1725,8 +1721,7 @@ public FSDataInputStream open(Path f, int bufferSize)
/**
* Opens an FSDataInputStream at the indicated Path.
* The {@code fileInformation} parameter controls how the file
* is opened, whether it is normal vs. an S3 select call,
* can a HEAD be skipped, etc.
* is opened, can a HEAD be skipped, etc.
* @param path the file to open
* @param fileInformation information about the file to open
* @throws IOException IO failure.
Expand Down Expand Up @@ -1853,13 +1848,6 @@ public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
private final class WriteOperationHelperCallbacksImpl
implements WriteOperationHelper.WriteOperationHelperCallbacks {

@Override
public CompletableFuture<Void> selectObjectContent(
SelectObjectContentRequest request,
SelectObjectContentResponseHandler responseHandler) {
return getS3AsyncClient().selectObjectContent(request, responseHandler);
}

@Override
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
Expand All @@ -1872,7 +1860,7 @@ public CompleteMultipartUploadResponse completeMultipartUpload(
* using FS state as well as the status.
* @param fileStatus file status.
* @param auditSpan audit span.
* @return a context for read and select operations.
* @return a context for read operations.
*/
@VisibleForTesting
protected S3AReadOpContext createReadContext(
Expand Down Expand Up @@ -5452,13 +5440,6 @@ public boolean hasPathCapability(final Path path, final String capability)
// capability depends on FS configuration
return isMagicCommitEnabled();

case SelectConstants.S3_SELECT_CAPABILITY:
// select is only supported if enabled and client side encryption is
// disabled.
return !isCSEEnabled
&& SelectBinding.isSelectEnabled(getConf())
&& !s3ExpressStore;

case CommonPathCapabilities.FS_CHECKSUMS:
// capability depends on FS configuration
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
Expand Down Expand Up @@ -5572,85 +5553,6 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
return credentials.share();
}

/**
* This is a proof of concept of a select API.
* @param source path to source data
* @param options request configuration from the builder.
* @param fileInformation any passed in information.
* @return the stream of the results
* @throws IOException IO failure
*/
@Retries.RetryTranslated
@AuditEntryPoint
private FSDataInputStream select(final Path source,
final Configuration options,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
requireSelectSupport(source);
final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
final Path path = makeQualified(source);
String expression = fileInformation.getSql();
final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
fileInformation);

// readahead range can be dynamically set
S3ObjectAttributes objectAttributes = createObjectAttributes(
path, fileStatus);
ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
S3AReadOpContext readContext = createReadContext(
fileStatus,
auditSpan);
fileInformation.applyOptions(readContext);

if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
&& fileStatus.getEtag() != null) {
// if there is change detection, and the status includes at least an
// etag,
// check that the object metadata lines up with what is expected
// based on the object attributes (which may contain an eTag or
// versionId).
// This is because the select API doesn't offer this.
// (note: this is trouble for version checking as cannot force the old
// version in the final read; nor can we check the etag match)
ChangeTracker changeTracker =
new ChangeTracker(uri.toString(),
changePolicy,
readContext.getS3AStatisticsContext()
.newInputStreamStatistics()
.getChangeTrackerStatistics(),
objectAttributes);

// will retry internally if wrong version detected
Invoker readInvoker = readContext.getReadInvoker();
getObjectMetadata(path, changeTracker, readInvoker, "select");
}
// instantiate S3 Select support using the current span
// as the active span for operations.
SelectBinding selectBinding = new SelectBinding(
createWriteOperationHelper(auditSpan));

// build and execute the request
return selectBinding.select(
readContext,
expression,
options,
objectAttributes);
}

/**
* Verify the FS supports S3 Select.
* @param source source file.
* @throws UnsupportedOperationException if not.
*/
private void requireSelectSupport(final Path source) throws
UnsupportedOperationException {
if (!isCSEEnabled && !SelectBinding.isSelectEnabled(getConf())) {

throw new UnsupportedOperationException(
SelectConstants.SELECT_UNSUPPORTED);
}
}

/**
* Get the file status of the source file.
* If in the fileInformation parameter return that
Expand Down Expand Up @@ -5681,16 +5583,14 @@ private S3AFileStatus extractOrFetchSimpleFileStatus(
}

/**
* Initiate the open() or select() operation.
* Initiate the open() operation.
* This is invoked from both the FileSystem and FileContext APIs.
* It's declared as an audit entry point but the span creation is pushed
* down into the open/select methods it ultimately calls.
* down into the open operation s it ultimately calls.
* @param rawPath path to the file
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened/selected file.
* @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
* @throws PathIOException operation is a select request but S3 select is
* disabled
* @throws IllegalArgumentException unknown mandatory key
*/
@Override
Expand All @@ -5706,20 +5606,9 @@ public CompletableFuture<FSDataInputStream> openFileWithOptions(
parameters,
getDefaultBlockSize());
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
if (!fileInformation.isS3Select()) {
// normal path.
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result,
() -> executeOpen(path, fileInformation)));
} else {
// it is a select statement.
// fail fast if the operation is not available
requireSelectSupport(path);
// submit the query
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result,
() -> select(path, parameters.getOptions(), fileInformation)));
}
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result,
() -> executeOpen(path, fileInformation)));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* This class holds attributes of an object independent of the
* file status type.
* It is used in {@link S3AInputStream} and the select equivalent.
* It is used in {@link S3AInputStream} and elsewhere.
* as a way to reduce parameters being passed
* to the constructor of such class,
* and elsewhere to be a source-neutral representation of a file status.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,6 @@ public enum Statistic {
StoreStatisticNames.OBJECT_PUT_BYTES_PENDING,
"number of bytes queued for upload/being actively uploaded",
TYPE_GAUGE),
OBJECT_SELECT_REQUESTS(
StoreStatisticNames.OBJECT_SELECT_REQUESTS,
"Count of S3 Select requests issued",
TYPE_COUNTER),
STREAM_READ_ABORTED(
StreamStatisticNames.STREAM_READ_ABORTED,
"Count of times the TCP stream was aborted",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import software.amazon.awssdk.core.sync.RequestBody;
Expand All @@ -33,8 +32,6 @@
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import org.slf4j.Logger;
Expand All @@ -49,16 +46,11 @@
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.select.SelectEventStreamPublisher;
import org.apache.hadoop.fs.s3a.select.SelectObjectContentHelper;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.Preconditions;

import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.Invoker.*;
Expand All @@ -82,7 +74,6 @@
* <li>Other low-level access to S3 functions, for private use.</li>
* <li>Failure handling, including converting exceptions to IOEs.</li>
* <li>Integration with instrumentation.</li>
* <li>Evolution to add more low-level operations, such as S3 select.</li>
* </ul>
*
* This API is for internal use only.
Expand Down Expand Up @@ -615,63 +606,6 @@ public Configuration getConf() {
return conf;
}

public SelectObjectContentRequest.Builder newSelectRequestBuilder(Path path) {
try (AuditSpan span = getAuditSpan()) {
return getRequestFactory().newSelectRequestBuilder(
storeContext.pathToKey(path));
}
}

/**
* Execute an S3 Select operation.
* On a failure, the request is only logged at debug to avoid the
* select exception being printed.
*
* @param source source for selection
* @param request Select request to issue.
* @param action the action for use in exception creation
* @return response
* @throws IOException failure
*/
@Retries.RetryTranslated
public SelectEventStreamPublisher select(
final Path source,
final SelectObjectContentRequest request,
final String action)
throws IOException {
// no setting of span here as the select binding is (statically) created
// without any span.
String bucketName = request.bucket();
Preconditions.checkArgument(bucket.equals(bucketName),
"wrong bucket: %s", bucketName);
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating select call {} {}",
source, request.expression());
LOG.debug(SelectBinding.toString(request));
}
return invoker.retry(
action,
source.toString(),
true,
withinAuditSpan(getAuditSpan(), () -> {
try (DurationInfo ignored =
new DurationInfo(LOG, "S3 Select operation")) {
try {
return SelectObjectContentHelper.select(
writeOperationHelperCallbacks, source, request, action);
} catch (Throwable e) {
LOG.error("Failure of S3 Select request against {}",
source);
LOG.debug("S3 Select request against {}:\n{}",
source,
SelectBinding.toString(request),
e);
throw e;
}
}
}));
}

@Override
public AuditSpan createSpan(final String operation,
@Nullable final String path1,
Expand Down Expand Up @@ -705,15 +639,6 @@ public RequestFactory getRequestFactory() {
*/
public interface WriteOperationHelperCallbacks {

/**
* Initiates a select request.
* @param request selectObjectContent request
* @param t selectObjectContent request handler
* @return selectObjectContentResult
*/
CompletableFuture<Void> selectObjectContent(SelectObjectContentRequest request,
SelectObjectContentResponseHandler t);

/**
* Initiates a complete multi-part upload request.
* @param request Complete multi-part upload request
Expand Down
Loading