Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CopyOnWriteArrayList;

import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
Expand Down Expand Up @@ -65,29 +66,38 @@ public void addPlanCheckerProviderFactory(PlanCheckerProviderFactory planChecker
public void loadPlanCheckerProviders(NodeManager nodeManager)
throws IOException
{
PlanCheckerProviderContext planCheckerProviderContext = new PlanCheckerProviderContext(simplePlanFragmentSerde, nodeManager);

for (File file : listFiles(configDirectory)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
// unlike function namespaces and connectors, we don't have a concept of catalog
// name here (conventionally config file name without the extension)
// because plan checkers are never referenced by name.
Map<String, String> properties = new HashMap<>(loadProperties(file));
checkState(!isNullOrEmpty(properties.get(PLAN_CHECKER_PROVIDER_NAME)),
String planCheckerProviderName = properties.remove(PLAN_CHECKER_PROVIDER_NAME);
checkState(!isNullOrEmpty(planCheckerProviderName),
"Plan checker configuration %s does not contain %s",
file.getAbsoluteFile(),
PLAN_CHECKER_PROVIDER_NAME);
String planCheckerProviderName = properties.remove(PLAN_CHECKER_PROVIDER_NAME);
log.info("-- Loading plan checker provider [%s] --", planCheckerProviderName);
PlanCheckerProviderFactory providerFactory = providerFactories.get(planCheckerProviderName);
checkState(providerFactory != null,
"No planCheckerProviderFactory found for '%s'. Available factories were %s", planCheckerProviderName, providerFactories.keySet());
providers.addIfAbsent(providerFactory.create(properties, planCheckerProviderContext));
log.info("-- Added plan checker provider [%s] --", planCheckerProviderName);
loadPlanCheckerProvider(planCheckerProviderName, properties, nodeManager);
}
}
}

public void loadPlanCheckerProvider(String planCheckerProviderName, Map<String, String> properties, NodeManager nodeManager)
{
checkArgument(!isNullOrEmpty(planCheckerProviderName), "Plan checker provider name is null or empty");
requireNonNull(properties, "properties is null");
requireNonNull(nodeManager, "nodeManager is null");

PlanCheckerProviderContext planCheckerProviderContext = new PlanCheckerProviderContext(simplePlanFragmentSerde, nodeManager);

log.info("-- Loading plan checker provider [%s] --", planCheckerProviderName);
PlanCheckerProviderFactory providerFactory = providerFactories.get(planCheckerProviderName);
checkState(providerFactory != null,
"No planCheckerProviderFactory found for '%s'. Available factories were %s", planCheckerProviderName, providerFactories.keySet());
providers.addIfAbsent(providerFactory.create(properties, planCheckerProviderContext));
log.info("-- Added plan checker provider [%s] --", planCheckerProviderName);
}

private static List<File> listFiles(File dir)
{
if (dir != null && dir.isDirectory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ default void loadTypeManager(String typeManagerName)
throw new UnsupportedOperationException();
}

default void loadPlanCheckerProviderManager(String planCheckerProviderName, Map<String, String> properties)
{
throw new UnsupportedOperationException();
}

class MaterializedResultWithPlan
{
private final MaterializedResult materializedResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,9 @@ protocol::PlanConversionResponse prestoToVeloxPlanConversion(
auto tableWriteInfo = std::make_shared<protocol::TableWriteInfo>();

// Attempt to convert the plan fragment to a Velox plan.
if (auto writeNode =
std::dynamic_pointer_cast<const protocol::TableWriterNode>(
planFragment.root)) {
// TableWriteInfo is not yet built at the planning stage, so we can not
// fully convert a TableWriteNode and skip that node of the fragment.
auto writeSourceNode =
converter.toVeloxQueryPlan(writeNode->source, tableWriteInfo, taskId);
planValidator->validatePlanFragment(core::PlanFragment(writeSourceNode));
} else {
auto veloxPlan =
converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId);
planValidator->validatePlanFragment(veloxPlan);
}
auto veloxPlan =
Comment thread
aditi-pandit marked this conversation as resolved.
converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId);
planValidator->validatePlanFragment(veloxPlan);
} catch (const VeloxException& e) {
response.failures.emplace_back(
copyFailureInfo(VeloxToPrestoExceptionTranslator::translate(e)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,36 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.PlanChecker;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanVisitor;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.sidecar.nativechecker.NativePlanCheckerErrorCode.NATIVEPLANCHECKER_CONNECTION_ERROR;
import static com.facebook.presto.sidecar.nativechecker.NativePlanCheckerErrorCode.NATIVEPLANCHECKER_UNKNOWN_CONVERSION_FAILURE;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

/**
* Uses the native sidecar to check verify a plan can be run on a native worker.
Expand Down Expand Up @@ -70,12 +83,34 @@ public void validate(PlanNode planNode, WarningCollector warningCollector, Conne
@Override
public void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector, ConnectorSession session)
{
if (!planFragment.getPartitioning().isCoordinatorOnly() && !isInternalSystemConnector(planFragment.getRoot())) {
runValidation(planFragment);
}
else {
if (planFragment.getPartitioning().isCoordinatorOnly()
|| isInternalSystemConnector(planFragment.getRoot())) {
LOG.debug("Skipping native plan validation [fragment: %s, root: %s]", planFragment.getId(), planFragment.getRoot().getId());
return;
}
runValidation(removeTableWriter(planFragment));
}

/**
* HACK: Replace TableWriterNode from the plan fragment with a ProjectNode because validating a TableWriterNode
* is unsupported by the native sidecar. They are unsupported because they contain information only determined
* during scheduling.
*/
private SimplePlanFragment removeTableWriter(SimplePlanFragment planFragment)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be good to add a note as to why the TableWriterNode gets replaced

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, PTAL

{
// Remove TableWriterNode from the plan fragment
PlanNode root = planFragment.getRoot().accept(new TableWriterNodeReplacer(), null);
requireNonNull(root, "TableWriterNode removal resulted in null root");

return new SimplePlanFragment(
planFragment.getId(),
root,
planFragment.getVariables(),
planFragment.getPartitioning(),
planFragment.getTableScanSchedulingOrder(),
planFragment.getPartitioningScheme(),
planFragment.getStageExecutionDescriptor(),
planFragment.isOutputTableWriterFragment());
}

private boolean isInternalSystemConnector(PlanNode planNode)
Expand Down Expand Up @@ -152,4 +187,40 @@ public Boolean visitPlan(PlanNode node, Void context)
return false;
}
}

private static class TableWriterNodeReplacer
extends PlanVisitor<PlanNode, Void>
{
@Override
public PlanNode visitTableWriter(TableWriterNode tableWriter, Void context)
{
// Create dummy assignments for the ProjectNode
Map<VariableReferenceExpression, RowExpression> assignmentsMap = tableWriter.getOutputVariables()
.subList(3, tableWriter.getOutputVariables().size())
.stream()
.collect(toMap(i -> i, i -> i));
assignmentsMap.put(tableWriter.getRowCountVariable(), new ConstantExpression(0L, BIGINT));
assignmentsMap.put(tableWriter.getFragmentVariable(), new ConstantExpression(utf8Slice(""), VARCHAR));
assignmentsMap.put(tableWriter.getTableCommitContextVariable(), new ConstantExpression(utf8Slice(""), VARCHAR));
Assignments assignments = Assignments.builder().putAll(assignmentsMap).build();

// Replace TableWriterNode with a ProjectNode
return new ProjectNode(
tableWriter.getId(),
tableWriter.getSource(),
Assignments.builder().putAll(assignmentsMap).build());
}

@Override
public PlanNode visitPlan(PlanNode node, Void context)
{
// Recursively process child nodes
List<PlanNode> prunedChildren = node.getSources().stream()
.map(child -> child.accept(this, context))
.collect(toImmutableList());

// Replace the current node's children with the pruned children
return node.replaceChildren(prunedChildren);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class NativePlanCheckerConfig
{
public static final String CONFIG_PREFIX = "native-plan-checker";
private boolean enabled;
private boolean enabled = true;

public boolean isPlanValidationEnabled()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.sidecar.typemanager;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.DistinctTypeInfo;
import com.facebook.presto.common.type.FunctionType;
import com.facebook.presto.common.type.ParametricType;
Expand Down Expand Up @@ -74,8 +73,6 @@
public class NativeTypeManager
implements TypeManager
{
private static final Logger log = Logger.get(NativeTypeManager.class);

private static final Set<String> NATIVE_ENGINE_SUPPORTED_TYPES =
ImmutableSet.of(
BIGINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ public static void setupNativeSidecarPlugin(QueryRunner queryRunner)
"supported-function-languages", "CPP",
"function-implementation-type", "CPP"));
queryRunner.loadTypeManager(NativeTypeManagerFactory.NAME);
queryRunner.loadPlanCheckerProviderManager("native", ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sidecar;

import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.UUID;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNation;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrdersEx;
import static com.facebook.presto.sidecar.NativeSidecarPluginQueryRunnerUtils.setupNativeSidecarPlugin;
import static java.lang.String.format;

@Test(singleThreaded = true)
public class TestNativeSidecarPlanChecker
extends AbstractTestQueryFramework
{
@Override
protected void createTables()
{
QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner();
createLineitem(queryRunner);
createNation(queryRunner);
createOrders(queryRunner);
createOrdersEx(queryRunner);
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.nativeHiveQueryRunnerBuilder()
.setCoordinatorSidecarEnabled(true)
.setExtraProperties(ImmutableMap.of("http-server.http.port", "8089"))
.build();
setupNativeSidecarPlugin(queryRunner);
queryRunner.getCoordinator().createCatalog("hive2", "hive");
return queryRunner;
}

@Override
protected QueryRunner createExpectedQueryRunner()
throws Exception
{
return PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder().build();
}

@Test
public void createAndInsertUnbucketedTable()
{
String tableName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
try {
assertUpdate(format("CREATE TABLE %s AS SELECT orderkey key1, comment value1 FROM orders", tableName), 15000);
assertUpdate(format("INSERT INTO %s SELECT orderkey key1, comment value1 FROM orders", tableName), 15000);
}
finally {
// Clean up the temporary tables
getExpectedQueryRunner().execute(getSession(), format("DROP TABLE IF EXISTS %s", tableName), ImmutableList.of(BIGINT));
}
}

@Test
public void selectFromUnbucketedTable()
{
assertQuery(format("SELECT orderkey key1, comment value1 FROM orders"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Set;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

Expand All @@ -64,14 +63,10 @@ public class TestPlanCheckerProvider
public void testGetPlanChecker()
{
NativePlanCheckerConfig config = new NativePlanCheckerConfig();
assertFalse(config.isPlanValidationEnabled());
assertTrue(config.isPlanValidationEnabled());
NativePlanCheckerProvider provider = new NativePlanCheckerProvider(new TestingNodeManager(URI.create("localhost")), PLAN_FRAGMENT_JSON_CODEC, config);
assertTrue(provider.getIntermediatePlanCheckers().isEmpty());
assertTrue(provider.getFinalPlanCheckers().isEmpty());
assertTrue(provider.getFragmentPlanCheckers().isEmpty());

// Enable the native plan checker, should appear in fragment plan checkers
config.setPlanValidationEnabled(true);
assertEquals(provider.getFragmentPlanCheckers().size(), 1);
}

Expand Down
Loading
Loading