diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java
index a23a19225db..46572b1c1d7 100644
--- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java
+++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java
@@ -13,13 +13,21 @@
  */
 package io.trino.plugin.hive.s3select;
 
+import com.google.common.collect.ImmutableList;
 import io.trino.plugin.hive.AbstractTestHiveFileSystemS3;
+import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.testing.MaterializedResult;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Parameters;
 import org.testng.annotations.Test;
 
+import java.util.List;
+import java.util.Optional;
+
+import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
+import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
+import static io.trino.plugin.hive.HiveType.HIVE_INT;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
 
@@ -43,7 +51,6 @@ public class TestHiveFileSystemS3SelectPushdown
     public void setup(String host, int port, String databaseName, String awsAccessKey, String awsSecretKey, String writableBucket, String testDirectory)
     {
         super.setup(host, port, databaseName, awsAccessKey, awsSecretKey, writableBucket, testDirectory, true);
-
         tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter");
         tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter");
         tableJson = new SchemaTableName(database, "trino_s3select_test_external_fs_json");
@@ -62,6 +69,22 @@ public void testGetRecordsWithPipeDelimiter()
                 .build());
     }
 
+    @Test
+    public void testFilterRecordsWithPipeDelimiter()
+            throws Exception
+    {
+        List<ColumnHandle> projectedColumns = ImmutableList.of(
+                createBaseColumn("t_bigint", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty()));
+
+        assertEqualsIgnoreOrder(
+                filterTable(tableWithPipeDelimiter, projectedColumns),
+                MaterializedResult.resultBuilder(newSession(), BIGINT)
+                        .row(1L).row(3L).row(55L) // test_table_with_pipe_delimiter.csv
+                        .row(27L).row(8L).row(456L) // test_table_with_pipe_delimiter.csv.gzip
+                        .row(22L).row(78L).row(1L).row(36L) // test_table_with_pipe_delimiter.csv.bz2
+                        .build());
+    }
+
     @Test
     public void testGetRecordsWithCommaDelimiter()
             throws Exception
@@ -75,6 +98,22 @@ public void testGetRecordsWithCommaDelimiter()
                 .build());
     }
 
+    @Test
+    public void testFilterRecordsWithCommaDelimiter()
+            throws Exception
+    {
+        List<ColumnHandle> projectedColumns = ImmutableList.of(
+                createBaseColumn("t_bigint", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty()));
+
+        assertEqualsIgnoreOrder(
+                filterTable(tableWithCommaDelimiter, projectedColumns),
+                MaterializedResult.resultBuilder(newSession(), BIGINT)
+                        .row(7L).row(19L).row(1L) // test_table_with_comma_delimiter.csv
+                        .row(27L).row(28L).row(90L) // test_table_with_comma_delimiter.csv.gzip
+                        .row(11L).row(1L).row(21L).row(0L) // test_table_with_comma_delimiter.csv.bz2
+                        .build());
+    }
+
     @Test
     public void testGetRecordsJson()
             throws Exception
@@ -87,4 +126,20 @@ public void testGetRecordsJson()
                 .row(1L, 19L).row(6L, 3L).row(24L, 22L).row(100L, 77L) // test_table.json.bz2
                 .build());
     }
+
+    @Test
+    public void testFilterRecordsJson()
+            throws Exception
+    {
+        List<ColumnHandle> projectedColumns = ImmutableList.of(
+                createBaseColumn("col_1", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty()));
+
+        assertEqualsIgnoreOrder(
+                filterTable(tableJson, projectedColumns),
+                MaterializedResult.resultBuilder(newSession(), BIGINT)
+                        .row(2L).row(5L) // test_table.json
+                        .row(7L).row(28L).row(13L) // test_table.json.gz
+                        .row(1L).row(6L).row(24L).row(100L) // test_table.json.bz2
+                        .build());
+    }
 }
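Note on the three testFilterRecords* assertions above: each external table location holds three objects (a plain .csv, a .csv.gzip and a .csv.bz2), every object is read through its own split, and split completion order is not deterministic, so expected rows are compared as a multiset via assertEqualsIgnoreOrder. A tiny self-contained stand-in (illustrative names, not the Trino test API) for that order-insensitive check:

import java.util.List;

// Order-insensitive comparison sketch: sorting both sides makes the
// assertion independent of which split delivered its rows first.
final class IgnoreOrderSketch
{
    public static void main(String[] args)
    {
        List<Long> actual = List.of(27L, 8L, 456L, 1L, 3L, 55L);   // rows as splits returned them
        List<Long> expected = List.of(1L, 3L, 55L, 27L, 8L, 456L); // rows as listed per source file

        boolean equal = actual.stream().sorted().toList()
                .equals(expected.stream().sorted().toList());
        System.out.println(equal); // prints: true
    }
}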
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
index 3b1a43d80a5..de8c1cfa408 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive.s3select;
 
+import com.google.common.collect.ImmutableSet;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HiveRecordCursorProvider;
@@ -24,19 +25,28 @@
 import io.trino.spi.type.TypeManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import javax.inject.Inject;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
 import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
 import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.hadoop.hive.serde.serdeConstants.COLUMN_NAME_DELIMITER;
+import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
+import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;
 
 public class S3SelectRecordCursorProvider
         implements HiveRecordCursorProvider
@@ -80,17 +90,20 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
         // Ignore predicates on partial columns for now.
         effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn());
 
+        List<HiveColumnHandle> readerColumns = projectedReaderColumns
+                .map(readColumns -> readColumns.get().stream().map(HiveColumnHandle.class::cast).collect(toImmutableList()))
+                .orElse(columns.stream().collect(toImmutableList()));
+
+        // Query is not going to filter any data, no need to use S3 Select
+        if (!hasFilters(schema, effectivePredicate, readerColumns)) {
+            return Optional.empty();
+        }
+
         String serdeName = getDeserializerClassName(schema);
         Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName);
 
         if (s3SelectDataTypeOptional.isPresent()) {
             S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();
 
-            List<HiveColumnHandle> readerColumns = projectedReaderColumns
-                    .map(ReaderColumns::get)
-                    .map(readColumns -> readColumns.stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList()))
-                    .orElse(columns);
-
             IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType);
             String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate);
             Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, path, start, length, schema,
@@ -109,4 +122,65 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
             return Optional.empty();
         }
     }
+
+    private static boolean hasFilters(
+            Properties schema,
+            TupleDomain<HiveColumnHandle> effectivePredicate,
+            List<HiveColumnHandle> readerColumns)
+    {
+        // When there are no effective predicates and the reader columns and column types are identical to the schema,
+        // the query reads all data out of S3. We can use S3 GetObject instead of S3 SelectObjectContent in these cases.
+        if (effectivePredicate.isAll()) {
+            return !isEquivalentSchema(readerColumns, schema);
+        }
+        return true;
+    }
+
+    private static boolean isEquivalentSchema(List<HiveColumnHandle> readerColumns, Properties schema)
+    {
+        Set<String> projectedColumnNames = getColumnProperty(readerColumns, HiveColumnHandle::getName);
+        Set<String> projectedColumnTypes = getColumnProperty(readerColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
+        return isEquivalentColumns(projectedColumnNames, schema) && isEquivalentColumnTypes(projectedColumnTypes, schema);
+    }
+
+    private static boolean isEquivalentColumns(Set<String> projectedColumnNames, Properties schema)
+    {
+        Set<String> columnNames;
+        String columnNameProperty = schema.getProperty(LIST_COLUMNS);
+        if (columnNameProperty.length() == 0) {
+            columnNames = ImmutableSet.of();
+        }
+        else {
+            String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
+            columnNames = Arrays.stream(columnNameProperty.split(columnNameDelimiter))
+                    .collect(toImmutableSet());
+        }
+        return projectedColumnNames.equals(columnNames);
+    }
+
+    private static boolean isEquivalentColumnTypes(Set<String> projectedColumnTypes, Properties schema)
+    {
+        String columnTypeProperty = schema.getProperty(LIST_COLUMN_TYPES);
+        Set<String> columnTypes;
+        if (columnTypeProperty.length() == 0) {
+            columnTypes = ImmutableSet.of();
+        }
+        else {
+            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty)
+                    .stream()
+                    .map(TypeInfo::getTypeName)
+                    .collect(toImmutableSet());
+        }
+        return projectedColumnTypes.equals(columnTypes);
+    }
+
+    private static Set<String> getColumnProperty(List<HiveColumnHandle> readerColumns, Function<HiveColumnHandle, String> mapper)
+    {
+        if (readerColumns.isEmpty()) {
+            return ImmutableSet.of();
+        }
+        return readerColumns.stream()
+                .map(mapper)
+                .collect(toImmutableSet());
+    }
 }
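The substance of the provider change above is the early exit: when the effective predicate is all-accepting and the projected columns and types match the table schema exactly, S3 Select can prune neither rows nor columns, so the provider returns Optional.empty() and the read falls back to a plain S3 GetObject. A standalone sketch of that decision rule (hypothetical names, not the Trino API; the real isEquivalentSchema also compares Hive type names):

import java.util.Set;

// Pushdown pays off only when S3 Select can filter rows (a predicate)
// or prune columns (a projection narrower than the schema).
final class S3SelectPushdownRule
{
    static boolean worthPushingDown(boolean hasRowPredicate, Set<String> projectedColumns, Set<String> schemaColumns)
    {
        if (hasRowPredicate) {
            return true; // rows can be filtered server-side
        }
        return !projectedColumns.equals(schemaColumns); // otherwise only column pruning helps
    }

    public static void main(String[] args)
    {
        Set<String> schema = Set.of("article", "author", "date_pub", "quantity");
        System.out.println(worthPushingDown(false, schema, schema));             // false: plain GetObject
        System.out.println(worthPushingDown(false, Set.of("quantity"), schema)); // true: SelectObjectContent
        System.out.println(worthPushingDown(true, schema, schema));              // true: SelectObjectContent
    }
}

Because the real check compares column sets, reordering the projection does not count as filtering, which is why shouldNotReturnSelectRecordCursorWhenProjectionOrderIsDifferent in the new test class below expects no cursor.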
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
index 6c84fca0068..9939382d9a1 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
@@ -603,6 +603,43 @@ protected MaterializedResult readTable(SchemaTableName tableName)
         }
     }
 
+    protected MaterializedResult filterTable(SchemaTableName tableName, List<ColumnHandle> columnHandles)
+            throws IOException
+    {
+        try (Transaction transaction = newTransaction()) {
+            ConnectorMetadata metadata = transaction.getMetadata();
+            ConnectorSession session = newSession();
+
+            ConnectorTableHandle table = getTableHandle(metadata, tableName);
+
+            metadata.beginQuery(session);
+            ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table);
+
+            List<Type> allTypes = getTypes(columnHandles);
+            List<Type> dataTypes = getTypes(columnHandles.stream()
+                    .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
+                    .collect(toImmutableList()));
+            MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);
+
+            List<ConnectorSplit> splits = getAllSplits(splitSource);
+            for (ConnectorSplit split : splits) {
+                try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, table, columnHandles, DynamicFilter.EMPTY)) {
+                    MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
+                    for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
+                        Object[] dataValues = IntStream.range(0, row.getFieldCount())
+                                .filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden())
+                                .mapToObj(row::getField)
+                                .toArray();
+                        result.row(dataValues);
+                    }
+                }
+            }
+
+            metadata.cleanupQuery(session);
+            return result.build();
+        }
+    }
+
     private ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
     {
         ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName);
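Unlike the existing readTable, the filterTable helper above materializes only the caller-supplied column handles and then drops hidden channels before building result rows, so the pushdown tests can project a single data column. A self-contained sketch of just that channel-filtering step (plain arrays stand in for MaterializedRow and the ColumnHandle list):

import java.util.Arrays;
import java.util.stream.IntStream;

// Keep only the channels whose handle is not hidden, mirroring the
// IntStream filter inside filterTable.
final class HiddenChannelFilterSketch
{
    public static void main(String[] args)
    {
        Object[] row = {"s3://bucket/file.csv", 42L, "trino"};
        boolean[] hidden = {true, false, false}; // channel 0 plays the role of a hidden column

        Object[] dataValues = IntStream.range(0, row.length)
                .filter(channel -> !hidden[channel])
                .mapToObj(channel -> row[channel])
                .toArray();

        System.out.println(Arrays.toString(dataValues)); // prints: [42, trino]
    }
}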
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java
index a2c0e5a9204..fe37df05f2b 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java
@@ -45,10 +45,10 @@ public class TestS3SelectRecordCursor
 {
     private static final String LAZY_SERDE_CLASS_NAME = LazySimpleSerDe.class.getName();
 
-    private static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
-    private static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
-    private static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
-    private static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
     private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN};
 
     @Test
createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty()); + protected static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty()); private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN}; @Test diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java new file mode 100644 index 00000000000..b676e405297 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java @@ -0,0 +1,161 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3select; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.hadoop.ConfigurationInstantiator; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HiveRecordCursorProvider.ReaderRecordCursorWithProjections; +import io.trino.plugin.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.Range; +import io.trino.spi.predicate.SortedRangeSet; +import io.trino.spi.predicate.TupleDomain; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static io.trino.plugin.hive.HiveTestUtils.SESSION; +import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.ARTICLE_COLUMN; +import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.AUTHOR_COLUMN; +import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.DATE_ARTICLE_COLUMN; +import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.QUANTITY_COLUMN; +import static io.trino.spi.predicate.TupleDomain.withColumnDomains; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; +import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS; +import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES; +import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB; +import static org.testng.Assert.assertTrue; + +public class TestS3SelectRecordCursorProvider +{ + @Test + public void shouldReturnSelectRecordCursor() + { + List readerColumns = new ArrayList<>(); + TupleDomain effectivePredicate = TupleDomain.all(); + Optional recordCursor = + getRecordCursor(effectivePredicate, readerColumns, true); + assertTrue(recordCursor.isPresent()); + } + + @Test + public void 
+    public void shouldReturnSelectRecordCursorWhenEffectivePredicateExists()
+    {
+        TupleDomain<HiveColumnHandle> effectivePredicate = withColumnDomains(ImmutableMap.of(QUANTITY_COLUMN,
+                Domain.create(SortedRangeSet.copyOf(BIGINT, ImmutableList.of(Range.equal(BIGINT, 3L))), false)));
+        Optional<ReaderRecordCursorWithProjections> recordCursor =
+                getRecordCursor(effectivePredicate, getAllColumns(), true);
+        assertTrue(recordCursor.isPresent());
+    }
+
+    @Test
+    public void shouldReturnSelectRecordCursorWhenProjectionExists()
+    {
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        List<HiveColumnHandle> readerColumns = ImmutableList.of(QUANTITY_COLUMN, AUTHOR_COLUMN, ARTICLE_COLUMN);
+        Optional<ReaderRecordCursorWithProjections> recordCursor =
+                getRecordCursor(effectivePredicate, readerColumns, true);
+        assertTrue(recordCursor.isPresent());
+    }
+
+    @Test
+    public void shouldNotReturnSelectRecordCursorWhenPushdownIsDisabled()
+    {
+        List<HiveColumnHandle> readerColumns = new ArrayList<>();
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        Optional<ReaderRecordCursorWithProjections> recordCursor =
+                getRecordCursor(effectivePredicate, readerColumns, false);
+        assertTrue(recordCursor.isEmpty());
+    }
+
+    @Test
+    public void shouldNotReturnSelectRecordCursorWhenQueryIsNotFiltering()
+    {
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        Optional<ReaderRecordCursorWithProjections> recordCursor =
+                getRecordCursor(effectivePredicate, getAllColumns(), true);
+        assertTrue(recordCursor.isEmpty());
+    }
+
+    @Test
+    public void shouldNotReturnSelectRecordCursorWhenProjectionOrderIsDifferent()
+    {
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        List<HiveColumnHandle> readerColumns = ImmutableList.of(DATE_ARTICLE_COLUMN, QUANTITY_COLUMN, ARTICLE_COLUMN, AUTHOR_COLUMN);
+        Optional<ReaderRecordCursorWithProjections> recordCursor =
+                getRecordCursor(effectivePredicate, readerColumns, true);
+        assertTrue(recordCursor.isEmpty());
+    }
+
+    private static Optional<ReaderRecordCursorWithProjections> getRecordCursor(TupleDomain<HiveColumnHandle> effectivePredicate,
+            List<HiveColumnHandle> readerColumns,
+            boolean s3SelectPushdownEnabled)
+    {
+        S3SelectRecordCursorProvider s3SelectRecordCursorProvider = new S3SelectRecordCursorProvider(
+                new TestingHdfsEnvironment(new ArrayList<>()),
+                new TrinoS3ClientFactory(new HiveConfig()));
+
+        return s3SelectRecordCursorProvider.createRecordCursor(
+                ConfigurationInstantiator.newEmptyConfiguration(),
+                SESSION,
+                new Path("s3://fakeBucket/fakeObject.gz"),
+                0,
+                10,
+                10,
+                createTestingSchema(),
+                readerColumns,
+                effectivePredicate,
+                TESTING_TYPE_MANAGER,
+                s3SelectPushdownEnabled);
+    }
+
+    private static Properties createTestingSchema()
+    {
+        List<HiveColumnHandle> schemaColumns = getAllColumns();
+        Properties schema = new Properties();
+        String columnNames = buildPropertyFromColumns(schemaColumns, HiveColumnHandle::getName);
+        String columnTypeNames = buildPropertyFromColumns(schemaColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
+        schema.setProperty(LIST_COLUMNS, columnNames);
+        schema.setProperty(LIST_COLUMN_TYPES, columnTypeNames);
+        String deserializerClassName = LazySimpleSerDe.class.getName();
+        schema.setProperty(SERIALIZATION_LIB, deserializerClassName);
+        return schema;
+    }
+
+    private static String buildPropertyFromColumns(List<HiveColumnHandle> columns, Function<HiveColumnHandle, String> mapper)
+    {
+        if (columns.isEmpty()) {
+            return "";
+        }
+        return columns.stream()
+                .map(mapper)
+                .collect(Collectors.joining(","));
+    }
+
+    private static List<HiveColumnHandle> getAllColumns()
+    {
+        return ImmutableList.of(ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN);
+    }
+}
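For reference, createTestingSchema above builds the same Properties-based serde schema that the provider inspects in isEquivalentColumns and isEquivalentColumnTypes: comma-joined column names and Hive type names under the standard serde keys. A minimal sketch with the keys written out as literals (assumed values of LIST_COLUMNS, LIST_COLUMN_TYPES and SERIALIZATION_LIB):

import java.util.Properties;

// The schema handed to createRecordCursor, spelled out with literal keys;
// the four columns match the shared test constants.
final class TestingSchemaSketch
{
    public static void main(String[] args)
    {
        Properties schema = new Properties();
        schema.setProperty("columns", "article,author,date_pub,quantity");
        schema.setProperty("columns.types", "string,string,int,int");
        schema.setProperty("serialization.lib", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
        System.out.println(schema);
    }
}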