TestHiveFileSystemS3SelectPushdown.java
@@ -13,13 +13,21 @@
*/
package io.trino.plugin.hive.s3select;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.AbstractTestHiveFileSystemS3;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.MaterializedResult;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.trino.plugin.hive.HiveType.HIVE_INT;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;

@@ -43,7 +51,6 @@ public class TestHiveFileSystemS3SelectPushdown
public void setup(String host, int port, String databaseName, String awsAccessKey, String awsSecretKey, String writableBucket, String testDirectory)
{
super.setup(host, port, databaseName, awsAccessKey, awsSecretKey, writableBucket, testDirectory, true);

tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter");
tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter");
tableJson = new SchemaTableName(database, "trino_s3select_test_external_fs_json");
@@ -62,6 +69,22 @@ public void testGetRecordsWithPipeDelimiter()
.build());
}

@Test
public void testFilterRecordsWithPipeDelimiter()
throws Exception
{
List<ColumnHandle> projectedColumns = ImmutableList.of(
createBaseColumn("t_bigint", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty()));

assertEqualsIgnoreOrder(
filterTable(tableWithPipeDelimiter, projectedColumns),
MaterializedResult.resultBuilder(newSession(), BIGINT)
.row(1L).row(3L).row(55L) // test_table_with_pipe_delimiter.csv
.row(27L).row(8L).row(456L) // test_table_with_pipe_delimiter.csv.gzip
.row(22L).row(78L).row(1L).row(36L) // test_table_with_pipe_delimiter.csv.bz2
.build());
}

@Test
public void testGetRecordsWithCommaDelimiter()
throws Exception
@@ -75,6 +98,22 @@ public void testGetRecordsWithCommaDelimiter()
.build());
}

@Test
public void testFilterRecordsWithCommaDelimiter()
throws Exception
{
List<ColumnHandle> projectedColumns = ImmutableList.of(
createBaseColumn("t_bigint", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty()));

assertEqualsIgnoreOrder(
filterTable(tableWithCommaDelimiter, projectedColumns),
MaterializedResult.resultBuilder(newSession(), BIGINT)
.row(7L).row(19L).row(1L) // test_table_with_comma_delimiter.csv
.row(27L).row(28L).row(90L) // test_table_with_comma_delimiter.csv.gzip
.row(11L).row(1L).row(21L).row(0L) // test_table_with_comma_delimiter.csv.bz2
.build());
}

@Test
public void testGetRecordsJson()
throws Exception
@@ -87,4 +126,20 @@ public void testGetRecordsJson()
.row(1L, 19L).row(6L, 3L).row(24L, 22L).row(100L, 77L) // test_table.json.bz2
.build());
}

@Test
public void testFilterRecordsJson()
throws Exception
{
List<ColumnHandle> projectedColumns = ImmutableList.of(
createBaseColumn("col_1", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty()));

assertEqualsIgnoreOrder(
filterTable(tableJson, projectedColumns),
MaterializedResult.resultBuilder(newSession(), BIGINT)
.row(2L).row(5L) // test_table.json
.row(7L).row(28L).row(13L) // test_table.json.gz
.row(1L).row(6L).row(24L).row(100L) // test_table.json.bz2
.build());
}
}
S3SelectRecordCursorProvider.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.s3select;

import com.google.common.collect.ImmutableSet;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveRecordCursorProvider;
@@ -24,19 +25,28 @@
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

import javax.inject.Inject;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.apache.hadoop.hive.serde.serdeConstants.COLUMN_NAME_DELIMITER;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;

public class S3SelectRecordCursorProvider
implements HiveRecordCursorProvider
@@ -80,17 +90,20 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
// Ignore predicates on partial columns for now.
effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn());

+ List<HiveColumnHandle> readerColumns = projectedReaderColumns
+         .map(readColumns -> readColumns.get().stream().map(HiveColumnHandle.class::cast).collect(toImmutableList()))
+         .orElse(columns.stream().collect(toImmutableList()));
+ // The query is not going to filter any data; no need to use S3 Select
+ if (!hasFilters(schema, effectivePredicate, readerColumns)) {
+     return Optional.empty();
+ }

String serdeName = getDeserializerClassName(schema);
Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName);

if (s3SelectDataTypeOptional.isPresent()) {
S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();

- List<HiveColumnHandle> readerColumns = projectedReaderColumns
-         .map(ReaderColumns::get)
-         .map(readColumns -> readColumns.stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList()))
-         .orElse(columns);

IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType);
String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate);
Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, path, start, length, schema,
@@ -109,4 +122,65 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
return Optional.empty();
}
}

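// Returns true when S3 Select can reduce the data read: either the predicate filters rows,
// or the projected columns differ from the full table schema.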
private static boolean hasFilters(
Properties schema,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<HiveColumnHandle> readerColumns)
{
// If there is no effective predicate and the reader columns and column types are identical to the schema,
// the query reads all the data out of S3. In these cases we can use S3 GetObject instead of S3 SelectObjectContent.
if (effectivePredicate.isAll()) {
return !isEquivalentSchema(readerColumns, schema);
}
return true;
}

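// Checks whether the projected columns exactly match the column names and types declared in the table schema.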
private static boolean isEquivalentSchema(List<HiveColumnHandle> readerColumns, Properties schema)
{
Set<String> projectedColumnNames = getColumnProperty(readerColumns, HiveColumnHandle::getName);
Set<String> projectedColumnTypes = getColumnProperty(readerColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
return isEquivalentColumns(projectedColumnNames, schema) && isEquivalentColumnTypes(projectedColumnTypes, schema);
}

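// Compares projected column names against the schema's LIST_COLUMNS property, honoring the configured column-name delimiter.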
private static boolean isEquivalentColumns(Set<String> projectedColumnNames, Properties schema)
{
Set<String> columnNames;
String columnNameProperty = schema.getProperty(LIST_COLUMNS);
if (columnNameProperty.length() == 0) {
columnNames = ImmutableSet.of();
}
else {
String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
columnNames = Arrays.stream(columnNameProperty.split(columnNameDelimiter))
.collect(toImmutableSet());
}
return projectedColumnNames.equals(columnNames);
}

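// Compares projected type names against the schema's LIST_COLUMN_TYPES property, parsed with Hive's TypeInfoUtils.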
private static boolean isEquivalentColumnTypes(Set<String> projectedColumnTypes, Properties schema)
{
String columnTypeProperty = schema.getProperty(LIST_COLUMN_TYPES);
Set<String> columnTypes;
if (columnTypeProperty.length() == 0) {
columnTypes = ImmutableSet.of();
}
else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty)
.stream()
.map(TypeInfo::getTypeName)
.collect(toImmutableSet());
}
return projectedColumnTypes.equals(columnTypes);
}

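// Collects the given attribute (name or type name) of each reader column into a set for comparison with the schema.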
private static Set<String> getColumnProperty(List<HiveColumnHandle> readerColumns, Function<HiveColumnHandle, String> mapper)
{
if (readerColumns.isEmpty()) {
return ImmutableSet.of();
}
return readerColumns.stream()
.map(mapper)
.collect(toImmutableSet());
}
}
AbstractTestHiveFileSystem.java
@@ -603,6 +603,43 @@ protected MaterializedResult readTable(SchemaTableName tableName)
}
}

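// Scans the given table with only the supplied column handles projected and materializes the
// non-hidden columns, letting tests assert on results of the S3 Select read path.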
protected MaterializedResult filterTable(SchemaTableName tableName, List<ColumnHandle> columnHandles)
throws IOException
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();

ConnectorTableHandle table = getTableHandle(metadata, tableName);

metadata.beginQuery(session);
ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table);

List<Type> allTypes = getTypes(columnHandles);
List<Type> dataTypes = getTypes(columnHandles.stream()
.filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
.collect(toImmutableList()));
MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);

List<ConnectorSplit> splits = getAllSplits(splitSource);
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, table, columnHandles, DynamicFilter.EMPTY)) {
MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
Object[] dataValues = IntStream.range(0, row.getFieldCount())
.filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden())
.mapToObj(row::getField)
.toArray();
result.row(dataValues);
}
}
}

metadata.cleanupQuery(session);
return result.build();
}
}

private ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
{
ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName);
TestS3SelectRecordCursor.java
@@ -45,10 +45,10 @@ public class TestS3SelectRecordCursor
{
private static final String LAZY_SERDE_CLASS_NAME = LazySimpleSerDe.class.getName();

- private static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
- private static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
- private static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
- private static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
+ protected static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
+ protected static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
+ protected static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
+ protected static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN};

@Test