diff --git a/pom.xml b/pom.xml index c023788a21b60..0e0970a24233b 100644 --- a/pom.xml +++ b/pom.xml @@ -500,6 +500,13 @@ test-jar + + com.facebook.presto + presto-delta + ${project.version} + test-jar + + com.teradata re2j-td diff --git a/presto-delta/pom.xml b/presto-delta/pom.xml index b63adb6b94728..fc843ab25ba2e 100644 --- a/presto-delta/pom.xml +++ b/presto-delta/pom.xml @@ -321,6 +321,12 @@ test + + com.facebook.airlift + log-manager + runtime + + com.facebook.airlift node diff --git a/presto-delta/src/test/java/com/facebook/presto/delta/AbstractDeltaDistributedQueryTestBase.java b/presto-delta/src/test/java/com/facebook/presto/delta/AbstractDeltaDistributedQueryTestBase.java index cf44b28ac8698..11e8126af8632 100644 --- a/presto-delta/src/test/java/com/facebook/presto/delta/AbstractDeltaDistributedQueryTestBase.java +++ b/presto-delta/src/test/java/com/facebook/presto/delta/AbstractDeltaDistributedQueryTestBase.java @@ -13,25 +13,16 @@ */ package com.facebook.presto.delta; -import com.facebook.presto.Session; -import com.facebook.presto.hive.HivePlugin; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; -import com.facebook.presto.tests.DistributedQueryRunner; -import com.facebook.presto.tpch.TpchPlugin; import com.google.common.collect.ImmutableMap; import org.testng.ITest; import org.testng.annotations.AfterClass; import org.testng.annotations.DataProvider; import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.util.Map; -import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY; -import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; -import static java.util.Locale.US; public abstract class AbstractDeltaDistributedQueryTestBase extends AbstractTestQueryFramework implements ITest @@ -68,7 +59,7 @@ public abstract class AbstractDeltaDistributedQueryTestBase /** * List of tables present in the test resources directory. Each table is replicated in reader version 1 and 3 */ - private static final String[] DELTA_TEST_TABLE_LIST = + public static final String[] DELTA_TEST_TABLE_LIST = new String[DELTA_VERSIONS.length * DELTA_TEST_TABLE_NAMES_LIST.length]; static { for (int i = 0; i < DELTA_VERSIONS.length; i++) { @@ -102,9 +93,9 @@ protected static String getVersionPrefix(String version) protected QueryRunner createQueryRunner() throws Exception { - QueryRunner queryRunner = createDeltaQueryRunner(ImmutableMap.of( + QueryRunner queryRunner = DeltaQueryRunner.builder().setExtraProperties(ImmutableMap.of( "experimental.pushdown-subfields-enabled", "true", - "experimental.pushdown-dereference-enabled", "true")); + "experimental.pushdown-dereference-enabled", "true")).build().getQueryRunner(); // Create the test Delta tables in HMS for (String deltaTestTable : DELTA_TEST_TABLE_LIST) { @@ -136,51 +127,6 @@ protected static String goldenTablePathWithPrefix(String prefix, String tableNam return goldenTablePath(prefix + FileSystems.getDefault().getSeparator() + tableName); } - private static DistributedQueryRunner createDeltaQueryRunner(Map extraProperties) - throws Exception - { - Session session = testSessionBuilder() - .setCatalog(DELTA_CATALOG) - .setSchema(DELTA_SCHEMA.toLowerCase(US)) - .setTimeZoneKey(UTC_KEY) - .build(); - - DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session) - .setExtraProperties(extraProperties) - .build(); - - // Install the TPCH plugin for test data (not in Delta format) - queryRunner.installPlugin(new TpchPlugin()); - queryRunner.createCatalog("tpch", "tpch"); - - Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata"); - Path catalogDirectory = dataDirectory.getParent().resolve("catalog"); - - // Install a Delta connector catalog - queryRunner.installPlugin(new DeltaPlugin()); - Map deltaProperties = ImmutableMap.builder() - .put("hive.metastore", "file") - .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString()) - .put("delta.case-sensitive-partitions-enabled", "false") - .build(); - queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties); - - // Install a Hive connector catalog that uses the same metastore as Delta - // This catalog will be used to create tables in metastore as the Delta connector doesn't - // support creating tables yet. - queryRunner.installPlugin(new HivePlugin("hive")); - Map hiveProperties = ImmutableMap.builder() - .put("hive.metastore", "file") - .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString()) - .put("hive.allow-drop-table", "true") - .put("hive.security", "legacy") - .build(); - queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties); - queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA)); - - return queryRunner; - } - /** * Register the given deltaTableName as hiveTableName in HMS using the Delta catalog. * Hive and Delta catalogs share the same HMS in this test. diff --git a/presto-delta/src/test/java/com/facebook/presto/delta/DeltaQueryRunner.java b/presto-delta/src/test/java/com/facebook/presto/delta/DeltaQueryRunner.java new file mode 100644 index 0000000000000..7411d9c9b1b73 --- /dev/null +++ b/presto-delta/src/test/java/com/facebook/presto/delta/DeltaQueryRunner.java @@ -0,0 +1,169 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.delta; + +import com.facebook.airlift.log.Logging; +import com.facebook.presto.Session; +import com.facebook.presto.common.type.TimeZoneKey; +import com.facebook.presto.hive.HivePlugin; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.facebook.presto.tpch.TpchPlugin; +import com.google.common.collect.ImmutableMap; + +import java.net.URI; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.function.BiFunction; + +import static com.facebook.airlift.log.Level.ERROR; +import static com.facebook.airlift.log.Level.WARN; +import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; +import static java.util.Locale.US; +import static java.util.Objects.requireNonNull; + +public class DeltaQueryRunner +{ + public static final String DELTA_CATALOG = "delta"; + public static final String HIVE_CATALOG = "hive"; + public static final String DELTA_SCHEMA = "deltaTables"; // Schema in Hive which has test Delta tables + + private DistributedQueryRunner queryRunner; + + private DeltaQueryRunner(DistributedQueryRunner queryRunner) + { + this.queryRunner = requireNonNull(queryRunner, "queryRunner is null"); + } + + public DistributedQueryRunner getQueryRunner() + { + return queryRunner; + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private Builder() {} + + private Map extraProperties = new HashMap<>(); + // If externalWorkerLauncher is not provided, Java workers are used by default. + private Optional> externalWorkerLauncher = Optional.empty(); + private TimeZoneKey timeZoneKey = UTC_KEY; + private boolean caseSensitivePartitions; + private OptionalInt nodeCount = OptionalInt.of(4); + + public Builder setExternalWorkerLauncher(Optional> externalWorkerLauncher) + { + this.externalWorkerLauncher = requireNonNull(externalWorkerLauncher); + return this; + } + + public Builder setExtraProperties(Map extraProperties) + { + this.extraProperties = ImmutableMap.copyOf(extraProperties); + return this; + } + + public Builder setTimeZoneKey(TimeZoneKey timeZoneKey) + { + this.timeZoneKey = timeZoneKey; + return this; + } + + public Builder caseSensitivePartitions() + { + caseSensitivePartitions = true; + return this; + } + + public Builder setNodeCount(OptionalInt nodeCount) + { + this.nodeCount = nodeCount; + return this; + } + + public DeltaQueryRunner build() + throws Exception + { + setupLogging(); + Session session = testSessionBuilder() + .setCatalog(DELTA_CATALOG) + .setSchema(DELTA_SCHEMA.toLowerCase(US)) + .setTimeZoneKey(timeZoneKey) + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session) + .setExtraProperties(extraProperties) + .setNodeCount(nodeCount.orElse(4)) + .setExternalWorkerLauncher(externalWorkerLauncher) + .build(); + + // Install the TPCH plugin for test data (not in Delta format) + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata"); + Path catalogDirectory = dataDirectory.getParent().resolve("catalog"); + + // Install a Delta connector catalog + queryRunner.installPlugin(new DeltaPlugin()); + Map deltaProperties = new HashMap<>(); + deltaProperties.put("hive.metastore", "file"); + deltaProperties.put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString()); + deltaProperties.put("delta.case-sensitive-partitions-enabled", Boolean.toString(caseSensitivePartitions)); + queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties); + + // Install a Hive connector catalog that uses the same metastore as Delta + // This catalog will be used to create tables in metastore as the Delta connector doesn't + // support creating tables yet. + queryRunner.installPlugin(new HivePlugin("hive")); + Map hiveProperties = ImmutableMap.builder() + .put("hive.metastore", "file") + .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString()) + .put("hive.allow-drop-table", "true") + .put("hive.security", "legacy") + .build(); + queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties); + queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA)); + + return new DeltaQueryRunner(queryRunner); + } + } + + private static void setupLogging() + { + Logging logging = Logging.initialize(); + logging.setLevel("com.facebook.presto.event", WARN); + logging.setLevel("com.facebook.presto.security.AccessControlManager", WARN); + logging.setLevel("com.facebook.presto.server.PluginManager", WARN); + logging.setLevel("com.facebook.airlift.bootstrap.LifeCycleManager", WARN); + logging.setLevel("org.apache.parquet.hadoop", WARN); + logging.setLevel("org.eclipse.jetty.server.handler.ContextHandler", WARN); + logging.setLevel("org.eclipse.jetty.server.AbstractConnector", WARN); + logging.setLevel("org.glassfish.jersey.internal.inject.Providers", ERROR); + logging.setLevel("parquet.hadoop", WARN); + logging.setLevel("org.apache.iceberg", WARN); + logging.setLevel("com.facebook.airlift.bootstrap", WARN); + logging.setLevel("Bootstrap", WARN); + logging.setLevel("org.apache.hadoop.io.compress", WARN); + } +} diff --git a/presto-delta/src/test/java/com/facebook/presto/delta/TestUppercasePartitionColumns.java b/presto-delta/src/test/java/com/facebook/presto/delta/TestUppercasePartitionColumns.java index 97c312d5877f3..6e7f304da7d34 100644 --- a/presto-delta/src/test/java/com/facebook/presto/delta/TestUppercasePartitionColumns.java +++ b/presto-delta/src/test/java/com/facebook/presto/delta/TestUppercasePartitionColumns.java @@ -13,22 +13,15 @@ */ package com.facebook.presto.delta; -import com.facebook.presto.Session; import com.facebook.presto.common.type.TimeZoneKey; -import com.facebook.presto.hive.HivePlugin; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; -import com.facebook.presto.tests.DistributedQueryRunner; -import com.facebook.presto.tpch.TpchPlugin; import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; -import java.nio.file.Path; import java.util.Map; -import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; -import static java.util.Locale.US; import static org.testng.Assert.assertEquals; public class TestUppercasePartitionColumns @@ -38,54 +31,16 @@ public class TestUppercasePartitionColumns protected QueryRunner createQueryRunner() throws Exception { - return createDeltaQueryRunner(ImmutableMap.of( + Map extraProperties = ImmutableMap.of( "experimental.pushdown-subfields-enabled", "true", - "experimental.pushdown-dereference-enabled", "true")); - } + "experimental.pushdown-dereference-enabled", "true"); - private static DistributedQueryRunner createDeltaQueryRunner(Map extraProperties) - throws Exception - { - Session session = testSessionBuilder() - .setCatalog(DELTA_CATALOG) - .setSchema(DELTA_SCHEMA.toLowerCase(US)) + return DeltaQueryRunner.builder() .setTimeZoneKey(TimeZoneKey.getTimeZoneKey("Europe/Madrid")) - .build(); - - DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session) .setExtraProperties(extraProperties) - .build(); - - // Install the TPCH plugin for test data (not in Delta format) - queryRunner.installPlugin(new TpchPlugin()); - queryRunner.createCatalog("tpch", "tpch"); - - Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata"); - Path catalogDirectory = dataDirectory.getParent().resolve("catalog"); - - // Install a Delta connector catalog - queryRunner.installPlugin(new DeltaPlugin()); - Map deltaProperties = ImmutableMap.builder() - .put("hive.metastore", "file") - .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString()) - .put("delta.case-sensitive-partitions-enabled", "true") - .build(); - queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties); - - // Install a Hive connector catalog that uses the same metastore as Delta - // This catalog will be used to create tables in metastore as the Delta connector doesn't - // support creating tables yet. - queryRunner.installPlugin(new HivePlugin("hive")); - Map hiveProperties = ImmutableMap.builder() - .put("hive.metastore", "file") - .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString()) - .put("hive.allow-drop-table", "true") - .put("hive.security", "legacy") - .build(); - queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties); - queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA)); - - return queryRunner; + .caseSensitivePartitions() + .build() + .getQueryRunner(); } @Test(dataProvider = "deltaReaderVersions") diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml index d5a9ac99b4425..38211e1442ccf 100644 --- a/presto-native-execution/pom.xml +++ b/presto-native-execution/pom.xml @@ -238,6 +238,45 @@ + + com.facebook.presto + presto-delta + test-jar + test + + + org.scala-lang + scala-library + + + org.apache.commons + commons-lang3 + + + + + + com.facebook.presto + presto-delta + test + + + org.scala-lang + scala-library + + + org.apache.commons + commons-lang3 + + + + + + org.xerial.snappy + snappy-java + test + + com.facebook.presto presto-hive-metastore diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index f0d4b64c8788a..15967ffe1b7d1 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -15,6 +15,8 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.common.ErrorCode; +import com.facebook.presto.common.type.TimeZoneKey; +import com.facebook.presto.delta.DeltaQueryRunner; import com.facebook.presto.functionNamespace.FunctionNamespaceManagerPlugin; import com.facebook.presto.functionNamespace.json.JsonFileBasedFunctionNamespaceManagerFactory; import com.facebook.presto.hive.HiveQueryRunner; @@ -51,6 +53,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.TimeZone; import java.util.UUID; import java.util.function.BiFunction; @@ -85,6 +88,7 @@ public enum QueryRunnerType public static final String HIVE_DATA = "hive_data"; public static final String ICEBERG_DEFAULT_STORAGE_FORMAT = "PARQUET"; + public static final String DELTA_DEFAULT_STORAGE_FORMAT = "PARQUET"; private static final Logger log = Logger.get(PrestoNativeQueryRunnerUtils.class); private static final String DEFAULT_STORAGE_FORMAT = "DWRF"; @@ -401,6 +405,126 @@ public QueryRunner build() } } + public static DeltaQueryRunnerBuilder nativeDeltaQueryRunnerBuilder() + { + return new DeltaQueryRunnerBuilder(QueryRunnerType.NATIVE); + } + + public static DeltaQueryRunnerBuilder javaDeltaQueryRunnerBuilder() + { + return new DeltaQueryRunnerBuilder(QueryRunnerType.JAVA); + } + + /** + * Builder for Delta Lake query runners that supports both Java and Native execution modes. + *

+ * This builder wraps {@link DeltaQueryRunner.Builder} to provide native execution capabilities + * while maintaining separation between the presto-delta and presto-native-execution modules. + *

+ * Why This Separation Exists: + *

+ *

+ * The builder configures native-specific properties (worker launcher, system properties) and + * delegates to {@link DeltaQueryRunner.Builder} for Delta-specific configuration. + * + * @see DeltaQueryRunner.Builder + */ + public static class DeltaQueryRunnerBuilder + { + private NativeQueryRunnerParameters nativeQueryRunnerParameters = getNativeQueryRunnerParameters(); + private Path dataDirectory = nativeQueryRunnerParameters.dataDirectory; + private String serverBinary = nativeQueryRunnerParameters.serverBinary.toString(); + private Integer workerCount = nativeQueryRunnerParameters.workerCount.orElse(4); + private CatalogType catalogType = Optional + .ofNullable(nativeQueryRunnerParameters.runnerParameters.get("delta.catalog.type")) + .map(v -> CatalogType.valueOf(v.toUpperCase())) + .orElse(CatalogType.HIVE); + private Integer cacheMaxSize = 0; + private Map extraProperties = new HashMap<>(); + private Map extraConnectorProperties = new HashMap<>(); + private Optional remoteFunctionServerUds = Optional.empty(); + private TimeZoneKey timeZoneKey = TimeZoneKey.getTimeZoneKey(TimeZone.getDefault().getID()); + private boolean caseSensitiveParitions; + // External worker launcher is applicable only for the native iceberg query runner, since it depends on other + // properties it should be created once all the other query runner configs are set. This variable indicates + // whether the query runner returned by builder should use an external worker launcher, it will be true only + // for the native query runner and should NOT be explicitly configured by users. + private boolean useExternalWorkerLauncher; + + private DeltaQueryRunnerBuilder(QueryRunnerType queryRunnerType) + { + if (queryRunnerType.equals(QueryRunnerType.NATIVE)) { + this.extraProperties.putAll(ImmutableMap.builder() + .put("http-server.http.port", "8080") + .put("query.max-stage-count", "110") + .putAll(getNativeWorkerSystemProperties()) + .build()); + this.useExternalWorkerLauncher = true; + } + else { + this.extraProperties.putAll(ImmutableMap.of( + "regex-library", "RE2J", + "offset-clause-enabled", "true", + "query.max-stage-count", "110")); + this.extraConnectorProperties.putAll(ImmutableMap.of("hive.parquet.writer.version", "PARQUET_1_0")); + this.useExternalWorkerLauncher = false; + } + } + + public DeltaQueryRunnerBuilder setUseThrift(boolean useThrift) + { + this.extraProperties + .put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift)); + return this; + } + + public DeltaQueryRunnerBuilder addExtraProperties(Map extraProperties) + { + this.extraProperties.putAll(extraProperties); + return this; + } + + public DeltaQueryRunnerBuilder setTimeZoneKey(TimeZoneKey timeZoneKey) + { + this.timeZoneKey = timeZoneKey; + return this; + } + + public DeltaQueryRunnerBuilder caseSensitivePartitions() + { + this.caseSensitiveParitions = true; + return this; + } + + public QueryRunner build() + throws Exception + { + Optional> externalWorkerLauncher = Optional.empty(); + if (this.useExternalWorkerLauncher) { + externalWorkerLauncher = getExternalWorkerLauncher("delta", "delta", serverBinary, cacheMaxSize, remoteFunctionServerUds, + Optional.empty(), false, false, false, false, false, false); + } + DeltaQueryRunner.Builder builder = DeltaQueryRunner.builder() + .setExtraProperties(extraProperties) + .setNodeCount(OptionalInt.of(workerCount)) + .setExternalWorkerLauncher(externalWorkerLauncher) + .setTimeZoneKey(timeZoneKey); + + if (caseSensitiveParitions) { + builder.caseSensitivePartitions(); + } + + return builder.build().getQueryRunner(); + } + } + public static void createSchemaIfNotExist(QueryRunner queryRunner, String schemaName) { ExtendedHiveMetastore metastore = getFileHiveMetastore((DistributedQueryRunner) queryRunner);