diff --git a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh index 97b0cb530e3e..2caa78bd3678 100755 --- a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh +++ b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh @@ -46,6 +46,26 @@ exec_in_hadoop_master_container /usr/bin/hive -e " LOCATION '${table_path}' TBLPROPERTIES ('skip.header.line.count'='2', 'skip.footer.line.count'='2')" +table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_with_pipe_delimiter/" +exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}" +exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table_with_pipe_delimiter.csv{,.gz,.bz2} "${table_path}" +exec_in_hadoop_master_container /usr/bin/hive -e " + CREATE EXTERNAL TABLE trino_s3select_test_external_fs_with_pipe_delimiter(t_bigint bigint, s_bigint bigint) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '|' + STORED AS TEXTFILE + LOCATION '${table_path}'" + +table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_with_comma_delimiter/" +exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}" +exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table_with_comma_delimiter.csv{,.gz,.bz2} "${table_path}" +exec_in_hadoop_master_container /usr/bin/hive -e " + CREATE EXTERNAL TABLE trino_s3select_test_external_fs_with_comma_delimiter(t_bigint bigint, s_bigint bigint) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + STORED AS TEXTFILE + LOCATION '${table_path}'" + stop_unnecessary_hadoop_services # restart hive-metastore to apply S3 changes in core-site.xml diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv b/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv new file mode 100644 index 000000000000..424dc886fde8 --- /dev/null +++ b/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv @@ -0,0 +1,3 @@ +7,1 +19,10 +1,345 diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv.bz2 b/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv.bz2 new file mode 100644 index 000000000000..5d30848665d4 Binary files /dev/null and b/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv.bz2 differ diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv.gz b/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv.gz new file mode 100644 index 000000000000..2d8c9cb91edd Binary files /dev/null and b/plugin/trino-hive-hadoop2/conf/files/test_table_with_comma_delimiter.csv.gz differ diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv b/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv new file mode 100644 index 000000000000..0cc012bf882a --- /dev/null +++ b/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv @@ -0,0 +1,3 @@ +1|2 +3|4 +55|66 diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv.bz2 b/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv.bz2 new file mode 100644 index 000000000000..df138bc6d194 Binary files /dev/null and b/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv.bz2 differ diff --git a/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv.gz b/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv.gz new file mode 100644 index 000000000000..6634c19f3345 Binary files /dev/null and b/plugin/trino-hive-hadoop2/conf/files/test_table_with_pipe_delimiter.csv.gz differ diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java index 1070fe76fb7c..f3e5ae8f3673 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java @@ -14,12 +14,21 @@ package io.trino.plugin.hive.s3select; import io.trino.plugin.hive.AbstractTestHiveFileSystemS3; +import io.trino.spi.connector.SchemaTableName; +import io.trino.testing.MaterializedResult; import org.testng.annotations.BeforeClass; import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; public class TestHiveFileSystemS3SelectPushdown extends AbstractTestHiveFileSystemS3 { + protected SchemaTableName tableWithPipeDelimiter; + protected SchemaTableName tableWithCommaDelimiter; + @Parameters({ "hive.hadoop2.metastoreHost", "hive.hadoop2.metastorePort", @@ -33,5 +42,34 @@ public class TestHiveFileSystemS3SelectPushdown public void setup(String host, int port, String databaseName, String awsAccessKey, String awsSecretKey, String writableBucket, String testDirectory) { super.setup(host, port, databaseName, awsAccessKey, awsSecretKey, writableBucket, testDirectory, true); + + tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter"); + tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter"); + } + + @Test + public void testGetRecordsWithPipeDelimiter() + throws Exception + { + assertEqualsIgnoreOrder( + readTable(tableWithPipeDelimiter), + MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT) + .row(1L, 2L).row(3L, 4L).row(55L, 66L) // test_table_with_pipe_delimiter.csv + .row(27L, 10L).row(8L, 2L).row(456L, 789L) // test_table_with_pipe_delimiter.csv.gzip + .row(22L, 11L).row(78L, 76L).row(1L, 2L).row(36L, 90L) // test_table_with_pipe_delimiter.csv.bz2 + .build()); + } + + @Test + public void testGetRecordsWithCommaDelimiter() + throws Exception + { + assertEqualsIgnoreOrder( + readTable(tableWithCommaDelimiter), + MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT) + .row(7L, 1L).row(19L, 10L).row(1L, 345L) // test_table_with_comma_delimiter.csv + .row(27L, 10L).row(28L, 9L).row(90L, 94L) // test_table_with_comma_delimiter.csv.gzip + .row(11L, 24L).row(1L, 6L).row(21L, 12L).row(0L, 0L) // test_table_with_comma_delimiter.csv.bz2 + .build()); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java index 226824c54912..dc3013db06fd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectCsvRecordReader.java @@ -27,6 +27,7 @@ import java.util.Properties; import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR; +import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.QUOTE_CHAR; class S3SelectCsvRecordReader @@ -41,6 +42,7 @@ class S3SelectCsvRecordReader */ private static final String COMMENTS_CHAR_STR = "\uFDD0"; + private static final String DEFAULT_FIELD_DELIMITER = ","; public S3SelectCsvRecordReader( Configuration configuration, @@ -91,4 +93,10 @@ public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, St return selectObjectRequest; } + + protected String getFieldDelimiter(Properties schema) + { + // Use the field delimiter only if it is specified in the schema. If not, use a default field delimiter ',' + return schema.getProperty(FIELD_DELIM, DEFAULT_FIELD_DELIMITER); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java index b7f32548fe98..84fa6199e9cc 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReader.java @@ -52,9 +52,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; @ThreadSafe public abstract class S3SelectLineRecordReader @@ -224,11 +222,6 @@ public float getProgress() return ((float) (position - start)) / (end - start); } - String getFieldDelimiter(Properties schema) - { - return schema.getProperty(FIELD_DELIM, schema.getProperty(SERIALIZATION_FORMAT)); - } - /** * This exception is for stopping retries for S3 Select calls that shouldn't be retried. * For example, "Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403 ..." diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java index 40073e4e2b62..27ec1e12abe9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java @@ -106,9 +106,10 @@ private static boolean isInputFormatSupported(Properties schema) public static boolean isCompressionCodecSupported(InputFormat inputFormat, Path path) { if (inputFormat instanceof TextInputFormat) { + // S3 Select supports the following formats: uncompressed, GZIP and BZIP2. return getCompressionCodec((TextInputFormat) inputFormat, path) .map(codec -> (codec instanceof GzipCodec) || (codec instanceof BZip2Codec)) - .orElse(false); // TODO (https://github.com/trinodb/trino/issues/2475) fix S3 Select when file not compressed + .orElse(true); } return false; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectPushdown.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectPushdown.java index 9ec560570dd1..6f1fc59e4f7c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectPushdown.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectPushdown.java @@ -38,7 +38,7 @@ public void setUp() public void testIsCompressionCodecSupported() { assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.gz"))); - assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject"))); + assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject"))); assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.lz4"))); assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.snappy"))); assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.bz2")));