diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectCsvRecordReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectCsvRecordReader.java index 366027c592e04..59d2d2a8621da 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectCsvRecordReader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectCsvRecordReader.java @@ -15,25 +15,15 @@ import com.amazonaws.services.s3.model.CSVInput; import com.amazonaws.services.s3.model.CSVOutput; -import com.amazonaws.services.s3.model.CompressionType; -import com.amazonaws.services.s3.model.ExpressionType; import com.amazonaws.services.s3.model.InputSerialization; import com.amazonaws.services.s3.model.OutputSerialization; -import com.amazonaws.services.s3.model.SelectObjectContentRequest; import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.s3.PrestoS3ClientFactory; -import com.facebook.presto.hive.s3.PrestoS3FileSystem; -import com.facebook.presto.spi.PrestoException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.GzipCodec; -import java.net.URI; import java.util.Properties; -import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR; import static org.apache.hadoop.hive.serde.serdeConstants.QUOTE_CHAR; @@ -64,15 +54,9 @@ public class S3SelectCsvRecordReader } @Override - public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, String query, Path path) + public InputSerialization buildInputSerialization() { - SelectObjectContentRequest selectObjectRequest = new SelectObjectContentRequest(); - URI uri = path.toUri(); - selectObjectRequest.setBucketName(PrestoS3FileSystem.getBucketName(uri)); - 
selectObjectRequest.setKey(PrestoS3FileSystem.keyFromPath(path)); - selectObjectRequest.setExpression(query); - selectObjectRequest.setExpressionType(ExpressionType.SQL); - + Properties schema = getSchema(); String fieldDelimiter = getFieldDelimiter(schema); String quoteChar = schema.getProperty(QUOTE_CHAR, null); String escapeChar = schema.getProperty(ESCAPE_CHAR, null); @@ -84,20 +68,19 @@ public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, St selectObjectCSVInputSerialization.setQuoteCharacter(quoteChar); selectObjectCSVInputSerialization.setQuoteEscapeCharacter(escapeChar); InputSerialization selectObjectInputSerialization = new InputSerialization(); + selectObjectInputSerialization.setCompressionType(getCompressionType()); + selectObjectInputSerialization.setCsv(selectObjectCSVInputSerialization); - CompressionCodec codec = compressionCodecFactory.getCodec(path); - if (codec instanceof GzipCodec) { - selectObjectInputSerialization.setCompressionType(CompressionType.GZIP); - } - else if (codec instanceof BZip2Codec) { - selectObjectInputSerialization.setCompressionType(CompressionType.BZIP2); - } - else if (codec != null) { - throw new PrestoException(NOT_SUPPORTED, "Compression extension not supported for S3 Select: " + path); - } + return selectObjectInputSerialization; + } - selectObjectInputSerialization.setCsv(selectObjectCSVInputSerialization); - selectObjectRequest.setInputSerialization(selectObjectInputSerialization); + @Override + public OutputSerialization buildOutputSerialization() + { + Properties schema = getSchema(); + String fieldDelimiter = getFieldDelimiter(schema); + String quoteChar = schema.getProperty(QUOTE_CHAR, null); + String escapeChar = schema.getProperty(ESCAPE_CHAR, null); OutputSerialization selectObjectOutputSerialization = new OutputSerialization(); CSVOutput selectObjectCSVOutputSerialization = new CSVOutput(); @@ -106,8 +89,7 @@ else if (codec != null) { 
selectObjectCSVOutputSerialization.setQuoteCharacter(quoteChar); selectObjectCSVOutputSerialization.setQuoteEscapeCharacter(escapeChar); selectObjectOutputSerialization.setCsv(selectObjectCSVOutputSerialization); - selectObjectRequest.setOutputSerialization(selectObjectOutputSerialization); - return selectObjectRequest; + return selectObjectOutputSerialization; } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectDataType.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectDataType.java new file mode 100644 index 0000000000000..890dce4ff1dc6 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectDataType.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive.s3select; + +public enum S3SelectDataType +{ + CSV +} diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReader.java index 1c193f3b9abc7..d6902f0fa9113 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReader.java @@ -14,11 +14,16 @@ package com.facebook.presto.hive.s3select; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CompressionType; +import com.amazonaws.services.s3.model.InputSerialization; +import com.amazonaws.services.s3.model.OutputSerialization; import com.amazonaws.services.s3.model.SelectObjectContentRequest; import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.s3.HiveS3Config; import com.facebook.presto.hive.s3.PrestoS3ClientFactory; +import com.facebook.presto.hive.s3.PrestoS3FileSystem; import com.facebook.presto.hive.s3.PrestoS3SelectClient; +import com.facebook.presto.spi.PrestoException; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Closer; import io.airlift.units.Duration; @@ -26,7 +31,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.LineReader; @@ -34,13 +42,16 @@ import java.io.IOException; import java.io.InputStream; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Properties; +import static 
com.amazonaws.services.s3.model.ExpressionType.SQL; import static com.facebook.presto.hive.RetryDriver.retry; import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_BACKOFF_TIME; import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_CLIENT_RETRIES; import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_RETRY_TIME; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.base.Throwables.throwIfUnchecked; import static java.lang.String.format; @@ -74,6 +85,8 @@ public abstract class S3SelectLineRecordReader private final SelectObjectContentRequest selectObjectContentRequest; protected final CompressionCodecFactory compressionCodecFactory; protected final String lineDelimiter; + private final Properties schema; + private final CompressionType compressionType; S3SelectLineRecordReader( Configuration configuration, @@ -100,7 +113,9 @@ public abstract class S3SelectLineRecordReader this.isFirstLine = true; this.compressionCodecFactory = new CompressionCodecFactory(configuration); - this.selectObjectContentRequest = buildSelectObjectRequest(schema, ionSqlQuery, path); + this.compressionType = getCompressionType(path); + this.schema = schema; + this.selectObjectContentRequest = buildSelectObjectRequest(ionSqlQuery, path); HiveS3Config defaults = new HiveS3Config(); this.maxAttempts = configuration.getInt(S3_MAX_CLIENT_RETRIES, defaults.getS3MaxClientRetries()) + 1; @@ -111,7 +126,52 @@ public abstract class S3SelectLineRecordReader closer.register(selectClient); } - public abstract SelectObjectContentRequest buildSelectObjectRequest(Properties schema, String query, Path path); + protected abstract InputSerialization buildInputSerialization(); + + protected abstract OutputSerialization buildOutputSerialization(); + + protected Properties getSchema() + { + return schema; + } + + protected CompressionType 
getCompressionType() + { + return compressionType; + } + + public SelectObjectContentRequest buildSelectObjectRequest(String query, Path path) + { + SelectObjectContentRequest selectObjectRequest = new SelectObjectContentRequest(); + URI uri = path.toUri(); + selectObjectRequest.setBucketName(PrestoS3FileSystem.getBucketName(uri)); + selectObjectRequest.setKey(PrestoS3FileSystem.keyFromPath(path)); + selectObjectRequest.setExpression(query); + selectObjectRequest.setExpressionType(SQL); + + InputSerialization selectObjectInputSerialization = buildInputSerialization(); + selectObjectRequest.setInputSerialization(selectObjectInputSerialization); + + OutputSerialization selectObjectOutputSerialization = buildOutputSerialization(); + selectObjectRequest.setOutputSerialization(selectObjectOutputSerialization); + + return selectObjectRequest; + } + + protected CompressionType getCompressionType(Path path) + { + CompressionCodec codec = compressionCodecFactory.getCodec(path); + if (codec == null) { + return CompressionType.NONE; + } + if (codec instanceof GzipCodec) { + return CompressionType.GZIP; + } + if (codec instanceof BZip2Codec) { + return CompressionType.BZIP2; + } + throw new PrestoException(NOT_SUPPORTED, "Compression extension not supported for S3 Select: " + path); + } private int readLine(Text value) throws IOException diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReaderProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReaderProvider.java new file mode 100644 index 0000000000000..6fe6dea3e0465 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectLineRecordReaderProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive.s3select; + +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.s3.PrestoS3ClientFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.util.Optional; +import java.util.Properties; + +public class S3SelectLineRecordReaderProvider +{ + private S3SelectLineRecordReaderProvider() {} + + public static Optional<S3SelectLineRecordReader> get( + Configuration configuration, + HiveClientConfig clientConfig, + Path path, + long start, + long length, + Properties schema, + String ionSqlQuery, + PrestoS3ClientFactory s3ClientFactory, + S3SelectDataType dataType) + { + switch (dataType) { + case CSV: + return Optional.of(new S3SelectCsvRecordReader(configuration, clientConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory)); + default: + // return empty if data type is not returned by the serDeMapper or unrecognizable by the LineRecordReader + return Optional.empty(); + } + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectPushdown.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectPushdown.java index 85956817e19d9..a0adf59b7aeaa 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectPushdown.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectPushdown.java @@ -20,7 +20,6 @@ import com.facebook.presto.spi.ConnectorSession; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.fs.Path; -import
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; @@ -54,7 +53,6 @@ public class S3SelectPushdown { private static final Logger LOG = Logger.get(S3SelectPushdown.class); private static final Set<String> SUPPORTED_S3_PREFIXES = ImmutableSet.of("s3://", "s3a://", "s3n://"); - private static final Set<String> SUPPORTED_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName()); private static final Set<String> SUPPORTED_INPUT_FORMATS = ImmutableSet.of(TextInputFormat.class.getName()); /* @@ -80,7 +78,7 @@ private S3SelectPushdown() {} private static boolean isSerdeSupported(Properties schema) { String serdeName = getDeserializerClassName(schema); - return SUPPORTED_SERDES.contains(serdeName); + return S3SelectSerDeDataTypeMapper.doesSerDeExist(serdeName); } private static boolean isInputFormatSupported(Properties schema) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectRecordCursorProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectRecordCursorProvider.java index 7f87f10351350..30b357fce6e9e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectRecordCursorProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectRecordCursorProvider.java @@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.joda.time.DateTimeZone; import javax.inject.Inject; @@ -49,7 +48,6 @@ public class S3SelectRecordCursorProvider implements HiveRecordCursorProvider { - private static final Set<String> CSV_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName()); private final HdfsEnvironment hdfsEnvironment; private final HiveClientConfig clientConfig; private final PrestoS3ClientFactory
s3ClientFactory; @@ -95,11 +93,17 @@ public Optional createRecordCursor( } String serdeName = getDeserializerClassName(schema); - if (CSV_SERDES.contains(serdeName)) { + Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName); + + if (s3SelectDataTypeOptional.isPresent()) { + S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get(); + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager); String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate); - S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, clientConfig, path, fileSplit.getStart(), fileSplit.getLength(), schema, ionSqlQuery, s3ClientFactory); - return Optional.of(new S3SelectRecordCursor(configuration, path, recordReader, fileSplit.getLength(), schema, columns, hiveStorageTimeZone, typeManager)); + Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, clientConfig, path, fileSplit.getStart(), fileSplit.getLength(), schema, ionSqlQuery, s3ClientFactory, s3SelectDataType); + + // If S3 Select data type is not mapped to a S3SelectLineRecordReader it will return Optional.empty() + return recordReader.map(s3SelectLineRecordReader -> new S3SelectRecordCursor<>(configuration, path, s3SelectLineRecordReader, fileSplit.getLength(), schema, columns, hiveStorageTimeZone, typeManager)); } // unsupported serdes diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectSerDeDataTypeMapper.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectSerDeDataTypeMapper.java new file mode 100644 index 0000000000000..a22576ace83fd --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3select/S3SelectSerDeDataTypeMapper.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive.s3select; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; + +import java.util.Map; +import java.util.Optional; + +public class S3SelectSerDeDataTypeMapper +{ + // Contains mapping of SerDe class name to corresponding data type. + // Multiple SerDe classes can be mapped to the same data type. + private static final Map<String, S3SelectDataType> SERDE_TO_DATA_TYPE_MAPPING = ImmutableMap.of( + LazySimpleSerDe.class.getName(), S3SelectDataType.CSV); + + private S3SelectSerDeDataTypeMapper() {} + + public static Optional<S3SelectDataType> getDataType(String serdeName) + { + return Optional.ofNullable(SERDE_TO_DATA_TYPE_MAPPING.get(serdeName)); + } + + public static boolean doesSerDeExist(String serdeName) + { + return SERDE_TO_DATA_TYPE_MAPPING.containsKey(serdeName); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java index 9a779b387b570..16c4cb2bc1918 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java @@ -78,6 +78,7 @@ import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.function.BiFunction; @@ -103,6 +104,7 @@ import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveSelectivePageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultOrcFileWriterFactory; +import static com.facebook.presto.hive.HiveTestUtils.getDefaultS3HiveRecordCursorProvider; import static com.facebook.presto.hive.HiveTestUtils.getTypes; import static com.facebook.presto.hive.metastore.MetastoreOperationResult.EMPTY_RESULT; import static com.facebook.presto.hive.metastore.NoopMetastoreCacheStats.NOOP_METASTORE_CACHE_STATS; @@ -249,7 +251,17 @@ protected void setup(String host, int port, String databaseName, BiFunction recordCursorProviderSet = s3SelectPushdownEnabled ? + getDefaultS3HiveRecordCursorProvider(config, metastoreClientConfig) : + getDefaultHiveRecordCursorProvider(config, metastoreClientConfig); + pageSourceProvider = new HivePageSourceProvider( + config, + hdfsEnvironment, + recordCursorProviderSet, + getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), + getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), + FUNCTION_AND_TYPE_MANAGER, + ROW_EXPRESSION_SERVICE); } protected ConnectorSession newSession() diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java index 276709ebdd766..3bff8f92fdd25 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java @@ -14,7 +14,6 @@ package com.facebook.presto.hive; import com.facebook.presto.cache.CacheConfig; -import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.Type; import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction; import com.facebook.presto.hive.AbstractTestHiveClient.Transaction; @@ -28,7 +27,6 @@ import com.facebook.presto.spi.ConnectorTableLayoutResult; import 
com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.SchemaTableName; -import com.facebook.presto.spi.SplitContext; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; @@ -47,6 +45,7 @@ import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits; import static com.facebook.presto.hive.AbstractTestHiveFileSystem.SPLIT_SCHEDULING_CONTEXT; import static com.facebook.presto.hive.HiveTestUtils.getTypes; +import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE; import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -96,7 +95,7 @@ public static MaterializedResult readTable(SchemaTableName tableName, split, tableHandle.getLayout().get(), columnHandles, - new SplitContext(false, TupleDomain.none()))) { + NON_CACHEABLE)) { MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes); for (MaterializedRow row : pageSourceResult.getMaterializedRows()) { Object[] dataValues = IntStream.range(0, row.getFieldCount()) @@ -154,7 +153,7 @@ public static MaterializedResult filterTable(SchemaTableName tableName, split, tableHandle.getLayout().get(), projectedColumns, - new SplitContext(false, TupleDomain.none()))) { + NON_CACHEABLE)) { MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes); for (MaterializedRow row : pageSourceResult.getMaterializedRows()) { Object[] dataValues = IntStream.range(0, row.getFieldCount()) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java index a0d562b10cd70..6e8b3a55f55ed 100644 --- 
a/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java @@ -43,7 +43,9 @@ import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory; import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ClientFactory; import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.hive.s3select.S3SelectRecordCursorProvider; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.operator.PagesIndex; @@ -175,6 +177,18 @@ public static Set getDefaultHiveRecordCursorProvider(H .build(); } + public static Set getDefaultS3HiveRecordCursorProvider(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig) + { + HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig); + // Without S3SelectRecordCursorProvider we are not pushing down to Select, but falling on GET path. + // GenericHiveRecordCursorProvider is needed to handle Hive splits when the query does not filter data + // as this is no longer pushed to Select. + return ImmutableSet.builder() + .add(new S3SelectRecordCursorProvider(testHdfsEnvironment, hiveClientConfig, new PrestoS3ClientFactory())) + .add(new GenericHiveRecordCursorProvider(testHdfsEnvironment)) + .build(); + } + public static Set getDefaultHiveFileWriterFactories(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig) { HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);