Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ exec_in_hadoop_master_container /usr/bin/hive -e "
LOCATION '${table_path}'
TBLPROPERTIES ('skip.header.line.count'='2', 'skip.footer.line.count'='2')"

# Create an external Hive table over pipe-delimited text files to exercise
# S3 Select pushdown with a non-default field delimiter.
table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_with_pipe_delimiter/"
exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}"
# Brace expansion uploads the same data in three encodings: .csv, .csv.gz and .csv.bz2,
# so the table location mixes uncompressed, GZIP and BZIP2 files.
exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table_with_pipe_delimiter.csv{,.gz,.bz2} "${table_path}"
# Table name must match the SchemaTableName used by TestHiveFileSystemS3SelectPushdown.
exec_in_hadoop_master_container /usr/bin/hive -e "
CREATE EXTERNAL TABLE trino_s3select_test_external_fs_with_pipe_delimiter(t_bigint bigint, s_bigint bigint)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '${table_path}'"

# Create an external Hive table over comma-delimited text files to exercise
# S3 Select pushdown with the default CSV field delimiter.
table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_with_comma_delimiter/"
exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}"
# Brace expansion uploads the same data in three encodings: .csv, .csv.gz and .csv.bz2,
# so the table location mixes uncompressed, GZIP and BZIP2 files.
exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table_with_comma_delimiter.csv{,.gz,.bz2} "${table_path}"
# Table name must match the SchemaTableName used by TestHiveFileSystemS3SelectPushdown.
exec_in_hadoop_master_container /usr/bin/hive -e "
CREATE EXTERNAL TABLE trino_s3select_test_external_fs_with_comma_delimiter(t_bigint bigint, s_bigint bigint)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '${table_path}'"

stop_unnecessary_hadoop_services

# restart hive-metastore to apply S3 changes in core-site.xml
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
7,1
19,10
1,345
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1|2
3|4
55|66
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@
package io.trino.plugin.hive.s3select;

import io.trino.plugin.hive.AbstractTestHiveFileSystemS3;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.MaterializedResult;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;

public class TestHiveFileSystemS3SelectPushdown
extends AbstractTestHiveFileSystemS3
{
protected SchemaTableName tableWithPipeDelimiter;
protected SchemaTableName tableWithCommaDelimiter;

@Parameters({
"hive.hadoop2.metastoreHost",
"hive.hadoop2.metastorePort",
Expand All @@ -33,5 +42,34 @@ public class TestHiveFileSystemS3SelectPushdown
public void setup(String host, int port, String databaseName, String awsAccessKey, String awsSecretKey, String writableBucket, String testDirectory)
{
    // Initialize the base S3 file-system test harness; the trailing `true`
    // enables S3 Select pushdown for all queries issued by this test class.
    super.setup(host, port, databaseName, awsAccessKey, awsSecretKey, writableBucket, testDirectory, true);

    // External tables are created by run_hive_s3_tests.sh; the names here must
    // match that script exactly or readTable() will fail to resolve them.
    this.tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter");
    this.tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter");
}

@Test
public void testGetRecordsWithPipeDelimiter()
        throws Exception
{
    // The table location holds the same pipe-delimited data set in three encodings
    // (plain, gzip, bzip2 — uploaded by run_hive_s3_tests.sh), so the expected
    // result is the union of rows from all three files, order-insensitive.
    assertEqualsIgnoreOrder(
            readTable(tableWithPipeDelimiter),
            MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT)
                    .row(1L, 2L).row(3L, 4L).row(55L, 66L) // test_table_with_pipe_delimiter.csv
                    .row(27L, 10L).row(8L, 2L).row(456L, 789L) // test_table_with_pipe_delimiter.csv.gz
                    .row(22L, 11L).row(78L, 76L).row(1L, 2L).row(36L, 90L) // test_table_with_pipe_delimiter.csv.bz2
                    .build());
}

@Test
public void testGetRecordsWithCommaDelimiter()
        throws Exception
{
    // The table location holds the same comma-delimited data set in three encodings
    // (plain, gzip, bzip2 — uploaded by run_hive_s3_tests.sh), so the expected
    // result is the union of rows from all three files, order-insensitive.
    assertEqualsIgnoreOrder(
            readTable(tableWithCommaDelimiter),
            MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT)
                    .row(7L, 1L).row(19L, 10L).row(1L, 345L) // test_table_with_comma_delimiter.csv
                    .row(27L, 10L).row(28L, 9L).row(90L, 94L) // test_table_with_comma_delimiter.csv.gz
                    .row(11L, 24L).row(1L, 6L).row(21L, 12L).row(0L, 0L) // test_table_with_comma_delimiter.csv.bz2
                    .build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Properties;

import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR;
import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.QUOTE_CHAR;

class S3SelectCsvRecordReader
Expand All @@ -41,6 +42,7 @@ class S3SelectCsvRecordReader
*/

private static final String COMMENTS_CHAR_STR = "\uFDD0";
private static final String DEFAULT_FIELD_DELIMITER = ",";

public S3SelectCsvRecordReader(
Configuration configuration,
Expand Down Expand Up @@ -91,4 +93,10 @@ public SelectObjectContentRequest buildSelectObjectRequest(Properties schema, St

return selectObjectRequest;
}

protected String getFieldDelimiter(Properties schema)
Comment thread
preethiratnam marked this conversation as resolved.
{
// Use the field delimiter only if it is specified in the schema. If not, use a default field delimiter ','
return schema.getProperty(FIELD_DELIM, DEFAULT_FIELD_DELIMITER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;

@ThreadSafe
public abstract class S3SelectLineRecordReader
Expand Down Expand Up @@ -224,11 +222,6 @@ public float getProgress()
return ((float) (position - start)) / (end - start);
}

String getFieldDelimiter(Properties schema)
{
return schema.getProperty(FIELD_DELIM, schema.getProperty(SERIALIZATION_FORMAT));
}

/**
* This exception is for stopping retries for S3 Select calls that shouldn't be retried.
* For example, "Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403 ..."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ private static boolean isInputFormatSupported(Properties schema)
public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat, Path path)
{
if (inputFormat instanceof TextInputFormat) {
// S3 Select supports the following formats: uncompressed, GZIP and BZIP2.
return getCompressionCodec((TextInputFormat) inputFormat, path)
.map(codec -> (codec instanceof GzipCodec) || (codec instanceof BZip2Codec))
.orElse(false); // TODO (https://github.com/trinodb/trino/issues/2475) fix S3 Select when file not compressed
.orElse(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: What if a file is compressed, but with a codec that is not supported? Maybe something like

Optional<Codec> codec = getCompressionCodec((TextInputFormat) inputFormat, path);
if(codec.isEmpty()){
  // assume uncompressed
  return true;
}

Also I wonder how safe is to assume that a file is uncompressed if a codec is not found for a given extension? (I guess it is as expected as I see similar assumptions being made in other parts of the code, but want to clarify).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Andrii, if a file is compressed with a codec that isn't supported, the codec would be different and this would return false:

codec -> (codec instanceof GzipCodec) || (codec instanceof BZip2Codec)

The default orElse(true) is only effective when the codec is null. So the method returns true only when codec is null (uncompressed), Gzip or Bzip2. We also have unit tests for the isCompressionCodecSupported method with different codec inputs.

Good point about the null codec assumption, though. I think it's reasonable- if there is no codec defined when a Hive table is created, it doesn't depend on codecs and is expected to have uncompressed files. This method internally uses Hadoop's CompressionCodecFactory, which I think is the standard.

Thank you so much for looking into this PR!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default orElse(true) is only effective when the codec is null.

Oh, right. I totally misread it. Sounds good.

}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void setUp()
public void testIsCompressionCodecSupported()
{
assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.gz")));
assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject")));
assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject")));
assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.lz4")));
assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.snappy")));
assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.bz2")));
Expand Down