10 changes: 6 additions & 4 deletions presto-hive-hadoop2/bin/run_hive_s3_tests.sh
@@ -7,6 +7,9 @@ set -euo pipefail -x
cleanup_docker_containers
start_docker_containers

# obtain Hive version
TESTS_HIVE_VERSION_MAJOR=$(get_hive_major_version)

# insert AWS credentials
exec_in_hadoop_master_container cp /etc/hadoop/conf/core-site.xml.s3-template /etc/hadoop/conf/core-site.xml
exec_in_hadoop_master_container sed -i \
@@ -18,10 +21,8 @@ exec_in_hadoop_master_container sed -i \
# create test table
table_path="s3a://${S3_BUCKET}/presto_test_external_fs/"
exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.gz "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.lz4 "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.bz2 "${table_path}"
exec_in_hadoop_master_container hadoop fs -put -f /tmp/files/test1.csv{,.gz,.bz2,.lz4} "${table_path}"
Contributor: Neat shortcut. I guess we don't really need the specific -copyFromLocal semantics enforced here

Author: Thanks!
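
Note on the shortcut: the braces are expanded by the shell before hadoop fs -put ever runs, so the single command still receives all four local paths. A quick sketch, assuming bash:

# brace expansion happens in the shell, before hadoop sees any arguments
$ echo /tmp/files/test1.csv{,.gz,.bz2,.lz4}
/tmp/files/test1.csv /tmp/files/test1.csv.gz /tmp/files/test1.csv.bz2 /tmp/files/test1.csv.lz4

(-copyFromLocal behaves like -put with the source restricted to the local filesystem, so the switch loses nothing for these /tmp paths.)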


exec_in_hadoop_master_container /usr/bin/hive -e "CREATE EXTERNAL TABLE presto_test_external_fs(t_bigint bigint) LOCATION '${table_path}'"

stop_unnecessary_hadoop_services
@@ -40,6 +41,7 @@ set +e
-Dhive.hadoop2.databaseName=default \
-Dhive.hadoop2.s3.awsAccessKey=${AWS_ACCESS_KEY_ID} \
-Dhive.hadoop2.s3.awsSecretKey=${AWS_SECRET_ACCESS_KEY} \
-Dhive.hadoop2.hiveVersionMajor="${TESTS_HIVE_VERSION_MAJOR}" \
-Dhive.hadoop2.s3.writableBucket=${S3_BUCKET}
EXIT_CODE=$?
set -e
5 changes: 1 addition & 4 deletions presto-hive-hadoop2/conf/docker-compose.yml
@@ -20,7 +20,4 @@ services:
- ../../presto-hive/src/test/sql:/files/sql:ro
- ./files/words:/usr/share/dict/words:ro
- ./files/core-site.xml.s3-template:/etc/hadoop/conf/core-site.xml.s3-template:ro
- ./files/test1.csv:/tmp/test1.csv:ro
- ./files/test1.csv.gz:/tmp/test1.csv.gz:ro
- ./files/test1.csv.lz4:/tmp/test1.csv.lz4:ro
- ./files/test1.csv.bz2:/tmp/test1.csv.bz2:ro
- ./files:/tmp/files:ro
presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java
@@ -60,11 +60,9 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.TestingConnectorSession;
import com.facebook.presto.testing.TestingNodeManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
@@ -79,7 +77,6 @@
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -94,6 +91,7 @@
import static com.facebook.presto.hive.AbstractTestHiveClient.filterNonHiddenColumnHandles;
import static com.facebook.presto.hive.AbstractTestHiveClient.filterNonHiddenColumnMetadata;
import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
import static com.facebook.presto.hive.HiveFileSystemTestUtils.getTableHandle;
import static com.facebook.presto.hive.HiveQueryRunner.METASTORE_CONTEXT;
import static com.facebook.presto.hive.HiveTestUtils.FILTER_STATS_CALCULATOR_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
@@ -111,7 +109,6 @@
import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE;
import static com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
@@ -265,40 +262,24 @@ protected Transaction newTransaction()
return new HiveTransaction(transactionManager, metadataFactory.get());
}

protected MaterializedResult readTable(SchemaTableName tableName)
throws IOException
{
return HiveFileSystemTestUtils.readTable(tableName, transactionManager, config, metadataFactory, pageSourceProvider, splitManager);
}

@Test
public void testGetRecords()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();

ConnectorTableHandle table = getTableHandle(metadata, this.table);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);

List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
assertEquals(layoutHandle.getPartitions().get().size(), 1);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT);

TableHandle tableHandle = new TableHandle(new ConnectorId(database), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

long sum = 0;

for (ConnectorSplit split : getAllSplits(splitSource)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE)) {
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));

for (MaterializedRow row : result) {
sum += (Long) row.getField(columnIndex.get("t_bigint"));
}
}
}
// The test table is made up of multiple S3 objects with same data and different compression codec
// formats: uncompressed | .gz | .lz4 | .bz2
assertEquals(sum, 78300 * 4);
}
assertEqualsIgnoreOrder(
readTable(table),
MaterializedResult.resultBuilder(newSession(), BIGINT)
.row(70000L).row(8000L).row(300L) // test1.csv
.row(70000L).row(8000L).row(300L) // test1.csv.gz
.row(70000L).row(8000L).row(300L) // test1.csv.bz2
.row(70000L).row(8000L).row(300L) // test1.csv.lz4
.build());
}

@Test
@@ -447,11 +428,11 @@ private void createTable(MetastoreContext metastoreContext, SchemaTableName tableName)
ConnectorSession session = newSession();

// load the new table
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName);
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName, session);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, hiveTableHandle).values());

// verify the metadata
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName, session));
assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), columns);

// verify the data
Expand All @@ -478,25 +459,6 @@ private void dropTable(SchemaTableName table)
}
}

private ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
{
ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName);
checkArgument(handle != null, "table not found: %s", tableName);
return handle;
}

private static ImmutableMap<String, Integer> indexColumns(List<ColumnHandle> columnHandles)
{
ImmutableMap.Builder<String, Integer> index = ImmutableMap.builder();
int i = 0;
for (ColumnHandle columnHandle : columnHandles) {
HiveColumnHandle hiveColumnHandle = (HiveColumnHandle) columnHandle;
index.put(hiveColumnHandle.getName(), i);
i++;
}
return index.build();
}

private static class TestingHiveMetastore
extends CachingHiveMetastore
{
156 changes: 156 additions & 0 deletions presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java
@@ -0,0 +1,156 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction;
import com.facebook.presto.hive.AbstractTestHiveClient.Transaction;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SplitContext;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
import static com.facebook.presto.hive.AbstractTestHiveFileSystem.SPLIT_SCHEDULING_CONTEXT;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;

public class HiveFileSystemTestUtils
{
private HiveFileSystemTestUtils() {}

public static MaterializedResult readTable(SchemaTableName tableName,
HiveTransactionManager transactionManager,
HiveClientConfig config,
HiveMetadataFactory metadataFactory,
ConnectorPageSourceProvider pageSourceProvider,
ConnectorSplitManager splitManager)
throws IOException
{
ConnectorMetadata metadata = null;
ConnectorSession session = null;
ConnectorSplitSource splitSource = null;

try (Transaction transaction = newTransaction(transactionManager, metadataFactory.get())) {
metadata = transaction.getMetadata();
session = newSession(config);

ConnectorTableHandle table = getTableHandle(metadata, tableName, session);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
TableHandle tableHandle = new TableHandle(new ConnectorId(tableName.getSchemaName()), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

metadata.beginQuery(session);

splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle.getLayout().get(), SPLIT_SCHEDULING_CONTEXT);

List<Type> allTypes = getTypes(columnHandles);
List<Type> dataTypes = getTypes(columnHandles.stream()
.filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
.collect(toImmutableList()));
MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);

List<ConnectorSplit> splits = getAllSplits(splitSource);
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(
transaction.getTransactionHandle(),
session,
split,
tableHandle.getLayout().get(),
columnHandles,
new SplitContext(false, TupleDomain.none()))) {
MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
Object[] dataValues = IntStream.range(0, row.getFieldCount())
.filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden())
.mapToObj(row::getField)
.toArray();
result.row(dataValues);
}
}
}
return result.build();
}
finally {
cleanUpQuery(metadata, session);
closeQuietly(splitSource);
}
}

public static Transaction newTransaction(HiveTransactionManager transactionManager, HiveMetadata hiveMetadata)
{
return new HiveTransaction(transactionManager, hiveMetadata);
}

public static ConnectorSession newSession(HiveClientConfig config)
{
return new TestingConnectorSession(new HiveSessionProperties(
config,
new OrcFileWriterConfig(),
new ParquetFileWriterConfig(),
new CacheConfig()).getSessionProperties());
}

public static ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName, ConnectorSession session)
{
ConnectorTableHandle handle = metadata.getTableHandle(session, tableName);
checkArgument(handle != null, "table not found: %s", tableName);
return handle;
}

private static void closeQuietly(Closeable closeable)
{
try {
if (closeable != null) {
closeable.close();
}
}
catch (IOException ignored) {
}
}

private static void cleanUpQuery(ConnectorMetadata metadata, ConnectorSession session)
{
if (metadata != null && session != null) {
metadata.cleanupQuery(session);
}
}
}