Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,13 @@
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-delta</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.teradata</groupId>
<artifactId>re2j-td</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions presto-delta/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>node</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,16 @@
*/
package com.facebook.presto.delta;

import com.facebook.presto.Session;
import com.facebook.presto.hive.HivePlugin;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import org.testng.ITest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.DataProvider;

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Map;

import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Locale.US;

public abstract class AbstractDeltaDistributedQueryTestBase
extends AbstractTestQueryFramework implements ITest
Expand Down Expand Up @@ -68,7 +59,7 @@ public abstract class AbstractDeltaDistributedQueryTestBase
/**
* List of tables present in the test resources directory. Each table is replicated for reader versions 1 and 3.
*/
private static final String[] DELTA_TEST_TABLE_LIST =
public static final String[] DELTA_TEST_TABLE_LIST =
new String[DELTA_VERSIONS.length * DELTA_TEST_TABLE_NAMES_LIST.length];
static {
for (int i = 0; i < DELTA_VERSIONS.length; i++) {
Expand Down Expand Up @@ -102,9 +93,9 @@ protected static String getVersionPrefix(String version)
protected QueryRunner createQueryRunner()
throws Exception
{
QueryRunner queryRunner = createDeltaQueryRunner(ImmutableMap.of(
QueryRunner queryRunner = DeltaQueryRunner.builder().setExtraProperties(ImmutableMap.of(
"experimental.pushdown-subfields-enabled", "true",
"experimental.pushdown-dereference-enabled", "true"));
"experimental.pushdown-dereference-enabled", "true")).build().getQueryRunner();

// Create the test Delta tables in HMS
for (String deltaTestTable : DELTA_TEST_TABLE_LIST) {
Expand Down Expand Up @@ -136,51 +127,6 @@ protected static String goldenTablePathWithPrefix(String prefix, String tableNam
return goldenTablePath(prefix + FileSystems.getDefault().getSeparator() + tableName);
}

private static DistributedQueryRunner createDeltaQueryRunner(Map<String, String> extraProperties)
throws Exception
{
Session session = testSessionBuilder()
.setCatalog(DELTA_CATALOG)
.setSchema(DELTA_SCHEMA.toLowerCase(US))
.setTimeZoneKey(UTC_KEY)
.build();

DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
.setExtraProperties(extraProperties)
.build();

// Install the TPCH plugin for test data (not in Delta format)
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata");
Path catalogDirectory = dataDirectory.getParent().resolve("catalog");

// Install a Delta connector catalog
queryRunner.installPlugin(new DeltaPlugin());
Map<String, String> deltaProperties = ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
.put("delta.case-sensitive-partitions-enabled", "false")
.build();
queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties);

// Install a Hive connector catalog that uses the same metastore as Delta
// This catalog will be used to create tables in metastore as the Delta connector doesn't
// support creating tables yet.
queryRunner.installPlugin(new HivePlugin("hive"));
Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
.put("hive.allow-drop-table", "true")
.put("hive.security", "legacy")
.build();
queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties);
queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA));

return queryRunner;
}

/**
* Register the given <i>deltaTableName</i> as <i>hiveTableName</i> in HMS using the Delta catalog.
* Hive and Delta catalogs share the same HMS in this test.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.delta;

import com.facebook.airlift.log.Logging;
import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.hive.HivePlugin;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;

import java.net.URI;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.BiFunction;

import static com.facebook.airlift.log.Level.ERROR;
import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Locale.US;
import static java.util.Objects.requireNonNull;

public class DeltaQueryRunner
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps let's add a setupLogging method as IcebergQueryRunner does, so we don't spam the logs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

{
public static final String DELTA_CATALOG = "delta";
public static final String HIVE_CATALOG = "hive";
public static final String DELTA_SCHEMA = "deltaTables"; // Schema in Hive which has test Delta tables

// The wrapped runner; assigned once in the constructor and never replaced.
private final DistributedQueryRunner queryRunner;

/**
 * Wraps an already configured runner. Private: instances are created through {@link Builder#build()}.
 *
 * @param queryRunner the runner to expose; must not be null
 * @throws NullPointerException if {@code queryRunner} is null
 */
private DeltaQueryRunner(DistributedQueryRunner queryRunner)
{
    this.queryRunner = requireNonNull(queryRunner, "queryRunner is null");
}

/**
 * Returns the underlying distributed query runner held by this wrapper.
 *
 * @return the runner passed to the constructor; never null
 */
public DistributedQueryRunner getQueryRunner()
{
    return this.queryRunner;
}

/**
 * Entry point for configuring a {@link DeltaQueryRunner}.
 *
 * @return a fresh {@link Builder} carrying the default settings
 */
public static Builder builder()
{
    Builder freshBuilder = new Builder();
    return freshBuilder;
}

public static class Builder
{
private Builder() {}

private Map<String, String> extraProperties = new HashMap<>();
// If externalWorkerLauncher is not provided, Java workers are used by default.
private Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher = Optional.empty();
private TimeZoneKey timeZoneKey = UTC_KEY;
private boolean caseSensitivePartitions;
private OptionalInt nodeCount = OptionalInt.of(4);

public Builder setExternalWorkerLauncher(Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
{
this.externalWorkerLauncher = requireNonNull(externalWorkerLauncher);
return this;
}

public Builder setExtraProperties(Map<String, String> extraProperties)
{
this.extraProperties = ImmutableMap.copyOf(extraProperties);
return this;
}

public Builder setTimeZoneKey(TimeZoneKey timeZoneKey)
{
this.timeZoneKey = timeZoneKey;
return this;
}

public Builder caseSensitivePartitions()
{
caseSensitivePartitions = true;
return this;
}

public Builder setNodeCount(OptionalInt nodeCount)
{
this.nodeCount = nodeCount;
return this;
}

public DeltaQueryRunner build()
throws Exception
{
setupLogging();
Session session = testSessionBuilder()
.setCatalog(DELTA_CATALOG)
.setSchema(DELTA_SCHEMA.toLowerCase(US))
.setTimeZoneKey(timeZoneKey)
.build();

DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
.setExtraProperties(extraProperties)
.setNodeCount(nodeCount.orElse(4))
.setExternalWorkerLauncher(externalWorkerLauncher)
Comment thread
aditi-pandit marked this conversation as resolved.
.build();

// Install the TPCH plugin for test data (not in Delta format)
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata");
Path catalogDirectory = dataDirectory.getParent().resolve("catalog");

// Install a Delta connector catalog
queryRunner.installPlugin(new DeltaPlugin());
Map<String, String> deltaProperties = new HashMap<>();
deltaProperties.put("hive.metastore", "file");
deltaProperties.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString());
deltaProperties.put("delta.case-sensitive-partitions-enabled", Boolean.toString(caseSensitivePartitions));
queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties);

// Install a Hive connector catalog that uses the same metastore as Delta
// This catalog will be used to create tables in metastore as the Delta connector doesn't
// support creating tables yet.
queryRunner.installPlugin(new HivePlugin("hive"));
Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
.put("hive.allow-drop-table", "true")
.put("hive.security", "legacy")
.build();
queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties);
queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA));

return new DeltaQueryRunner(queryRunner);
}
}

/**
 * Initializes logging and raises the level of a fixed set of chatty loggers so that
 * test runs do not spam the logs.
 */
private static void setupLogging()
{
    Logging logging = Logging.initialize();

    // Loggers raised to WARN.
    String[] warnLoggers = {
            "com.facebook.presto.event",
            "com.facebook.presto.security.AccessControlManager",
            "com.facebook.presto.server.PluginManager",
            "com.facebook.airlift.bootstrap.LifeCycleManager",
            "org.apache.parquet.hadoop",
            "org.eclipse.jetty.server.handler.ContextHandler",
            "org.eclipse.jetty.server.AbstractConnector",
            "parquet.hadoop",
            "org.apache.iceberg",
            "com.facebook.airlift.bootstrap",
            "Bootstrap",
            "org.apache.hadoop.io.compress",
    };
    for (String loggerName : warnLoggers) {
        logging.setLevel(loggerName, WARN);
    }

    // The only logger raised all the way to ERROR.
    logging.setLevel("org.glassfish.jersey.internal.inject.Providers", ERROR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,15 @@
*/
package com.facebook.presto.delta;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.hive.HivePlugin;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.nio.file.Path;
import java.util.Map;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Locale.US;
import static org.testng.Assert.assertEquals;

public class TestUppercasePartitionColumns
Expand All @@ -38,54 +31,16 @@ public class TestUppercasePartitionColumns
protected QueryRunner createQueryRunner()
throws Exception
{
return createDeltaQueryRunner(ImmutableMap.of(
Map<String, String> extraProperties = ImmutableMap.of(
"experimental.pushdown-subfields-enabled", "true",
"experimental.pushdown-dereference-enabled", "true"));
}
"experimental.pushdown-dereference-enabled", "true");

private static DistributedQueryRunner createDeltaQueryRunner(Map<String, String> extraProperties)
throws Exception
{
Session session = testSessionBuilder()
.setCatalog(DELTA_CATALOG)
.setSchema(DELTA_SCHEMA.toLowerCase(US))
return DeltaQueryRunner.builder()
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey("Europe/Madrid"))
.build();

DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
.setExtraProperties(extraProperties)
.build();

// Install the TPCH plugin for test data (not in Delta format)
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata");
Path catalogDirectory = dataDirectory.getParent().resolve("catalog");

// Install a Delta connector catalog
queryRunner.installPlugin(new DeltaPlugin());
Map<String, String> deltaProperties = ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
.put("delta.case-sensitive-partitions-enabled", "true")
.build();
queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties);

// Install a Hive connector catalog that uses the same metastore as Delta
// This catalog will be used to create tables in metastore as the Delta connector doesn't
// support creating tables yet.
queryRunner.installPlugin(new HivePlugin("hive"));
Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
.put("hive.allow-drop-table", "true")
.put("hive.security", "legacy")
.build();
queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties);
queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA));

return queryRunner;
.caseSensitivePartitions()
.build()
.getQueryRunner();
}

@Test(dataProvider = "deltaReaderVersions")
Expand Down
Loading
Loading