11 changes: 11 additions & 0 deletions plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh
@@ -10,6 +10,9 @@ check_vars S3_BUCKET S3_BUCKET_ENDPOINT \
AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY

cleanup_hadoop_docker_containers

# Use Hadoop version 3.1 for S3 tests as the JSON SerDe class is not available in lower versions.
export HADOOP_BASE_IMAGE="ghcr.io/trinodb/testing/hdp3.1-hive"
Member:

FYI @findepi (in case you weren't already aware).

Member:

Thanks @hashhar.

If I am reading this correctly, this defeats the purpose of running these tests in a matrix, which we still do:

source plugin/trino-hive-hadoop2/conf/hive-tests-${{ matrix.config }}.sh &&
plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh

@arhimondr was this a conscious decision? Did you want to remove the matrix from that CI job?

cc @nineinchnick @alexjo2144

Member:

Yes, we should either make testGetRecordsJson run only on HDP3 (probably via Surefire configuration) or remove the matrix (which we probably shouldn't do, so that we maintain coverage across distros).

Member:

Yeah, I think we should maintain the matrix. The Hive version may matter for what exactly gets created on S3.

@arhimondr @preethiratnam can you please remove the export HADOOP_BASE_IMAGE=... hack from run_hive_s3_tests.sh?

Contributor Author:

Hi @findepi @hashhar, I want to use the 3.1 HADOOP_BASE_IMAGE to run the Hive S3 tests, because the JsonSerDe class is not available in version 2.6.

Is there a better way to configure the JSON tests to use the 3.1 version?

Member:

@preethiratnam We need to use test exclusions via maven profiles for that.
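For illustration only, a hedged sketch of the kind of Surefire exclusion via a Maven profile being suggested here — the test class name is from this PR, but the profile id, activation, and placement are assumptions, not the actual fix:

```xml
<!-- Sketch only: run the HDP3-only test class solely when an hdp3-style
     profile is active; ids and wiring are hypothetical. -->
<profiles>
    <profile>
        <id>test-hdp3</id>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <configuration>
                        <includes>
                            <include>**/TestHiveFileSystemS3SelectPushdown.java</include>
                        </includes>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </profile>
</profiles>
```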

Contributor Author:

Thanks, I'll raise a new PR to fix this.

Contributor:

@findepi No, unfortunately it wasn't a conscious decision. I think I misread the change and assumed it would only alter the default value for the S3 tests.

Contributor Author:

Created #13486 to fix this.

start_hadoop_docker_containers

test_directory="$(date '+%Y%m%d-%H%M%S')-$(uuidgen | sha1sum | cut -b 1-6)"
@@ -66,6 +69,14 @@ exec_in_hadoop_master_container /usr/bin/hive -e "
STORED AS TEXTFILE
LOCATION '${table_path}'"

table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_json/"
exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}"
exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table.json{,.gz,.bz2} "${table_path}"
exec_in_hadoop_master_container /usr/bin/hive -e "
CREATE EXTERNAL TABLE trino_s3select_test_external_fs_json(col_1 bigint, col_2 bigint)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '${table_path}'"

stop_unnecessary_hadoop_services

# restart hive-metastore to apply S3 changes in core-site.xml
2 changes: 2 additions & 0 deletions plugin/trino-hive-hadoop2/conf/files/test_table.json
@@ -0,0 +1,2 @@
{"col_1":2, "col_2":4}
{"col_1":5, "col_2":6}
Binary file not shown.
Binary file not shown.
@@ -28,6 +28,7 @@ public class TestHiveFileSystemS3SelectPushdown
{
protected SchemaTableName tableWithPipeDelimiter;
protected SchemaTableName tableWithCommaDelimiter;
protected SchemaTableName tableJson;

@Parameters({
"hive.hadoop2.metastoreHost",
@@ -45,6 +46,7 @@ public void setup(String host, int port, String databaseName, String awsAccessKe

tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter");
tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter");
tableJson = new SchemaTableName(database, "trino_s3select_test_external_fs_json");
}

@Test
@@ -72,4 +74,17 @@ public void testGetRecordsWithCommaDelimiter()
.row(11L, 24L).row(1L, 6L).row(21L, 12L).row(0L, 0L) // test_table_with_comma_delimiter.csv.bz2
.build());
}

@Test
public void testGetRecordsJson()
throws Exception
{
assertEqualsIgnoreOrder(
readTable(tableJson),
MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT)
.row(2L, 4L).row(5L, 6L) // test_table.json
.row(7L, 23L).row(28L, 22L).row(13L, 10L) // test_table.json.gz
.row(1L, 19L).row(6L, 3L).row(24L, 22L).row(100L, 77L) // test_table.json.bz2
.build());
}
}
@@ -19,6 +19,7 @@
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.spi.TrinoException;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
@@ -36,6 +37,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateType.DATE;
@@ -59,10 +61,12 @@ public class IonSqlQueryBuilder
private static final DateTimeFormatter FORMATTER = date().withChronology(getInstanceUTC());
private static final String DATA_SOURCE = "S3Object s";
private final TypeManager typeManager;
private final S3SelectDataType s3SelectDataType;

public IonSqlQueryBuilder(TypeManager typeManager)
public IonSqlQueryBuilder(TypeManager typeManager, S3SelectDataType s3SelectDataType)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.s3SelectDataType = requireNonNull(s3SelectDataType, "s3SelectDataType is null");
}

public String buildSql(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHandle> tupleDomain)
@@ -72,21 +76,24 @@ public String buildSql(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHan
domains.keySet().forEach(column -> checkArgument(column.isBaseColumn(), "%s is not a base column", column));
});

// SELECT clause
StringBuilder sql = new StringBuilder("SELECT ");

if (columns.isEmpty()) {
sql.append("' '");
}
else {
String columnNames = columns.stream()
.map(column -> format("s._%d", column.getBaseHiveColumnIndex() + 1))
.map(this::getFullyQualifiedColumnName)
.collect(joining(", "));
sql.append(columnNames);
}

// FROM clause
sql.append(" FROM ");
sql.append(DATA_SOURCE);

// WHERE clause
List<String> clauses = toConjuncts(columns, tupleDomain);
if (!clauses.isEmpty()) {
sql.append(" WHERE ")
@@ -96,6 +103,18 @@ public String buildSql(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHan
return sql.toString();
}

private String getFullyQualifiedColumnName(HiveColumnHandle column)
{
switch (s3SelectDataType) {
case JSON:
return format("s.%s", column.getBaseColumnName());
case CSV:
return format("s._%d", column.getBaseHiveColumnIndex() + 1);
default:
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Attempted to build SQL for unknown S3SelectDataType");
}
}
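
For readers outside the diff context: the switch above is the crux of the JSON change — generated S3 Select SQL refers to CSV columns positionally (s._1, s._2, ...) but to JSON columns by name. A minimal standalone sketch of that rule (hypothetical class and method names, not Trino's API):

```java
// Standalone sketch of the column-reference rule in getFullyQualifiedColumnName:
// CSV objects in S3 Select expose positional fields, JSON objects expose named fields.
public class ColumnNameSketch
{
    public enum DataType { CSV, JSON }

    public static String columnRef(DataType type, String baseColumnName, int baseColumnIndex)
    {
        switch (type) {
            case JSON:
                return "s." + baseColumnName;          // e.g. s.col_1
            case CSV:
                return "s._" + (baseColumnIndex + 1);  // e.g. s._1 (1-based)
            default:
                throw new IllegalArgumentException("unknown data type: " + type);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(columnRef(DataType.CSV, "col_1", 0));  // s._1
        System.out.println(columnRef(DataType.JSON, "col_1", 0)); // s.col_1
    }
}
```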

private List<String> toConjuncts(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHandle> tupleDomain)
{
ImmutableList.Builder<String> builder = ImmutableList.builder();
@@ -104,7 +123,7 @@ private List<String> toConjuncts(List<HiveColumnHandle> columns, TupleDomain<Hiv
if (tupleDomain.getDomains().isPresent() && isSupported(type)) {
Domain domain = tupleDomain.getDomains().get().get(column);
if (domain != null) {
builder.add(toPredicate(domain, type, column.getBaseHiveColumnIndex()));
builder.add(toPredicate(domain, type, column));
}
}
}
@@ -124,13 +143,13 @@ private static boolean isSupported(Type type)
validType instanceof VarcharType;
}

private String toPredicate(Domain domain, Type type, int position)
private String toPredicate(Domain domain, Type type, HiveColumnHandle column)
{
checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");

if (domain.getValues().isNone()) {
if (domain.isNullAllowed()) {
return format("s._%d", position + 1) + " = '' ";
return getFullyQualifiedColumnName(column) + " = '' ";
}
return "FALSE";
}
@@ -139,7 +158,7 @@ private String toPredicate(Domain domain, Type type, int position)
if (domain.isNullAllowed()) {
return "TRUE";
}
return format("s._%d", position + 1) + " <> '' ";
return getFullyQualifiedColumnName(column) + " <> '' ";
}

List<String> disjuncts = new ArrayList<>();
@@ -152,10 +171,10 @@ private String toPredicate(Domain domain, Type type, int position)
}
List<String> rangeConjuncts = new ArrayList<>();
if (!range.isLowUnbounded()) {
rangeConjuncts.add(toPredicate(range.isLowInclusive() ? ">=" : ">", range.getLowBoundedValue(), type, position));
rangeConjuncts.add(toPredicate(range.isLowInclusive() ? ">=" : ">", range.getLowBoundedValue(), type, column));
}
if (!range.isHighUnbounded()) {
rangeConjuncts.add(toPredicate(range.isHighInclusive() ? "<=" : "<", range.getHighBoundedValue(), type, position));
rangeConjuncts.add(toPredicate(range.isHighInclusive() ? "<=" : "<", range.getHighBoundedValue(), type, column));
}
// If rangeConjuncts is null, then the range was ALL, which should already have been checked for
checkState(!rangeConjuncts.isEmpty());
@@ -164,31 +183,31 @@

// Add back all of the possible single values either as an equality or an IN predicate
if (singleValues.size() == 1) {
disjuncts.add(toPredicate("=", getOnlyElement(singleValues), type, position));
disjuncts.add(toPredicate("=", getOnlyElement(singleValues), type, column));
}
else if (singleValues.size() > 1) {
List<String> values = new ArrayList<>();
for (Object value : singleValues) {
checkType(type);
values.add(valueToQuery(type, value));
}
disjuncts.add(createColumn(type, position) + " IN (" + Joiner.on(",").join(values) + ")");
disjuncts.add(createColumn(type, column) + " IN (" + Joiner.on(",").join(values) + ")");
}

// Add nullability disjuncts
checkState(!disjuncts.isEmpty());
if (domain.isNullAllowed()) {
disjuncts.add(format("s._%d", position + 1) + " = '' ");
disjuncts.add(getFullyQualifiedColumnName(column) + " = '' ");
}

return "(" + Joiner.on(" OR ").join(disjuncts) + ")";
}

private String toPredicate(String operator, Object value, Type type, int position)
private String toPredicate(String operator, Object value, Type type, HiveColumnHandle column)
{
checkType(type);

return format("%s %s %s", createColumn(type, position), operator, valueToQuery(type, value));
return format("%s %s %s", createColumn(type, column), operator, valueToQuery(type, value));
}

private static void checkType(Type type)
@@ -228,9 +247,9 @@ private static String valueToQuery(Type type, Object value)
return "'" + ((Slice) value).toStringUtf8() + "'";
}

private String createColumn(Type type, int position)
private String createColumn(Type type, HiveColumnHandle columnHandle)
{
String column = format("s._%d", position + 1);
String column = getFullyQualifiedColumnName(columnHandle);

if (type.equals(BIGINT) || type.equals(INTEGER) || type.equals(SMALLINT) || type.equals(TINYINT)) {
return formatPredicate(column, "INT");
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.s3select;

public enum S3SelectDataType {
CSV,
JSON
}
@@ -76,7 +76,7 @@ public abstract class S3SelectLineRecordReader
protected final CompressionCodecFactory compressionCodecFactory;
protected final String lineDelimiter;

S3SelectLineRecordReader(
public S3SelectLineRecordReader(
Configuration configuration,
Path path,
long start,
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.s3select;

import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader;
import io.trino.plugin.hive.s3select.json.S3SelectJsonRecordReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.Optional;
import java.util.Properties;

/**
* Returns an S3SelectLineRecordReader based on the serDe class. It supports CSV and JSON formats, and
* will not push down any other formats.
*/
public class S3SelectLineRecordReaderProvider
{
private S3SelectLineRecordReaderProvider() {}

public static Optional<S3SelectLineRecordReader> get(Configuration configuration,
Path path,
long start,
long length,
Properties schema,
String ionSqlQuery,
TrinoS3ClientFactory s3ClientFactory,
S3SelectDataType dataType)
{
switch (dataType) {
case CSV:
return Optional.of(new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory));
case JSON:
return Optional.of(new S3SelectJsonRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory));
default:
// return empty if data type is not returned by the serDeMapper or unrecognizable by the LineRecordReader
return Optional.empty();
}
}
}
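
A caller-side sketch of the Optional-based dispatch above, using hypothetical stand-in types (the real reader constructors take Hadoop/S3 arguments): an unsupported format yields Optional.empty(), so the caller falls back to the ordinary, non-pushdown line reader.

```java
import java.util.Optional;

// Hypothetical stand-ins for the real reader types, to show the fallback pattern.
public class ProviderFallbackSketch
{
    public interface Reader {}
    public static class S3SelectReader implements Reader {}
    public static class DefaultReader implements Reader {}

    public enum DataType { CSV, JSON, OTHER }

    // Mirrors the provider's contract: only CSV and JSON are pushed down.
    public static Optional<Reader> tryS3Select(DataType type)
    {
        switch (type) {
            case CSV:
            case JSON:
                return Optional.of(new S3SelectReader());
            default:
                return Optional.empty();
        }
    }

    // Caller falls back to the regular reader when pushdown is unavailable.
    public static Reader createReader(DataType type)
    {
        return tryS3Select(type).orElseGet(DefaultReader::new);
    }
}
```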
@@ -19,7 +19,6 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.ConnectorSession;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
@@ -56,7 +55,6 @@
public final class S3SelectPushdown
{
private static final Set<String> SUPPORTED_S3_PREFIXES = ImmutableSet.of("s3://", "s3a://", "s3n://");
private static final Set<String> SUPPORTED_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName());

/*
* Double and Real Types lose precision. Thus, they are not pushed down to S3. Please use Decimal Type if push down is desired.
@@ -77,10 +75,10 @@ public final class S3SelectPushdown

private S3SelectPushdown() {}

private static boolean isSerdeSupported(Properties schema)
private static boolean isSerDeSupported(Properties schema)
{
String serdeName = getDeserializerClassName(schema);
return SUPPORTED_SERDES.contains(serdeName);
return S3SelectSerDeDataTypeMapper.doesSerDeExist(serdeName);
}

private static boolean isInputFormatSupported(Properties schema)
@@ -163,7 +161,7 @@ public static boolean shouldEnablePushdownForTable(ConnectorSession session, Tab
private static boolean shouldEnablePushdownForTable(Table table, String path, Properties schema)
{
return isS3Storage(path) &&
isSerdeSupported(schema) &&
isSerDeSupported(schema) &&
isInputFormatSupported(schema) &&
areColumnTypesSupported(table.getDataColumns());
}