diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java index 1349a1dcd301c..fe08425d3ffd6 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java @@ -31,6 +31,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import static com.facebook.presto.util.PropertiesUtil.loadProperties; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; import static java.lang.String.format; @@ -65,29 +66,38 @@ public void addPlanCheckerProviderFactory(PlanCheckerProviderFactory planChecker public void loadPlanCheckerProviders(NodeManager nodeManager) throws IOException { - PlanCheckerProviderContext planCheckerProviderContext = new PlanCheckerProviderContext(simplePlanFragmentSerde, nodeManager); - for (File file : listFiles(configDirectory)) { if (file.isFile() && file.getName().endsWith(".properties")) { // unlike function namespaces and connectors, we don't have a concept of catalog // name here (conventionally config file name without the extension) // because plan checkers are never referenced by name. Map properties = new HashMap<>(loadProperties(file)); - checkState(!isNullOrEmpty(properties.get(PLAN_CHECKER_PROVIDER_NAME)), + String planCheckerProviderName = properties.remove(PLAN_CHECKER_PROVIDER_NAME); + checkState(!isNullOrEmpty(planCheckerProviderName), "Plan checker configuration %s does not contain %s", file.getAbsoluteFile(), PLAN_CHECKER_PROVIDER_NAME); - String planCheckerProviderName = properties.remove(PLAN_CHECKER_PROVIDER_NAME); - log.info("-- Loading plan checker provider [%s] --", planCheckerProviderName); - PlanCheckerProviderFactory providerFactory = providerFactories.get(planCheckerProviderName); - checkState(providerFactory != null, - "No planCheckerProviderFactory found for '%s'. Available factories were %s", planCheckerProviderName, providerFactories.keySet()); - providers.addIfAbsent(providerFactory.create(properties, planCheckerProviderContext)); - log.info("-- Added plan checker provider [%s] --", planCheckerProviderName); + loadPlanCheckerProvider(planCheckerProviderName, properties, nodeManager); } } } + public void loadPlanCheckerProvider(String planCheckerProviderName, Map properties, NodeManager nodeManager) + { + checkArgument(!isNullOrEmpty(planCheckerProviderName), "Plan checker provider name is null or empty"); + requireNonNull(properties, "properties is null"); + requireNonNull(nodeManager, "nodeManager is null"); + + PlanCheckerProviderContext planCheckerProviderContext = new PlanCheckerProviderContext(simplePlanFragmentSerde, nodeManager); + + log.info("-- Loading plan checker provider [%s] --", planCheckerProviderName); + PlanCheckerProviderFactory providerFactory = providerFactories.get(planCheckerProviderName); + checkState(providerFactory != null, + "No planCheckerProviderFactory found for '%s'. Available factories were %s", planCheckerProviderName, providerFactories.keySet()); + providers.addIfAbsent(providerFactory.create(properties, planCheckerProviderContext)); + log.info("-- Added plan checker provider [%s] --", planCheckerProviderName); + } + private static List listFiles(File dir) { if (dir != null && dir.isDirectory()) { diff --git a/presto-main-base/src/main/java/com/facebook/presto/testing/QueryRunner.java b/presto-main-base/src/main/java/com/facebook/presto/testing/QueryRunner.java index 7132cbea4bc8f..aafd271f6e603 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/testing/QueryRunner.java +++ b/presto-main-base/src/main/java/com/facebook/presto/testing/QueryRunner.java @@ -121,6 +121,11 @@ default void loadTypeManager(String typeManagerName) throw new UnsupportedOperationException(); } + default void loadPlanCheckerProviderManager(String planCheckerProviderName, Map properties) + { + throw new UnsupportedOperationException(); + } + class MaterializedResultWithPlan { private final MaterializedResult materializedResult; diff --git a/presto-native-execution/presto_cpp/main/types/VeloxPlanConversion.cpp b/presto-native-execution/presto_cpp/main/types/VeloxPlanConversion.cpp index 8baa2e25c523c..116112c67af21 100644 --- a/presto-native-execution/presto_cpp/main/types/VeloxPlanConversion.cpp +++ b/presto-native-execution/presto_cpp/main/types/VeloxPlanConversion.cpp @@ -50,19 +50,9 @@ protocol::PlanConversionResponse prestoToVeloxPlanConversion( auto tableWriteInfo = std::make_shared(); // Attempt to convert the plan fragment to a Velox plan. - if (auto writeNode = - std::dynamic_pointer_cast( - planFragment.root)) { - // TableWriteInfo is not yet built at the planning stage, so we can not - // fully convert a TableWriteNode and skip that node of the fragment. - auto writeSourceNode = - converter.toVeloxQueryPlan(writeNode->source, tableWriteInfo, taskId); - planValidator->validatePlanFragment(core::PlanFragment(writeSourceNode)); - } else { - auto veloxPlan = - converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId); - planValidator->validatePlanFragment(veloxPlan); - } + auto veloxPlan = + converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId); + planValidator->validatePlanFragment(veloxPlan); } catch (const VeloxException& e) { response.failures.emplace_back( copyFailureInfo(VeloxToPrestoExceptionTranslator::translate(e))); diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java index 16895108ae6b8..097ea92b8a5e4 100644 --- a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java @@ -21,11 +21,17 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.Assignments; import com.facebook.presto.spi.plan.PlanChecker; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanVisitor; +import com.facebook.presto.spi.plan.ProjectNode; import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.plan.TableWriterNode; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.VariableReferenceExpression; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -33,11 +39,18 @@ import okhttp3.Response; import java.io.IOException; +import java.util.List; +import java.util.Map; +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.sidecar.nativechecker.NativePlanCheckerErrorCode.NATIVEPLANCHECKER_CONNECTION_ERROR; import static com.facebook.presto.sidecar.nativechecker.NativePlanCheckerErrorCode.NATIVEPLANCHECKER_UNKNOWN_CONVERSION_FAILURE; import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.Slices.utf8Slice; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toMap; /** * Uses the native sidecar to check verify a plan can be run on a native worker. @@ -70,12 +83,34 @@ public void validate(PlanNode planNode, WarningCollector warningCollector, Conne @Override public void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector, ConnectorSession session) { - if (!planFragment.getPartitioning().isCoordinatorOnly() && !isInternalSystemConnector(planFragment.getRoot())) { - runValidation(planFragment); - } - else { + if (planFragment.getPartitioning().isCoordinatorOnly() + || isInternalSystemConnector(planFragment.getRoot())) { LOG.debug("Skipping native plan validation [fragment: %s, root: %s]", planFragment.getId(), planFragment.getRoot().getId()); + return; } + runValidation(removeTableWriter(planFragment)); + } + + /** + * HACK: Replace TableWriterNode from the plan fragment with a ProjectNode because validating a TableWriterNode + * is unsupported by the native sidecar. They are unsupported because they contain information only determined + * during scheduling. + */ + private SimplePlanFragment removeTableWriter(SimplePlanFragment planFragment) + { + // Remove TableWriterNode from the plan fragment + PlanNode root = planFragment.getRoot().accept(new TableWriterNodeReplacer(), null); + requireNonNull(root, "TableWriterNode removal resulted in null root"); + + return new SimplePlanFragment( + planFragment.getId(), + root, + planFragment.getVariables(), + planFragment.getPartitioning(), + planFragment.getTableScanSchedulingOrder(), + planFragment.getPartitioningScheme(), + planFragment.getStageExecutionDescriptor(), + planFragment.isOutputTableWriterFragment()); } private boolean isInternalSystemConnector(PlanNode planNode) @@ -152,4 +187,40 @@ public Boolean visitPlan(PlanNode node, Void context) return false; } } + + private static class TableWriterNodeReplacer + extends PlanVisitor + { + @Override + public PlanNode visitTableWriter(TableWriterNode tableWriter, Void context) + { + // Create dummy assignments for the ProjectNode + Map assignmentsMap = tableWriter.getOutputVariables() + .subList(3, tableWriter.getOutputVariables().size()) + .stream() + .collect(toMap(i -> i, i -> i)); + assignmentsMap.put(tableWriter.getRowCountVariable(), new ConstantExpression(0L, BIGINT)); + assignmentsMap.put(tableWriter.getFragmentVariable(), new ConstantExpression(utf8Slice(""), VARCHAR)); + assignmentsMap.put(tableWriter.getTableCommitContextVariable(), new ConstantExpression(utf8Slice(""), VARCHAR)); + Assignments assignments = Assignments.builder().putAll(assignmentsMap).build(); + + // Replace TableWriterNode with a ProjectNode + return new ProjectNode( + tableWriter.getId(), + tableWriter.getSource(), + Assignments.builder().putAll(assignmentsMap).build()); + } + + @Override + public PlanNode visitPlan(PlanNode node, Void context) + { + // Recursively process child nodes + List prunedChildren = node.getSources().stream() + .map(child -> child.accept(this, context)) + .collect(toImmutableList()); + + // Replace the current node's children with the pruned children + return node.replaceChildren(prunedChildren); + } + } } diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java index 62d4c47b84f20..970cdd1001a40 100644 --- a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java @@ -19,7 +19,7 @@ public class NativePlanCheckerConfig { public static final String CONFIG_PREFIX = "native-plan-checker"; - private boolean enabled; + private boolean enabled = true; public boolean isPlanValidationEnabled() { diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/typemanager/NativeTypeManager.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/typemanager/NativeTypeManager.java index 51c46370c46f3..3e93d67c2bd88 100644 --- a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/typemanager/NativeTypeManager.java +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/typemanager/NativeTypeManager.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.sidecar.typemanager; -import com.facebook.airlift.log.Logger; import com.facebook.presto.common.type.DistinctTypeInfo; import com.facebook.presto.common.type.FunctionType; import com.facebook.presto.common.type.ParametricType; @@ -74,8 +73,6 @@ public class NativeTypeManager implements TypeManager { - private static final Logger log = Logger.get(NativeTypeManager.class); - private static final Set NATIVE_ENGINE_SUPPORTED_TYPES = ImmutableSet.of( BIGINT, diff --git a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/NativeSidecarPluginQueryRunnerUtils.java b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/NativeSidecarPluginQueryRunnerUtils.java index 28a82820d394a..ad396c288e64c 100644 --- a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/NativeSidecarPluginQueryRunnerUtils.java +++ b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/NativeSidecarPluginQueryRunnerUtils.java @@ -34,5 +34,6 @@ public static void setupNativeSidecarPlugin(QueryRunner queryRunner) "supported-function-languages", "CPP", "function-implementation-type", "CPP")); queryRunner.loadTypeManager(NativeTypeManagerFactory.NAME); + queryRunner.loadPlanCheckerProviderManager("native", ImmutableMap.of()); } } diff --git a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestNativeSidecarPlanChecker.java b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestNativeSidecarPlanChecker.java new file mode 100644 index 0000000000000..a4433846995db --- /dev/null +++ b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestNativeSidecarPlanChecker.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar; + +import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.UUID; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNation; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrdersEx; +import static com.facebook.presto.sidecar.NativeSidecarPluginQueryRunnerUtils.setupNativeSidecarPlugin; +import static java.lang.String.format; + +@Test(singleThreaded = true) +public class TestNativeSidecarPlanChecker + extends AbstractTestQueryFramework +{ + @Override + protected void createTables() + { + QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner(); + createLineitem(queryRunner); + createNation(queryRunner); + createOrders(queryRunner); + createOrdersEx(queryRunner); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.nativeHiveQueryRunnerBuilder() + .setCoordinatorSidecarEnabled(true) + .setExtraProperties(ImmutableMap.of("http-server.http.port", "8089")) + .build(); + setupNativeSidecarPlugin(queryRunner); + queryRunner.getCoordinator().createCatalog("hive2", "hive"); + return queryRunner; + } + + @Override + protected QueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder().build(); + } + + @Test + public void createAndInsertUnbucketedTable() + { + String tableName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", ""); + try { + assertUpdate(format("CREATE TABLE %s AS SELECT orderkey key1, comment value1 FROM orders", tableName), 15000); + assertUpdate(format("INSERT INTO %s SELECT orderkey key1, comment value1 FROM orders", tableName), 15000); + } + finally { + // Clean up the temporary tables + getExpectedQueryRunner().execute(getSession(), format("DROP TABLE IF EXISTS %s", tableName), ImmutableList.of(BIGINT)); + } + } + + @Test + public void selectFromUnbucketedTable() + { + assertQuery(format("SELECT orderkey key1, comment value1 FROM orders")); + } +} diff --git a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java index cdf368c7ddf7a..04632cfe4a060 100644 --- a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java +++ b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java @@ -51,7 +51,6 @@ import java.util.Set; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; @@ -64,14 +63,10 @@ public class TestPlanCheckerProvider public void testGetPlanChecker() { NativePlanCheckerConfig config = new NativePlanCheckerConfig(); - assertFalse(config.isPlanValidationEnabled()); + assertTrue(config.isPlanValidationEnabled()); NativePlanCheckerProvider provider = new NativePlanCheckerProvider(new TestingNodeManager(URI.create("localhost")), PLAN_FRAGMENT_JSON_CODEC, config); assertTrue(provider.getIntermediatePlanCheckers().isEmpty()); assertTrue(provider.getFinalPlanCheckers().isEmpty()); - assertTrue(provider.getFragmentPlanCheckers().isEmpty()); - - // Enable the native plan checker, should appear in fragment plan checkers - config.setPlanValidationEnabled(true); assertEquals(provider.getFragmentPlanCheckers().size(), 1); } diff --git a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/nativechecker/TestNativePlanCheckerConfig.java b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/nativechecker/TestNativePlanCheckerConfig.java new file mode 100644 index 0000000000000..b7086d8f7176c --- /dev/null +++ b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/nativechecker/TestNativePlanCheckerConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestNativePlanCheckerConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(NativePlanCheckerConfig.class) + .setPlanValidationEnabled(true)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("plan-validation-enabled", "false") + .build(); + + NativePlanCheckerConfig expected = new NativePlanCheckerConfig() + .setPlanValidationEnabled(false); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java index 8ac3d460535e3..d2cddf71b8fba 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java @@ -1032,6 +1032,14 @@ public void loadTypeManager(String typeManagerName) } } + @Override + public void loadPlanCheckerProviderManager(String planCheckerProviderName, Map properties) + { + for (TestingPrestoServer server : servers) { + server.getPlanCheckerProviderManager().loadPlanCheckerProvider(planCheckerProviderName, properties, server.getPluginNodeManager()); + } + } + private static void closeUnchecked(AutoCloseable closeable) { try {