Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,15 @@

import com.amazonaws.services.s3.model.CSVInput;
import com.amazonaws.services.s3.model.CSVOutput;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.ExpressionType;
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.s3.PrestoS3ClientFactory;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;

import java.net.URI;
import java.util.Properties;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR;
import static org.apache.hadoop.hive.serde.serdeConstants.QUOTE_CHAR;

Expand Down Expand Up @@ -64,15 +54,9 @@ public class S3SelectCsvRecordReader
}

@Override
public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, String query, Path path)
public InputSerialization buildInputSerialization()
{
SelectObjectContentRequest selectObjectRequest = new SelectObjectContentRequest();
URI uri = path.toUri();
selectObjectRequest.setBucketName(PrestoS3FileSystem.getBucketName(uri));
selectObjectRequest.setKey(PrestoS3FileSystem.keyFromPath(path));
selectObjectRequest.setExpression(query);
selectObjectRequest.setExpressionType(ExpressionType.SQL);

Properties schema = getSchema();
String fieldDelimiter = getFieldDelimiter(schema);
String quoteChar = schema.getProperty(QUOTE_CHAR, null);
String escapeChar = schema.getProperty(ESCAPE_CHAR, null);
Expand All @@ -84,20 +68,19 @@ public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, St
selectObjectCSVInputSerialization.setQuoteCharacter(quoteChar);
selectObjectCSVInputSerialization.setQuoteEscapeCharacter(escapeChar);
InputSerialization selectObjectInputSerialization = new InputSerialization();
selectObjectInputSerialization.setCompressionType(getCompressionType());
selectObjectInputSerialization.setCsv(selectObjectCSVInputSerialization);

CompressionCodec codec = compressionCodecFactory.getCodec(path);
if (codec instanceof GzipCodec) {
selectObjectInputSerialization.setCompressionType(CompressionType.GZIP);
}
else if (codec instanceof BZip2Codec) {
selectObjectInputSerialization.setCompressionType(CompressionType.BZIP2);
}
else if (codec != null) {
throw new PrestoException(NOT_SUPPORTED, "Compression extension not supported for S3 Select: " + path);
}
return selectObjectInputSerialization;
}

selectObjectInputSerialization.setCsv(selectObjectCSVInputSerialization);
selectObjectRequest.setInputSerialization(selectObjectInputSerialization);
@Override
public OutputSerialization buildOutputSerialization()
{
Properties schema = getSchema();
String fieldDelimiter = getFieldDelimiter(schema);
String quoteChar = schema.getProperty(QUOTE_CHAR, null);
String escapeChar = schema.getProperty(ESCAPE_CHAR, null);

OutputSerialization selectObjectOutputSerialization = new OutputSerialization();
CSVOutput selectObjectCSVOutputSerialization = new CSVOutput();
Expand All @@ -106,8 +89,7 @@ else if (codec != null) {
selectObjectCSVOutputSerialization.setQuoteCharacter(quoteChar);
selectObjectCSVOutputSerialization.setQuoteEscapeCharacter(escapeChar);
selectObjectOutputSerialization.setCsv(selectObjectCSVOutputSerialization);
selectObjectRequest.setOutputSerialization(selectObjectOutputSerialization);

return selectObjectRequest;
return selectObjectOutputSerialization;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.s3select;

/**
 * Data formats that S3 Select can scan server-side. Each constant is mapped to a
 * concrete {@code S3SelectLineRecordReader} implementation by
 * {@code S3SelectLineRecordReaderProvider}.
 */
public enum S3SelectDataType
{
// CSV-formatted objects (also covers LazySimpleSerDe-backed text tables)
CSV
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,44 @@
package com.facebook.presto.hive.s3select;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.PrestoS3ClientFactory;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
import com.facebook.presto.hive.s3.PrestoS3SelectClient;
import com.facebook.presto.spi.PrestoException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closer;
import io.airlift.units.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import static com.amazonaws.services.s3.model.ExpressionType.SQL;
import static com.facebook.presto.hive.RetryDriver.retry;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_BACKOFF_TIME;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_CLIENT_RETRIES;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_RETRY_TIME;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.lang.String.format;
Expand Down Expand Up @@ -74,6 +85,8 @@ public abstract class S3SelectLineRecordReader
private final SelectObjectContentRequest selectObjectContentRequest;
protected final CompressionCodecFactory compressionCodecFactory;
protected final String lineDelimiter;
private final Properties schema;
private final CompressionType compressionType;

S3SelectLineRecordReader(
Configuration configuration,
Expand All @@ -100,7 +113,9 @@ public abstract class S3SelectLineRecordReader
this.isFirstLine = true;

this.compressionCodecFactory = new CompressionCodecFactory(configuration);
this.selectObjectContentRequest = buildSelectObjectRequest(schema, ionSqlQuery, path);
this.compressionType = getCompressionType(path);
this.schema = schema;
this.selectObjectContentRequest = buildSelectObjectRequest(ionSqlQuery, path);

HiveS3Config defaults = new HiveS3Config();
this.maxAttempts = configuration.getInt(S3_MAX_CLIENT_RETRIES, defaults.getS3MaxClientRetries()) + 1;
Expand All @@ -111,7 +126,52 @@ public abstract class S3SelectLineRecordReader
closer.register(selectClient);
}

public abstract SelectObjectContentRequest buildSelectObjectRequest(Properties schema, String query, Path path);
// Subclasses describe how S3 Select should parse the stored object
// (format, delimiters, compression) for their specific data type.
protected abstract InputSerialization buildInputSerialization();

// Subclasses describe how S3 Select should serialize the rows it returns.
protected abstract OutputSerialization buildOutputSerialization();

/**
 * Returns the Hive table schema properties captured at construction time,
 * used by subclasses to read SerDe settings (delimiters, quote/escape chars).
 */
protected Properties getSchema()
{
return schema;
}

/**
 * Returns the compression type derived from the object's path extension at
 * construction time (NONE, GZIP, or BZIP2).
 */
protected CompressionType getCompressionType()
{
return compressionType;
}

/**
 * Assembles the S3 Select request for the given SQL expression and object path.
 * Bucket and key are derived from the path; input/output serialization is
 * delegated to the format-specific subclass hooks.
 *
 * @param query the S3 Select SQL expression to execute
 * @param path  the S3 object location being scanned
 * @return a fully populated {@link SelectObjectContentRequest}
 */
public SelectObjectContentRequest buildSelectObjectRequest(String query, Path path)
{
    SelectObjectContentRequest request = new SelectObjectContentRequest();
    request.setBucketName(PrestoS3FileSystem.getBucketName(path.toUri()));
    request.setKey(PrestoS3FileSystem.keyFromPath(path));
    request.setExpression(query);
    request.setExpressionType(SQL);

    // Format-specific parsing and result serialization come from the subclass.
    request.setInputSerialization(buildInputSerialization());
    request.setOutputSerialization(buildOutputSerialization());
    return request;
}

/**
 * Maps the compression codec inferred from the path's file extension to the
 * corresponding S3 Select {@link CompressionType}.
 *
 * @param path the S3 object path whose extension determines the codec
 * @return NONE when uncompressed, GZIP or BZIP2 when supported
 * @throws PrestoException (NOT_SUPPORTED) for any other compression codec,
 *         since S3 Select cannot decompress it server-side
 */
protected CompressionType getCompressionType(Path path)
{
    CompressionCodec codec = compressionCodecFactory.getCodec(path);
    // instanceof is null-safe, so the supported-codec checks run first;
    // a null codec simply means the object is stored uncompressed.
    if (codec instanceof GzipCodec) {
        return CompressionType.GZIP;
    }
    if (codec instanceof BZip2Codec) {
        return CompressionType.BZIP2;
    }
    if (codec != null) {
        throw new PrestoException(NOT_SUPPORTED, "Compression extension not supported for S3 Select: " + path);
    }
    return CompressionType.NONE;
}

private int readLine(Text value)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.s3select;

import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.s3.PrestoS3ClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.Optional;
import java.util.Properties;

/**
 * Static factory that maps an {@link S3SelectDataType} to the matching
 * {@link S3SelectLineRecordReader} implementation.
 */
public class S3SelectLineRecordReaderProvider
{
// Utility class: not instantiable.
private S3SelectLineRecordReaderProvider() {}

/**
 * Creates a record reader for the given data type, or {@code Optional.empty()}
 * when no reader implementation exists for that type.
 *
 * <p>NOTE(review): a null {@code dataType} throws NullPointerException here
 * (switch on null) — callers are expected to pass a resolved type.
 *
 * @param configuration   Hadoop configuration for the reader
 * @param clientConfig    Hive client configuration
 * @param path            S3 object path to scan
 * @param start           split start offset in bytes
 * @param length          split length in bytes
 * @param schema          Hive table schema properties
 * @param ionSqlQuery     S3 Select SQL expression to push down
 * @param s3ClientFactory factory for the S3 Select client
 * @param dataType        resolved S3 Select data type of the object
 */
public static Optional<S3SelectLineRecordReader> get(
Configuration configuration,
HiveClientConfig clientConfig,
Path path,
long start,
long length,
Properties schema,
String ionSqlQuery,
PrestoS3ClientFactory s3ClientFactory,
S3SelectDataType dataType)
{
switch (dataType) {
case CSV:
return Optional.of(new S3SelectCsvRecordReader(configuration, clientConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory));
default:
// Empty when the data type has no reader implementation yet; callers
// fall back to the non-pushdown read path.
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.spi.ConnectorSession;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
Expand Down Expand Up @@ -54,7 +53,6 @@ public class S3SelectPushdown
{
private static final Logger LOG = Logger.get(S3SelectPushdown.class);
private static final Set<String> SUPPORTED_S3_PREFIXES = ImmutableSet.of("s3://", "s3a://", "s3n://");
private static final Set<String> SUPPORTED_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName());
private static final Set<String> SUPPORTED_INPUT_FORMATS = ImmutableSet.of(TextInputFormat.class.getName());

/*
Expand All @@ -80,7 +78,7 @@ private S3SelectPushdown() {}
private static boolean isSerdeSupported(Properties schema)
{
String serdeName = getDeserializerClassName(schema);
return SUPPORTED_SERDES.contains(serdeName);
return S3SelectSerDeDataTypeMapper.doesSerDeExist(serdeName);
}

private static boolean isInputFormatSupported(Properties schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.joda.time.DateTimeZone;

import javax.inject.Inject;
Expand All @@ -49,7 +48,6 @@
public class S3SelectRecordCursorProvider
implements HiveRecordCursorProvider
{
private static final Set<String> CSV_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName());
private final HdfsEnvironment hdfsEnvironment;
private final HiveClientConfig clientConfig;
private final PrestoS3ClientFactory s3ClientFactory;
Expand Down Expand Up @@ -95,11 +93,17 @@ public Optional<RecordCursor> createRecordCursor(
}

String serdeName = getDeserializerClassName(schema);
if (CSV_SERDES.contains(serdeName)) {
Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName);

if (s3SelectDataTypeOptional.isPresent()) {
S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();

IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager);
String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate);
S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, clientConfig, path, fileSplit.getStart(), fileSplit.getLength(), schema, ionSqlQuery, s3ClientFactory);
return Optional.of(new S3SelectRecordCursor(configuration, path, recordReader, fileSplit.getLength(), schema, columns, hiveStorageTimeZone, typeManager));
Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, clientConfig, path, fileSplit.getStart(), fileSplit.getLength(), schema, ionSqlQuery, s3ClientFactory, s3SelectDataType);

// If the S3 Select data type is not mapped to an S3SelectLineRecordReader, the provider returns Optional.empty()
return recordReader.map(s3SelectLineRecordReader -> new S3SelectRecordCursor<>(configuration, path, s3SelectLineRecordReader, fileSplit.getLength(), schema, columns, hiveStorageTimeZone, typeManager));
}

// unsupported serdes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.s3select;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;

import java.util.Map;
import java.util.Optional;

/**
 * Maps Hive SerDe class names to the {@link S3SelectDataType} that S3 Select
 * should use when scanning objects written by that SerDe. Multiple SerDe
 * classes may map to the same data type.
 */
public class S3SelectSerDeDataTypeMapper
{
    // Single source of truth for which SerDes are eligible for S3 Select pushdown.
    private static final Map<String, S3SelectDataType> SERDE_TO_DATA_TYPE_MAPPING = ImmutableMap.of(
            LazySimpleSerDe.class.getName(), S3SelectDataType.CSV);

    // Utility class: not instantiable.
    private S3SelectSerDeDataTypeMapper() {}

    /**
     * Returns the S3 Select data type for the given SerDe class name, or
     * {@code Optional.empty()} when the SerDe is not supported.
     */
    public static Optional<S3SelectDataType> getDataType(String serdeName)
    {
        return Optional.ofNullable(SERDE_TO_DATA_TYPE_MAPPING.get(serdeName));
    }

    /**
     * Returns whether the given SerDe class name is supported by S3 Select.
     * Delegates to {@link #getDataType} so both queries share one lookup path.
     */
    public static boolean doesSerDeExist(String serdeName)
    {
        return getDataType(serdeName).isPresent();
    }
}
Loading