Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@

import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.serde.serdeConstants.COLUMN_NAME_DELIMITER;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;

public class S3SelectRecordCursorProvider
implements HiveRecordCursorProvider
Expand Down Expand Up @@ -86,6 +89,11 @@ public Optional<RecordCursor> createRecordCursor(
throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);
}

// Query is not going to filter any data, no need to use S3 Select.
if (!hasFilters(schema, effectivePredicate, columns)) {
return Optional.empty();
}

String serdeName = getDeserializerClassName(schema);
if (CSV_SERDES.contains(serdeName)) {
IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager);
Expand All @@ -97,4 +105,32 @@ public Optional<RecordCursor> createRecordCursor(
// unsupported serdes
return Optional.empty();
}

/**
 * Determines whether S3 Select would actually reduce the bytes transferred for
 * this scan. Returns {@code false} exactly when the query reads every row and
 * every schema column, in which case a plain S3 GetObject is cheaper than
 * SelectObjectContent.
 */
private static boolean hasFilters(
        Properties schema,
        TupleDomain<HiveColumnHandle> effectivePredicate,
        List<HiveColumnHandle> projectedColumns)
{
    // Any non-trivial predicate filters rows regardless of the projection.
    if (!effectivePredicate.isAll()) {
        return true;
    }
    // No predicate: filtering only happens if the projection drops schema columns.
    return !areColumnsEquivalent(projectedColumns, schema);
}

/**
 * Returns {@code true} when the projected column names exactly match the set of
 * column names declared in the table schema, i.e. the scan is effectively
 * {@code SELECT *}. Comparison is set-based, so column order is irrelevant.
 */
private static boolean areColumnsEquivalent(List<HiveColumnHandle> projectedColumns, Properties schema)
{
    Set<String> projectedColumnNames = projectedColumns.stream().map(HiveColumnHandle::getName).collect(toImmutableSet());
    Set<String> schemaColumnNames;
    // getProperty returns null when LIST_COLUMNS is absent from the schema; the
    // previous length() call would have thrown a NullPointerException in that case.
    String columnNameProperty = schema.getProperty(LIST_COLUMNS);
    if (columnNameProperty == null || columnNameProperty.isEmpty()) {
        schemaColumnNames = ImmutableSet.of();
    }
    else {
        // NOTE(review): the delimiter is passed to String.split, which treats it as a
        // regex; the "," default is safe, but a special-character delimiter would not be.
        String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
        schemaColumnNames = ImmutableSet.copyOf(columnNameProperty.split(columnNameDelimiter));
    }
    return projectedColumnNames.equals(schemaColumnNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path)
new BlockLocation[] {});
}

private static class TestingHdfsEnvironment
public static class TestingHdfsEnvironment
extends HdfsEnvironment
{
private final List<LocatedFileStatus> files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public class TestS3SelectRecordCursor
{
private static final String LAZY_SERDE_CLASS_NAME = LazySimpleSerDe.class.getName();

private static final HiveColumnHandle ARTICLE_COLUMN = new HiveColumnHandle("article", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 1, REGULAR, Optional.empty(), Optional.empty());
private static final HiveColumnHandle AUTHOR_COLUMN = new HiveColumnHandle("author", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 1, REGULAR, Optional.empty(), Optional.empty());
private static final HiveColumnHandle DATE_ARTICLE_COLUMN = new HiveColumnHandle("date_pub", HIVE_INT, parseTypeSignature(StandardTypes.DATE), 1, REGULAR, Optional.empty(), Optional.empty());
private static final HiveColumnHandle QUANTITY_COLUMN = new HiveColumnHandle("quantity", HIVE_INT, parseTypeSignature(StandardTypes.INTEGER), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle ARTICLE_COLUMN = new HiveColumnHandle("article", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle AUTHOR_COLUMN = new HiveColumnHandle("author", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle DATE_ARTICLE_COLUMN = new HiveColumnHandle("date_pub", HIVE_INT, parseTypeSignature(StandardTypes.DATE), 1, REGULAR, Optional.empty(), Optional.empty());
static final HiveColumnHandle QUANTITY_COLUMN = new HiveColumnHandle("quantity", HIVE_INT, parseTypeSignature(StandardTypes.INTEGER), 1, REGULAR, Optional.empty(), Optional.empty());
private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN};
private static final HiveColumnHandle MOCK_HIVE_COLUMN_HANDLE = new HiveColumnHandle("mockName", HiveType.HIVE_FLOAT, parseTypeSignature(StandardTypes.DOUBLE), 88, PARTITION_KEY, Optional.empty(), Optional.empty());
private static final TypeManager MOCK_TYPE_MANAGER = new TestingTypeManager();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.s3select;

import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.SortedRangeSet;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveFileSplit;
import com.facebook.presto.hive.TestBackgroundHiveSplitLoader;
import com.facebook.presto.hive.s3.PrestoS3ClientFactory;
import com.facebook.presto.spi.RecordCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.joda.time.DateTimeZone;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.s3select.TestS3SelectRecordCursor.ARTICLE_COLUMN;
import static com.facebook.presto.hive.s3select.TestS3SelectRecordCursor.AUTHOR_COLUMN;
import static com.facebook.presto.hive.s3select.TestS3SelectRecordCursor.DATE_ARTICLE_COLUMN;
import static com.facebook.presto.hive.s3select.TestS3SelectRecordCursor.QUANTITY_COLUMN;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

/**
 * Verifies the cursor-creation decision logic in {@link S3SelectRecordCursorProvider}:
 * a record cursor is produced only when S3 Select pushdown is enabled AND the query
 * actually filters data (a predicate exists, or the projection is narrower than the
 * table schema).
 */
public class TestS3SelectRecordCursorProvider
{
    @Test
    public void shouldReturnSelectRecordCursor()
    {
        // An empty projection differs from the full schema, so S3 Select applies.
        assertTrue(openCursor(TupleDomain.all(), new ArrayList<>(), true).isPresent());
    }

    @Test
    public void shouldReturnSelectRecordCursorWhenEffectivePredicateExists()
    {
        // A concrete predicate on "quantity" means rows are filtered server-side.
        Domain quantityDomain = Domain.create(SortedRangeSet.copyOf(BIGINT, ImmutableList.of(Range.equal(BIGINT, 3L))), false);
        TupleDomain<HiveColumnHandle> predicate = withColumnDomains(ImmutableMap.of(QUANTITY_COLUMN, quantityDomain));
        assertTrue(openCursor(predicate, allSchemaColumns(), true).isPresent());
    }

    @Test
    public void shouldReturnSelectRecordCursorWhenProjectionExists()
    {
        // Projecting a strict subset of the schema columns counts as filtering.
        List<HiveColumnHandle> projection = ImmutableList.of(QUANTITY_COLUMN, AUTHOR_COLUMN, ARTICLE_COLUMN);
        assertTrue(openCursor(TupleDomain.all(), projection, true).isPresent());
    }

    @Test
    public void shouldNotReturnSelectRecordCursorWhenPushdownIsDisabled()
    {
        // With pushdown off, the provider must decline regardless of the query shape.
        assertFalse(openCursor(TupleDomain.all(), new ArrayList<>(), false).isPresent());
    }

    @Test
    public void shouldNotReturnSelectRecordCursorWhenQueryIsNotFiltering()
    {
        // No predicate and a projection identical to the schema: plain GetObject wins.
        assertFalse(openCursor(TupleDomain.all(), allSchemaColumns(), true).isPresent());
    }

    @Test
    public void shouldNotReturnSelectRecordCursorWhenProjectionOrderIsDifferent()
    {
        // Column order is irrelevant; the same column set still means "not filtering".
        List<HiveColumnHandle> reordered = ImmutableList.of(DATE_ARTICLE_COLUMN, QUANTITY_COLUMN, ARTICLE_COLUMN, AUTHOR_COLUMN);
        assertFalse(openCursor(TupleDomain.all(), reordered, true).isPresent());
    }

    /**
     * Drives the provider against a stubbed HDFS environment and a synthetic
     * LazySimpleSerDe schema, returning whatever cursor decision it makes.
     */
    private static Optional<RecordCursor> openCursor(
            TupleDomain<HiveColumnHandle> predicate,
            List<HiveColumnHandle> projection,
            boolean pushdownEnabled)
    {
        S3SelectRecordCursorProvider provider = new S3SelectRecordCursorProvider(
                new TestBackgroundHiveSplitLoader.TestingHdfsEnvironment(new ArrayList<>()),
                new HiveClientConfig(),
                new PrestoS3ClientFactory());
        HiveFileSplit split = new HiveFileSplit(
                "s3://fakeBucket/fakeObject.gz",
                0,
                100,
                100,
                0,
                Optional.empty(),
                ImmutableMap.of());
        return provider.createRecordCursor(
                new Configuration(),
                SESSION,
                split,
                testingSchema(),
                projection,
                predicate,
                DateTimeZone.forID(SESSION.getSqlFunctionProperties().getTimeZoneKey().getId()),
                FUNCTION_AND_TYPE_MANAGER,
                pushdownEnabled);
    }

    /**
     * Builds table properties describing {@link #allSchemaColumns()} backed by
     * {@link LazySimpleSerDe}, mirroring what the metastore would supply.
     */
    private static Properties testingSchema()
    {
        Properties schema = new Properties();
        schema.setProperty(LIST_COLUMNS, commaJoined(allSchemaColumns(), HiveColumnHandle::getName));
        schema.setProperty(LIST_COLUMN_TYPES, commaJoined(allSchemaColumns(), column -> column.getHiveType().getTypeInfo().getTypeName()));
        schema.setProperty(SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
        return schema;
    }

    // Maps each column through the given accessor and joins with commas; an empty
    // column list yields the empty string, matching Properties' expectations.
    private static String commaJoined(List<HiveColumnHandle> columns, Function<HiveColumnHandle, String> mapper)
    {
        return columns.stream()
                .map(mapper)
                .collect(Collectors.joining(","));
    }

    // The full table schema, in declaration order.
    private static List<HiveColumnHandle> allSchemaColumns()
    {
        return ImmutableList.of(ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN);
    }
}