diff --git a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh index 2caa78bd3678..9f952137d8af 100755 --- a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh +++ b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh @@ -10,6 +10,9 @@ check_vars S3_BUCKET S3_BUCKET_ENDPOINT \ AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY cleanup_hadoop_docker_containers + +# Use Hadoop version 3.1 for S3 tests as the JSON SerDe class is not available in lower versions. +export HADOOP_BASE_IMAGE="ghcr.io/trinodb/testing/hdp3.1-hive" start_hadoop_docker_containers test_directory="$(date '+%Y%m%d-%H%M%S')-$(uuidgen | sha1sum | cut -b 1-6)" @@ -66,6 +69,14 @@ exec_in_hadoop_master_container /usr/bin/hive -e " STORED AS TEXTFILE LOCATION '${table_path}'" +table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_json/" +exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}" +exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table.json{,.gz,.bz2} "${table_path}" +exec_in_hadoop_master_container /usr/bin/hive -e " + CREATE EXTERNAL TABLE trino_s3select_test_external_fs_json(col_1 bigint, col_2 bigint) + ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' + LOCATION '${table_path}'" + stop_unnecessary_hadoop_services # restart hive-metastore to apply S3 changes in core-site.xml diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table.json b/plugin/trino-hive-hadoop2/conf/files/test_table.json new file mode 100644 index 000000000000..6173ff8aae87 --- /dev/null +++ b/plugin/trino-hive-hadoop2/conf/files/test_table.json @@ -0,0 +1,2 @@ +{"col_1":2, "col_2":4} +{"col_1":5, "col_2":6} diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table.json.bz2 b/plugin/trino-hive-hadoop2/conf/files/test_table.json.bz2 new file mode 100644 index 000000000000..6b90f2081e35 Binary files /dev/null and b/plugin/trino-hive-hadoop2/conf/files/test_table.json.bz2 differ diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table.json.gz b/plugin/trino-hive-hadoop2/conf/files/test_table.json.gz new file mode 100644 index 000000000000..ae4635797603 Binary files /dev/null and b/plugin/trino-hive-hadoop2/conf/files/test_table.json.gz differ diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java index f3e5ae8f3673..a23a19225dbe 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java @@ -28,6 +28,7 @@ public class TestHiveFileSystemS3SelectPushdown { protected SchemaTableName tableWithPipeDelimiter; protected SchemaTableName tableWithCommaDelimiter; + protected SchemaTableName tableJson; @Parameters({ "hive.hadoop2.metastoreHost", @@ -45,6 +46,7 @@ public void setup(String host, int port, String databaseName, String awsAccessKe tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter"); tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter"); + tableJson = new SchemaTableName(database, "trino_s3select_test_external_fs_json"); } @Test @@ -72,4 +74,17 @@ public void testGetRecordsWithCommaDelimiter() .row(11L, 24L).row(1L, 6L).row(21L, 12L).row(0L, 0L) // test_table_with_comma_delimiter.csv.bz2 .build()); } + + @Test + public void testGetRecordsJson() + throws Exception + { + assertEqualsIgnoreOrder( + readTable(tableJson), + MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT) + .row(2L, 4L).row(5L, 6L) // test_table.json + .row(7L, 23L).row(28L, 22L).row(13L, 10L) // test_table.json.gz + .row(1L, 19L).row(6L, 3L).row(24L, 22L).row(100L, 77L) // test_table.json.bz2 + .build()); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java index dd5700dc3655..68025f1e5b2a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java @@ -19,6 +19,7 @@ import com.google.common.primitives.SignedBytes; import io.airlift.slice.Slice; import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.spi.TrinoException; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; @@ -36,6 +37,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -59,10 +61,12 @@ public class IonSqlQueryBuilder private static final DateTimeFormatter FORMATTER = date().withChronology(getInstanceUTC()); private static final String DATA_SOURCE = "S3Object s"; private final TypeManager typeManager; + private final S3SelectDataType s3SelectDataType; - public IonSqlQueryBuilder(TypeManager typeManager) + public IonSqlQueryBuilder(TypeManager typeManager, S3SelectDataType s3SelectDataType) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.s3SelectDataType = requireNonNull(s3SelectDataType, "s3SelectDataType is null"); } public String buildSql(List columns, TupleDomain tupleDomain) @@ -72,6 +76,7 @@ public String buildSql(List columns, TupleDomain checkArgument(column.isBaseColumn(), "%s is not a base column", column)); }); + // SELECT clause StringBuilder sql = new StringBuilder("SELECT "); if (columns.isEmpty()) { @@ -79,14 +84,16 @@ public String buildSql(List columns, TupleDomain format("s._%d", column.getBaseHiveColumnIndex() + 1)) + .map(this::getFullyQualifiedColumnName) .collect(joining(", ")); sql.append(columnNames); } + // FROM clause sql.append(" FROM "); sql.append(DATA_SOURCE); + // WHERE clause List clauses = toConjuncts(columns, tupleDomain); if (!clauses.isEmpty()) { sql.append(" WHERE ") @@ -96,6 +103,18 @@ public String buildSql(List columns, TupleDomain toConjuncts(List columns, TupleDomain tupleDomain) { ImmutableList.Builder builder = ImmutableList.builder(); @@ -104,7 +123,7 @@ private List toConjuncts(List columns, TupleDomain '' "; + return getFullyQualifiedColumnName(column) + " <> '' "; } List disjuncts = new ArrayList<>(); @@ -152,10 +171,10 @@ private String toPredicate(Domain domain, Type type, int position) } List rangeConjuncts = new ArrayList<>(); if (!range.isLowUnbounded()) { - rangeConjuncts.add(toPredicate(range.isLowInclusive() ? ">=" : ">", range.getLowBoundedValue(), type, position)); + rangeConjuncts.add(toPredicate(range.isLowInclusive() ? ">=" : ">", range.getLowBoundedValue(), type, column)); } if (!range.isHighUnbounded()) { - rangeConjuncts.add(toPredicate(range.isHighInclusive() ? "<=" : "<", range.getHighBoundedValue(), type, position)); + rangeConjuncts.add(toPredicate(range.isHighInclusive() ? "<=" : "<", range.getHighBoundedValue(), type, column)); } // If rangeConjuncts is null, then the range was ALL, which should already have been checked for checkState(!rangeConjuncts.isEmpty()); @@ -164,7 +183,7 @@ private String toPredicate(Domain domain, Type type, int position) // Add back all of the possible single values either as an equality or an IN predicate if (singleValues.size() == 1) { - disjuncts.add(toPredicate("=", getOnlyElement(singleValues), type, position)); + disjuncts.add(toPredicate("=", getOnlyElement(singleValues), type, column)); } else if (singleValues.size() > 1) { List values = new ArrayList<>(); @@ -172,23 +191,23 @@ else if (singleValues.size() > 1) { checkType(type); values.add(valueToQuery(type, value)); } - disjuncts.add(createColumn(type, position) + " IN (" + Joiner.on(",").join(values) + ")"); + disjuncts.add(createColumn(type, column) + " IN (" + Joiner.on(",").join(values) + ")"); } // Add nullability disjuncts checkState(!disjuncts.isEmpty()); if (domain.isNullAllowed()) { - disjuncts.add(format("s._%d", position + 1) + " = '' "); + disjuncts.add(getFullyQualifiedColumnName(column) + " = '' "); } return "(" + Joiner.on(" OR ").join(disjuncts) + ")"; } - private String toPredicate(String operator, Object value, Type type, int position) + private String toPredicate(String operator, Object value, Type type, HiveColumnHandle column) { checkType(type); - return format("%s %s %s", createColumn(type, position), operator, valueToQuery(type, value)); + return format("%s %s %s", createColumn(type, column), operator, valueToQuery(type, value)); } private static void checkType(Type type) @@ -228,9 +247,9 @@ private static String valueToQuery(Type type, Object value) return "'" + ((Slice) value).toStringUtf8() + "'"; } - private String createColumn(Type type, int position) + private String createColumn(Type type, HiveColumnHandle columnHandle) { - String column = format("s._%d", position + 1); + String column = getFullyQualifiedColumnName(columnHandle); if (type.equals(BIGINT) || type.equals(INTEGER) || type.equals(SMALLINT) || type.equals(TINYINT)) { return formatPredicate(column, "INT"); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectDataType.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectDataType.java new file mode 100644 index 000000000000..70872574d5db --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectDataType.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3select; + +public enum S3SelectDataType { + CSV, + JSON +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java index 84fa6199e9cc..2b906424ea33 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java @@ -76,7 +76,7 @@ public abstract class S3SelectLineRecordReader protected final CompressionCodecFactory compressionCodecFactory; protected final String lineDelimiter; - S3SelectLineRecordReader( + public S3SelectLineRecordReader( Configuration configuration, Path path, long start, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java new file mode 100644 index 000000000000..49221c398280 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3select; + +import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader; +import io.trino.plugin.hive.s3select.json.S3SelectJsonRecordReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.util.Optional; +import java.util.Properties; + +/** + * Returns an S3SelectLineRecordReader based on the serDe class. It supports CSV and JSON formats, and + * will not push down any other formats. + */ +public class S3SelectLineRecordReaderProvider +{ + private S3SelectLineRecordReaderProvider() {} + + public static Optional get(Configuration configuration, + Path path, + long start, + long length, + Properties schema, + String ionSqlQuery, + TrinoS3ClientFactory s3ClientFactory, + S3SelectDataType dataType) + { + switch (dataType) { + case CSV: + return Optional.of(new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory)); + case JSON: + return Optional.of(new S3SelectJsonRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory)); + default: + // return empty if data type is not returned by the serDeMapper or unrecognizable by the LineRecordReader + return Optional.empty(); + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java index 27ec1e12abe9..87d0ab2d2734 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java @@ -19,7 +19,6 @@ import io.trino.plugin.hive.metastore.Table; import io.trino.spi.connector.ConnectorSession; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; @@ -56,7 +55,6 @@ public final class S3SelectPushdown { private static final Set SUPPORTED_S3_PREFIXES = ImmutableSet.of("s3://", "s3a://", "s3n://"); - private static final Set SUPPORTED_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName()); /* * Double and Real Types lose precision. Thus, they are not pushed down to S3. Please use Decimal Type if push down is desired. @@ -77,10 +75,10 @@ public final class S3SelectPushdown private S3SelectPushdown() {} - private static boolean isSerdeSupported(Properties schema) + private static boolean isSerDeSupported(Properties schema) { String serdeName = getDeserializerClassName(schema); - return SUPPORTED_SERDES.contains(serdeName); + return S3SelectSerDeDataTypeMapper.doesSerDeExist(serdeName); } private static boolean isInputFormatSupported(Properties schema) @@ -163,7 +161,7 @@ public static boolean shouldEnablePushdownForTable(ConnectorSession session, Tab private static boolean shouldEnablePushdownForTable(Table table, String path, Properties schema) { return isS3Storage(path) && - isSerdeSupported(schema) && + isSerDeSupported(schema) && isInputFormatSupported(schema) && areColumnTypesSupported(table.getDataColumns()); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java index df44b8e5d643..8b06009ecd93 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hive.s3select; -import com.google.common.collect.ImmutableSet; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveRecordCursorProvider; @@ -25,7 +24,6 @@ import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import javax.inject.Inject; @@ -33,7 +31,6 @@ import java.util.List; import java.util.Optional; import java.util.Properties; -import java.util.Set; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; @@ -44,7 +41,6 @@ public class S3SelectRecordCursorProvider implements HiveRecordCursorProvider { - private static final Set CSV_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName()); private final HdfsEnvironment hdfsEnvironment; private final TrinoS3ClientFactory s3ClientFactory; @@ -85,21 +81,32 @@ public Optional createRecordCursor( effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn()); String serdeName = getDeserializerClassName(schema); - if (CSV_SERDES.contains(serdeName)) { + Optional s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName); + + if (s3SelectDataTypeOptional.isPresent()) { + S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get(); + List readerColumns = projectedReaderColumns .map(ReaderColumns::get) .map(readColumns -> readColumns.stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList())) .orElse(columns); - IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager); + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType); String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate); - S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory); + Optional recordReader = S3SelectLineRecordReaderProvider.get(configuration, path, start, length, schema, + ionSqlQuery, s3ClientFactory, s3SelectDataType); + + if (recordReader.isEmpty()) { + // S3 Select data type is not mapped to an S3SelectLineRecordReader + return Optional.empty(); + } - RecordCursor cursor = new S3SelectRecordCursor<>(configuration, path, recordReader, length, schema, readerColumns); + RecordCursor cursor = new S3SelectRecordCursor<>(configuration, path, recordReader.get(), length, schema, readerColumns); return Optional.of(new ReaderRecordCursorWithProjections(cursor, projectedReaderColumns)); } - - // unsupported serdes - return Optional.empty(); + else { + // unsupported serdes + return Optional.empty(); + } } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectSerDeDataTypeMapper.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectSerDeDataTypeMapper.java new file mode 100644 index 000000000000..348d6511ada9 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectSerDeDataTypeMapper.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3select; + +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hive.hcatalog.data.JsonSerDe; + +import java.util.Map; +import java.util.Optional; + +public class S3SelectSerDeDataTypeMapper +{ + // Contains mapping of SerDe class name -> data type. Multiple SerDe classes can be mapped to the same data type. + private static final Map serDeToDataTypeMapping = Map.of( + LazySimpleSerDe.class.getName(), S3SelectDataType.CSV, + JsonSerDe.class.getName(), S3SelectDataType.JSON); + + private S3SelectSerDeDataTypeMapper() {} + + public static Optional getDataType(String serdeName) + { + return Optional.ofNullable(serDeToDataTypeMapping.get(serdeName)); + } + + public static boolean doesSerDeExist(String serdeName) + { + return serDeToDataTypeMapping.containsKey(serdeName); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java similarity index 96% rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java index dc3013db06fd..4a67005ab58f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.hive.s3select; +package io.trino.plugin.hive.s3select.csv; import com.amazonaws.services.s3.model.CSVInput; import com.amazonaws.services.s3.model.CSVOutput; @@ -20,6 +20,8 @@ import com.amazonaws.services.s3.model.OutputSerialization; import com.amazonaws.services.s3.model.SelectObjectContentRequest; import io.trino.plugin.hive.s3.TrinoS3FileSystem; +import io.trino.plugin.hive.s3select.S3SelectLineRecordReader; +import io.trino.plugin.hive.s3select.TrinoS3ClientFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -30,7 +32,7 @@ import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.QUOTE_CHAR; -class S3SelectCsvRecordReader +public class S3SelectCsvRecordReader extends S3SelectLineRecordReader { /* diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/json/S3SelectJsonRecordReader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/json/S3SelectJsonRecordReader.java new file mode 100644 index 000000000000..22d0c905c338 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/json/S3SelectJsonRecordReader.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3select.json; + +import com.amazonaws.services.s3.model.ExpressionType; +import com.amazonaws.services.s3.model.InputSerialization; +import com.amazonaws.services.s3.model.JSONInput; +import com.amazonaws.services.s3.model.JSONOutput; +import com.amazonaws.services.s3.model.JSONType; +import com.amazonaws.services.s3.model.OutputSerialization; +import com.amazonaws.services.s3.model.SelectObjectContentRequest; +import io.trino.plugin.hive.s3.TrinoS3FileSystem; +import io.trino.plugin.hive.s3select.S3SelectLineRecordReader; +import io.trino.plugin.hive.s3select.TrinoS3ClientFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.net.URI; +import java.util.Properties; + +public class S3SelectJsonRecordReader + extends S3SelectLineRecordReader +{ + public S3SelectJsonRecordReader(Configuration configuration, + Path path, + long start, + long length, + Properties schema, + String ionSqlQuery, + TrinoS3ClientFactory s3ClientFactory) + { + super(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory); + } + + @Override + public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, String query, Path path) + { + SelectObjectContentRequest selectObjectRequest = new SelectObjectContentRequest(); + URI uri = path.toUri(); + selectObjectRequest.setBucketName(TrinoS3FileSystem.extractBucketName(uri)); + selectObjectRequest.setKey(TrinoS3FileSystem.keyFromPath(path)); + selectObjectRequest.setExpression(query); + selectObjectRequest.setExpressionType(ExpressionType.SQL); + + // JSONType.LINES is the only JSON format supported by the Hive JsonSerDe. + JSONInput selectObjectJSONInputSerialization = new JSONInput(); + selectObjectJSONInputSerialization.setType(JSONType.LINES); + + InputSerialization selectObjectInputSerialization = new InputSerialization(); + selectObjectInputSerialization.setCompressionType(getCompressionType(path)); + selectObjectInputSerialization.setJson(selectObjectJSONInputSerialization); + selectObjectRequest.setInputSerialization(selectObjectInputSerialization); + + OutputSerialization selectObjectOutputSerialization = new OutputSerialization(); + JSONOutput selectObjectJSONOutputSerialization = new JSONOutput(); + selectObjectOutputSerialization.setJson(selectObjectJSONOutputSerialization); + selectObjectRequest.setOutputSerialization(selectObjectOutputSerialization); + + return selectObjectRequest; + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java index 8ba2e688d265..243a26470e1c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java @@ -54,32 +54,46 @@ public class TestIonSqlQueryBuilder @Test public void testBuildSQL() { - IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER); List columns = ImmutableList.of( createBaseColumn("n_nationkey", 0, HIVE_INT, INTEGER, REGULAR, Optional.empty()), createBaseColumn("n_name", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty()), createBaseColumn("n_regionkey", 2, HIVE_INT, INTEGER, REGULAR, Optional.empty())); - assertEquals("SELECT s._1, s._2, s._3 FROM S3Object s", - queryBuilder.buildSql(columns, TupleDomain.all())); + // CSV + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.CSV); + assertEquals(queryBuilder.buildSql(columns, TupleDomain.all()), + "SELECT s._1, s._2, s._3 FROM S3Object s"); + TupleDomain tupleDomain = withColumnDomains(ImmutableMap.of( columns.get(2), Domain.create(SortedRangeSet.copyOf(BIGINT, ImmutableList.of(Range.equal(BIGINT, 3L))), false))); - assertEquals("SELECT s._1, s._2, s._3 FROM S3Object s WHERE (case s._3 when '' then null else CAST(s._3 AS INT) end = 3)", - queryBuilder.buildSql(columns, tupleDomain)); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), + "SELECT s._1, s._2, s._3 FROM S3Object s WHERE (case s._3 when '' then null else CAST(s._3 AS INT) end = 3)"); + + // JSON + queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.JSON); + assertEquals(queryBuilder.buildSql(columns, TupleDomain.all()), + "SELECT s.n_nationkey, s.n_name, s.n_regionkey FROM S3Object s"); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), + "SELECT s.n_nationkey, s.n_name, s.n_regionkey FROM S3Object s " + + "WHERE (case s.n_regionkey when '' then null else CAST(s.n_regionkey AS INT) end = 3)"); } @Test public void testEmptyColumns() { - IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER); - assertEquals("SELECT ' ' FROM S3Object s", queryBuilder.buildSql(ImmutableList.of(), TupleDomain.all())); + // CSV + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.CSV); + assertEquals(queryBuilder.buildSql(ImmutableList.of(), TupleDomain.all()), "SELECT ' ' FROM S3Object s"); + + // JSON + queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.JSON); + assertEquals(queryBuilder.buildSql(ImmutableList.of(), TupleDomain.all()), "SELECT ' ' FROM S3Object s"); } @Test public void testDecimalColumns() { TypeManager typeManager = TESTING_TYPE_MANAGER; - IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager); List columns = ImmutableList.of( createBaseColumn("quantity", 0, HiveType.valueOf("decimal(20,0)"), DecimalType.createDecimalType(), REGULAR, Optional.empty()), createBaseColumn("extendedprice", 1, HiveType.valueOf("decimal(20,2)"), DecimalType.createDecimalType(), REGULAR, Optional.empty()), @@ -90,29 +104,43 @@ public void testDecimalColumns() columns.get(0), Domain.create(ofRanges(Range.lessThan(DecimalType.createDecimalType(20, 0), longDecimal("50"))), false), columns.get(1), Domain.create(ofRanges(Range.equal(HiveType.valueOf("decimal(20,2)").getType(typeManager), longDecimal("0.05"))), false), columns.get(2), Domain.create(ofRanges(Range.range(decimalType, shortDecimal("0.0"), true, shortDecimal("0.02"), true)), false))); - assertEquals("SELECT s._1, s._2, s._3 FROM S3Object s WHERE ((case s._1 when '' then null else CAST(s._1 AS DECIMAL(20,0)) end < 50)) AND " + - "(case s._2 when '' then null else CAST(s._2 AS DECIMAL(20,2)) end = 0.05) AND ((case s._3 when '' then null else CAST(s._3 AS DECIMAL(10,2)) " + - "end >= 0.00 AND case s._3 when '' then null else CAST(s._3 AS DECIMAL(10,2)) end <= 0.02))", - queryBuilder.buildSql(columns, tupleDomain)); + + // CSV + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, S3SelectDataType.CSV); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), + "SELECT s._1, s._2, s._3 FROM S3Object s WHERE ((case s._1 when '' then null else CAST(s._1 AS DECIMAL(20,0)) end < 50)) AND " + + "(case s._2 when '' then null else CAST(s._2 AS DECIMAL(20,2)) end = 0.05) AND ((case s._3 when '' then null else CAST(s._3 AS DECIMAL(10,2)) " + + "end >= 0.00 AND case s._3 when '' then null else CAST(s._3 AS DECIMAL(10,2)) end <= 0.02))"); + + // JSON + queryBuilder = new IonSqlQueryBuilder(typeManager, S3SelectDataType.JSON); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), + "SELECT s.quantity, s.extendedprice, s.discount FROM S3Object s WHERE ((case s.quantity when '' then null else CAST(s.quantity AS DECIMAL(20,0)) end < 50)) AND " + + "(case s.extendedprice when '' then null else CAST(s.extendedprice AS DECIMAL(20,2)) end = 0.05) AND ((case s.discount when '' then null else CAST(s.discount AS DECIMAL(10,2)) " + + "end >= 0.00 AND case s.discount when '' then null else CAST(s.discount AS DECIMAL(10,2)) end <= 0.02))"); } @Test public void testDateColumn() { - IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER); List columns = ImmutableList.of( createBaseColumn("t1", 0, HIVE_TIMESTAMP, TIMESTAMP_MILLIS, REGULAR, Optional.empty()), createBaseColumn("t2", 1, HIVE_DATE, DATE, REGULAR, Optional.empty())); TupleDomain tupleDomain = withColumnDomains(ImmutableMap.of( columns.get(1), Domain.create(SortedRangeSet.copyOf(DATE, ImmutableList.of(Range.equal(DATE, (long) DateTimeUtils.parseDate("2001-08-22")))), false))); - assertEquals("SELECT s._1, s._2 FROM S3Object s WHERE (case s._2 when '' then null else CAST(s._2 AS TIMESTAMP) end = `2001-08-22`)", queryBuilder.buildSql(columns, tupleDomain)); + // CSV + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.CSV); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s._1, s._2 FROM S3Object s WHERE (case s._2 when '' then null else CAST(s._2 AS TIMESTAMP) end = `2001-08-22`)"); + + // JSON + queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.JSON); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s.t1, s.t2 FROM S3Object s WHERE (case s.t2 when '' then null else CAST(s.t2 AS TIMESTAMP) end = `2001-08-22`)"); } @Test public void testNotPushDoublePredicates() { - IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER); List columns = ImmutableList.of( createBaseColumn("quantity", 0, HIVE_INT, INTEGER, REGULAR, Optional.empty()), createBaseColumn("extendedprice", 1, HIVE_DOUBLE, DOUBLE, REGULAR, Optional.empty()), @@ -122,7 +150,13 @@ public void testNotPushDoublePredicates() columns.get(0), Domain.create(ofRanges(Range.lessThan(BIGINT, 50L)), false), columns.get(1), Domain.create(ofRanges(Range.equal(DOUBLE, 0.05)), false), columns.get(2), Domain.create(ofRanges(Range.range(DOUBLE, 0.0, true, 0.02, true)), false))); - assertEquals("SELECT s._1, s._2, s._3 FROM S3Object s WHERE ((case s._1 when '' then null else CAST(s._1 AS INT) end < 50))", - queryBuilder.buildSql(columns, tupleDomain)); + + // CSV + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.CSV); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s._1, s._2, s._3 FROM S3Object s WHERE ((case s._1 when '' then null else CAST(s._1 AS INT) end < 50))"); + + // JSON + queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.JSON); + assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s.quantity, s.extendedprice, s.discount FROM S3Object s WHERE ((case s.quantity when '' then null else CAST(s.quantity AS INT) end < 50))"); } }