From 8be3b00175722127c168bf5349b5015bb437c635 Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Thu, 8 Jun 2023 13:44:50 -0700 Subject: [PATCH 1/6] [native pos] Add PrestoSparkNativeTaskExecutorFactory Add a new class PrestoSparkNativeTaskExecutorFactory which is responsible for executing native task remotely. It contains logic to start a native task, monitor for results and handle any exceptions that occur during task execution This is a singleton class that is selectively invoked only while executing a PrestoSparkNativeTaskRDD --- .../presto/spark/PrestoSparkModule.java | 2 + .../presto/spark/PrestoSparkService.java | 10 + .../HttpNativeExecutionTaskInfoFetcher.java | 2 +- .../execution/NativeExecutionProcess.java | 4 + .../NativeExecutionProcessFactory.java | 4 +- .../spark/execution/NativeExecutionTask.java | 7 +- .../PrestoSparkNativeTaskExecutorFactory.java | 501 ++++++++++++++++++ .../presto/spark/PrestoSparkQueryRunner.java | 8 + .../IPrestoSparkService.java | 2 + ...restoSparkTaskExecutorFactoryProvider.java | 2 + .../PrestoSparkTaskProcessor.java | 2 +- .../spark/launcher/PrestoSparkRunner.java | 8 + 12 files changed, 546 insertions(+), 6 deletions(-) create mode 100644 presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index 788dea643fe20..63daad315ecbf 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -111,6 +111,7 @@ import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory; import com.facebook.presto.spark.execution.PrestoSparkLocalShuffleReadInfo; import com.facebook.presto.spark.execution.PrestoSparkLocalShuffleWriteInfo; +import 
com.facebook.presto.spark.execution.PrestoSparkNativeTaskExecutorFactory; import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory; import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig; import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig; @@ -516,6 +517,7 @@ protected void setup(Binder binder) binder.bind(PrestoSparkAccessControlChecker.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkPlanFragmenter.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkRddFactory.class).in(Scopes.SINGLETON); + binder.bind(PrestoSparkNativeTaskExecutorFactory.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkTaskExecutorFactory.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkQueryExecutionFactory.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkService.class).in(Scopes.SINGLETON); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java index cce500eeed02f..982acb8693bbe 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java @@ -17,6 +17,7 @@ import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecutionFactory; import com.facebook.presto.spark.classloader_interface.IPrestoSparkService; import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory; +import com.facebook.presto.spark.execution.PrestoSparkNativeTaskExecutorFactory; import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory; import javax.inject.Inject; @@ -28,16 +29,19 @@ public class PrestoSparkService { private final PrestoSparkQueryExecutionFactory queryExecutionFactory; private final PrestoSparkTaskExecutorFactory taskExecutorFactory; + private final PrestoSparkNativeTaskExecutorFactory prestoSparkNativeTaskExecutorFactory; private 
final LifeCycleManager lifeCycleManager; @Inject public PrestoSparkService( PrestoSparkQueryExecutionFactory queryExecutionFactory, PrestoSparkTaskExecutorFactory taskExecutorFactory, + PrestoSparkNativeTaskExecutorFactory prestoSparkNativeTaskExecutorFactory, LifeCycleManager lifeCycleManager) { this.queryExecutionFactory = requireNonNull(queryExecutionFactory, "queryExecutionFactory is null"); this.taskExecutorFactory = requireNonNull(taskExecutorFactory, "taskExecutorFactory is null"); + this.prestoSparkNativeTaskExecutorFactory = requireNonNull(prestoSparkNativeTaskExecutorFactory, "prestoSparkNativeTaskExecutorFactory is null"); this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); } @@ -53,6 +57,12 @@ public IPrestoSparkTaskExecutorFactory getTaskExecutorFactory() return taskExecutorFactory; } + @Override + public IPrestoSparkTaskExecutorFactory getNativeTaskExecutorFactory() + { + return prestoSparkNativeTaskExecutorFactory; + } + @Override public void close() { diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java index 65f89747e42af..2c6bcf0bc7f7a 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java @@ -144,7 +144,7 @@ public void stop() public Optional getTaskInfo() throws RuntimeException { - if (scheduledFuture != null && scheduledFuture.isCancelled()) { + if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) { throw lastException.get(); } TaskInfo info = taskInfo.get(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java 
b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java index d5fd58644949c..a634a9d719d41 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java @@ -183,6 +183,10 @@ public int getPort() return port; } + public URI getLocation() + { + return location; + } private static URI getBaseUriWithPort(URI baseUri, int port) { return uriBuilderFrom(baseUri) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java index 598a544153a05..f3d2ca6ee0cec 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java @@ -38,10 +38,8 @@ public class NativeExecutionProcessFactory { - // TODO add config - private static final int MAX_THREADS = 1000; private static final Duration MAX_ERROR_DURATION = new Duration(2, TimeUnit.MINUTES); - + public static final URI DEFAULT_URI = URI.create("http://127.0.0.1"); private final HttpClient httpClient; private final ExecutorService coreExecutor; private final ScheduledExecutorService errorRetryScheduledExecutor; diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java index b52dafe8dd335..0a215ef363b1b 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java @@ -29,6 +29,7 @@ import 
com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient; import com.facebook.presto.spi.page.SerializedPage; import com.facebook.presto.sql.planner.PlanFragment; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import java.net.URI; @@ -38,6 +39,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import static com.facebook.presto.execution.TaskState.ABORTED; +import static com.facebook.presto.execution.TaskState.CANCELED; +import static com.facebook.presto.execution.TaskState.FAILED; import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static java.util.Objects.requireNonNull; @@ -158,7 +162,8 @@ public TaskInfo start() { TaskInfo taskInfo = sendUpdateRequest(); - if (!taskInfo.getTaskStatus().getState().isDone()) { + // We do not start taskInfo fetcher for failed tasks + if (!ImmutableList.of(CANCELED, FAILED, ABORTED).contains(taskInfo.getTaskStatus().getState())) { log.info("Starting TaskInfoFetcher and TaskResultFetcher."); taskResultFetcher.ifPresent(fetcher -> fetcher.start()); taskInfoFetcher.start(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java new file mode 100644 index 0000000000000..58d32ea929d21 --- /dev/null +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java @@ -0,0 +1,501 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spark.execution; + +import com.facebook.airlift.json.Codec; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.Session; +import com.facebook.presto.execution.ExecutionFailureInfo; +import com.facebook.presto.execution.Lifespan; +import com.facebook.presto.execution.Location; +import com.facebook.presto.execution.ScheduledSplit; +import com.facebook.presto.execution.StageExecutionId; +import com.facebook.presto.execution.StageId; +import com.facebook.presto.execution.TaskId; +import com.facebook.presto.execution.TaskInfo; +import com.facebook.presto.execution.TaskSource; +import com.facebook.presto.execution.TaskState; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.RemoteTransactionHandle; +import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.metadata.Split; +import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider; +import com.facebook.presto.spark.PrestoSparkTaskDescriptor; +import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor; +import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory; +import com.facebook.presto.spark.classloader_interface.MutablePartitionId; +import com.facebook.presto.spark.classloader_interface.PrestoSparkNativeTaskInputs; +import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage; +import 
com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleReadDescriptor; +import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats; +import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs; +import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput; +import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor; +import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource; +import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo; +import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.page.SerializedPage; +import com.facebook.presto.spi.plan.OutputNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.spi.security.TokenAuthenticator; +import com.facebook.presto.split.RemoteSplit; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.PlanFragmentId; +import com.facebook.presto.sql.planner.plan.RemoteSourceNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.spark.util.CollectionAccumulator; +import scala.Tuple2; +import scala.collection.AbstractIterator; +import scala.collection.Iterator; + +import javax.inject.Inject; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID; +import static 
com.facebook.presto.spark.execution.NativeExecutionProcessFactory.DEFAULT_URI; +import static com.facebook.presto.spark.util.PrestoSparkUtils.deserializeZstdCompressed; +import static com.facebook.presto.spark.util.PrestoSparkUtils.serializeZstdCompressed; +import static com.facebook.presto.spark.util.PrestoSparkUtils.toPrestoSparkSerializedPage; +import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static com.facebook.presto.sql.planner.SchedulingOrderVisitor.scheduleOrder; +import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.units.DataSize.succinctBytes; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +/** + * PrestoSparkNativeTaskExecutorFactory is responsible for launching the external native process and managing the communication + * between Java process and native process (by using the {@link NativeExecutionTask}). + * It will send necessary metadata (e.g., plan fragment, session properties etc.) as a part of + * BatchTaskUpdateRequest. It will poll the remote CPP task for status and results (pages/data if applicable) + * and send these back to the Spark's RDD api + * + * PrestoSparkNativeTaskExecutorFactory is singleton instantiated once per executor. + * + * For every task it receives, it does the following + * 1. Create the Native execution Process (NativeTaskExecutionFactory) ensure that it is created only once. + * 2. Serialize and pass the planFragment, source-metadata (taskSources), sink-metadata (tableWriteInfo or shuffleWriteInfo) + * and submit a nativeExecutionTask. + * 3. Return Iterator to sparkRDD layer. RDD execution will call the .next() methods, which will + * 3.a Call {@link NativeExecutionTask}'s pollResult() to retrieve {@link SerializedPage} back from external process. 
+ * 3.b If no more output is available, then check if task has finished successfully or with exception + * If task finished with exception - fail the spark task (throw exception) + * IF task finished successfully - collect statistics through taskInfo object and add to accumulator + */ +public class PrestoSparkNativeTaskExecutorFactory + implements IPrestoSparkTaskExecutorFactory +{ + private static final Logger log = Logger.get(PrestoSparkNativeTaskExecutorFactory.class); + + // For Presto-on-Spark, we do not have remoteSourceTasks as the shuffle data is + // in persistent shuffle. + // Current protocol for Split mandates having a remoteSourceTaskId as the + // part of the split info. So for shuffleRead split we set it to a dummy + // value that is ignored by the shuffle-reader + private static final TaskId DUMMY_TASK_ID = TaskId.valueOf("remotesourcetaskid.0.0.0.0"); + + private final SessionPropertyManager sessionPropertyManager; + private final FunctionAndTypeManager functionAndTypeManager; + private final JsonCodec taskDescriptorJsonCodec; + private final Codec taskSourceCodec; + private final Codec taskInfoCodec; + private final PrestoSparkExecutionExceptionFactory executionExceptionFactory; + private final Set authenticatorProviders; + private final NativeExecutionProcessFactory nativeExecutionProcessFactory; + private final NativeExecutionTaskFactory nativeExecutionTaskFactory; + private final PrestoSparkShuffleInfoTranslator shuffleInfoTranslator; + private NativeExecutionProcess nativeExecutionProcess; + + @Inject + public PrestoSparkNativeTaskExecutorFactory( + SessionPropertyManager sessionPropertyManager, + FunctionAndTypeManager functionAndTypeManager, + JsonCodec taskDescriptorJsonCodec, + Codec taskSourceCodec, + Codec taskInfoCodec, + PrestoSparkExecutionExceptionFactory executionExceptionFactory, + Set authenticatorProviders, + PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, + NativeExecutionProcessFactory 
nativeExecutionProcessFactory, + NativeExecutionTaskFactory nativeExecutionTaskFactory, + PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) + { + this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); + this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null"); + this.taskDescriptorJsonCodec = requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null"); + this.taskSourceCodec = requireNonNull(taskSourceCodec, "taskSourceCodec is null"); + this.taskInfoCodec = requireNonNull(taskInfoCodec, "taskInfoCodec is null"); + this.executionExceptionFactory = requireNonNull(executionExceptionFactory, "executionExceptionFactory is null"); + this.authenticatorProviders = ImmutableSet.copyOf(requireNonNull(authenticatorProviders, "authenticatorProviders is null")); + this.nativeExecutionProcessFactory = requireNonNull(nativeExecutionProcessFactory, "processFactory is null"); + this.nativeExecutionTaskFactory = requireNonNull(nativeExecutionTaskFactory, "taskFactory is null"); + this.shuffleInfoTranslator = requireNonNull(shuffleInfoTranslator, "shuffleInfoFactory is null"); + } + + @Override + public IPrestoSparkTaskExecutor create( + int partitionId, + int attemptNumber, + SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, + Iterator serializedTaskSources, + PrestoSparkTaskInputs inputs, + CollectionAccumulator taskInfoCollector, + CollectionAccumulator shuffleStatsCollector, + Class outputType) + { + try { + return doCreate( + partitionId, + attemptNumber, + serializedTaskDescriptor, + serializedTaskSources, + inputs, + taskInfoCollector, + shuffleStatsCollector, + outputType); + } + catch (RuntimeException e) { + throw executionExceptionFactory.toPrestoSparkExecutionException(e); + } + } + + public IPrestoSparkTaskExecutor doCreate( + int partitionId, + int attemptNumber, + SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, + Iterator 
serializedTaskSources, + PrestoSparkTaskInputs inputs, + CollectionAccumulator taskInfoCollector, + CollectionAccumulator shuffleStatsCollector, + Class outputType) + { + PrestoSparkTaskDescriptor taskDescriptor = taskDescriptorJsonCodec.fromJson(serializedTaskDescriptor.getBytes()); + ImmutableMap.Builder extraAuthenticators = ImmutableMap.builder(); + authenticatorProviders.forEach(provider -> extraAuthenticators.putAll(provider.getTokenAuthenticators())); + + Session session = taskDescriptor.getSession().toSession( + sessionPropertyManager, + taskDescriptor.getExtraCredentials(), + extraAuthenticators.build()); + PlanFragment fragment = taskDescriptor.getFragment(); + StageId stageId = new StageId(session.getQueryId(), fragment.getId().getId()); + TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId, attemptNumber); + + // TODO: Remove this once we can display the plan on Spark UI. + // Currently, `textPlanFragment` throws an exception if json-based UDFs are used in the query, which can only + // happen in native execution mode. To resolve this error, `JsonFileBasedFunctionNamespaceManager` must be + // loaded on the executors as well (which is actually not required for native execution). To do so, we need a + // mechanism to ship the JSON file containing the UDF metadata to workers, which does not exist as of today. + // TODO: Address this issue; more details in https://github.com/prestodb/presto/issues/19600 + log.info("Logging plan fragment is not supported for presto-on-spark native execution, yet"); + + if (fragment.getPartitioning().isCoordinatorOnly()) { + throw new UnsupportedOperationException("Coordinator only fragment execution is not supported by native task executor"); + } + + checkArgument( + inputs instanceof PrestoSparkNativeTaskInputs, + format("PrestoSparkNativeTaskInputs is required for native execution, but %s is provided", inputs.getClass().getName())); + + // 1. 
Start the native process if it hasn't already been started or dead + createAndStartNativeExecutionProcess(session); + + // 2. compute the task info to send to cpp process + PrestoSparkNativeTaskInputs nativeInputs = (PrestoSparkNativeTaskInputs) inputs; + // 2.a Populate Read info + List taskSources = getTaskSources(serializedTaskSources, fragment, session, nativeInputs); + + // 2.b Populate Write info + Optional shuffleWriteInfo = nativeInputs.getShuffleWriteDescriptor().isPresent() + && !findTableWriteNode(fragment.getRoot()).isPresent() + && !(fragment.getRoot() instanceof OutputNode) ? + Optional.of(shuffleInfoTranslator.createShuffleWriteInfo(session, nativeInputs.getShuffleWriteDescriptor().get())) : Optional.empty(); + Optional serializedShuffleWriteInfo = shuffleWriteInfo.map(shuffleInfoTranslator::createSerializedWriteInfo); + + // 3. Submit the task to cpp process for execution + log.info("Submitting native execution task "); + NativeExecutionTask task = nativeExecutionTaskFactory.createNativeExecutionTask( + session, + nativeExecutionProcess.getLocation(), + taskId, + fragment, + ImmutableList.copyOf(taskSources), + taskDescriptor.getTableWriteInfo(), + serializedShuffleWriteInfo); + + log.info("Creating task and will wait for remote task completion"); + TaskInfo taskInfo = task.start(); + + // task creation might have failed + processTaskInfoForErrors(taskInfo); + // 4. return output to spark RDD layer + return new PrestoSparkNativeTaskOutputIterator<>(task, outputType, taskInfoCollector, taskInfoCodec, executionExceptionFactory); + } + + private static void completeTask(CollectionAccumulator taskInfoCollector, NativeExecutionTask task, Codec taskInfoCodec) + { + // stop the task + task.stop(); + + // collect statistics (if available) + Optional taskInfoOptional = task.getTaskInfo(); + if (!taskInfoOptional.isPresent()) { + log.error("Missing taskInfo. 
Statistics might be inaccurate"); + return; + } + SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(serializeZstdCompressed(taskInfoCodec, taskInfoOptional.get())); + taskInfoCollector.add(serializedTaskInfo); + } + + private static void processTaskInfoForErrors(TaskInfo taskInfo) + { + if (!taskInfo.getTaskStatus().getState().isDone()) { + log.info("processTaskInfoForErrors: task is not done yet.. %s", taskInfo); + return; + } + + if (!taskInfo.getTaskStatus().getState().equals(TaskState.FINISHED)) { + // task failed with errors + RuntimeException failure = taskInfo.getTaskStatus().getFailures().stream() + .findFirst() + .map(ExecutionFailureInfo::toException) + .orElse(new PrestoException(GENERIC_INTERNAL_ERROR, "Native task failed for an unknown reason")); + throw failure; + } + + log.info("processTaskInfoForErrors: task completed successfully = %s", taskInfo); + } + + private void createAndStartNativeExecutionProcess(Session session) + { + requireNonNull(nativeExecutionProcessFactory, "Trying to instantiate native process but factory is null"); + + try { + // create the CPP sidecar process if it doesn't exist. 
+ // We create this when the first task is scheduled + nativeExecutionProcess = nativeExecutionProcessFactory.getNativeExecutionProcess( + session, + DEFAULT_URI); + nativeExecutionProcess.start(); + } + catch (ExecutionException | InterruptedException | IOException e) { + throw new RuntimeException(e); + } + } + + private List getTaskSources( + Iterator serializedTaskSources, + PlanFragment fragment, + Session session, + PrestoSparkNativeTaskInputs nativeTaskInputs) + { + List taskSources = new ArrayList<>(); + + // Populate TableScan sources + long totalSerializedSizeInBytes = 0; + while (serializedTaskSources.hasNext()) { + SerializedPrestoSparkTaskSource serializedTaskSource = serializedTaskSources.next(); + taskSources.add(deserializeZstdCompressed(taskSourceCodec, serializedTaskSource.getBytes())); + totalSerializedSizeInBytes += serializedTaskSource.getBytes().length; + } + + // When joining bucketed table with a non-bucketed table with a filter on "$bucket", + // some tasks may not have splits for the bucketed table. In this case we still need + // to send no-more-splits message to Velox. 
+ Set planNodeIdsWithSources = taskSources.stream().map(TaskSource::getPlanNodeId).collect(Collectors.toSet()); + Set tableScanIds = Sets.newHashSet(scheduleOrder(fragment.getRoot())); + tableScanIds.stream() + .filter(id -> !planNodeIdsWithSources.contains(id)) + .forEach(id -> taskSources.add(new TaskSource(id, ImmutableSet.of(), true))); + + log.info("Total serialized size of all table scan task sources: %s", succinctBytes(totalSerializedSizeInBytes)); + + // Populate ShuffleRead sources + ImmutableList.Builder shuffleTaskSources = ImmutableList.builder(); + AtomicLong nextSplitId = new AtomicLong(); + taskSources.stream() + .flatMap(source -> source.getSplits().stream()) + .mapToLong(ScheduledSplit::getSequenceId) + .max() + .ifPresent(id -> nextSplitId.set(id + 1)); + + for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) { + for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) { + PrestoSparkShuffleReadDescriptor shuffleReadDescriptor = + nativeTaskInputs.getShuffleReadDescriptors().get(sourceFragmentId.toString()); + if (shuffleReadDescriptor != null) { + ScheduledSplit split = new ScheduledSplit(nextSplitId.getAndIncrement(), remoteSource.getId(), new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit( + new Location(format("batch://%s?shuffleInfo=%s", DUMMY_TASK_ID, + shuffleInfoTranslator.createSerializedReadInfo( + shuffleInfoTranslator.createShuffleReadInfo(session, shuffleReadDescriptor)))), + DUMMY_TASK_ID))); + TaskSource source = new TaskSource(remoteSource.getId(), ImmutableSet.of(split), ImmutableSet.of(Lifespan.taskWide()), true); + shuffleTaskSources.add(source); + } + } + } + + taskSources.addAll(shuffleTaskSources.build()); + + return taskSources; + } + + private Optional findTableWriteNode(PlanNode node) + { + return searchFrom(node) + .where(TableWriterNode.class::isInstance) + .findFirst(); + } + + private static class PrestoSparkNativeTaskOutputIterator + extends 
AbstractIterator> + implements IPrestoSparkTaskExecutor + { + private final NativeExecutionTask nativeExecutionTask; + private Optional next = Optional.empty(); + private final CollectionAccumulator taskInfoCollectionAccumulator; + private final Codec taskInfoCodec; + private final Class outputType; + private final PrestoSparkExecutionExceptionFactory executionExceptionFactory; + + public PrestoSparkNativeTaskOutputIterator( + NativeExecutionTask nativeExecutionTask, + Class outputType, + CollectionAccumulator taskInfoCollectionAccumulator, + Codec taskInfoCodec, + PrestoSparkExecutionExceptionFactory executionExceptionFactory) + { + this.nativeExecutionTask = nativeExecutionTask; + this.taskInfoCollectionAccumulator = taskInfoCollectionAccumulator; + this.taskInfoCodec = taskInfoCodec; + this.outputType = outputType; + this.executionExceptionFactory = executionExceptionFactory; + } + + /** + * This function is called by Spark's RDD layer to check if there are output pages + * There are 2 scenarios + * 1. ShuffleMap Task - Always returns false. But the internal function calls do all the work needed + * 2. Result Task - True until pages are available. False once all pages have been extracted + * @return if output is available + */ + @Override + public boolean hasNext() + { + next = computeNext(); + return next.isPresent(); + } + + /** This function returns the next available page fetched from CPP process + * + * Has 3 main responsibilities + * 1) Busy-wait-for-pages-or-completion + * + * Loop until either of the 3 conditions happen + * * 1. We get a page + * * 2. Task has finished successfully + * * 3. Task has finished with error + * + * For ShuffleMap Task, as of now, the CPP process returns no pages. 
+ * So the loop acts as a wait-for-completion loop and returns an Optional.empty() + * once the task has terminated + * + * For a Result Task, this function will return all the pages and Optional.empty() + * once all the pages have been read and the task has terminated + * + * 2) Exception handling + * when there are no pages available, the function checks if the task has finished + * with exceptions and throws the appropriate exception back to spark's RDD processing + * layer + * + * 3) Statistics collection + * For both, when the task finished successfully or with exception, it tries to collect + * statistics if TaskInfo object is available + * + * @return Optional outputPage + */ + private Optional computeNext() + { + // A while(true) loop is not desirable, but in this case we cannot avoid + // it because of Spark's RDD contract, which is that this iterator either + // returns data or is complete. It CANNOT return null. + // While the remote task is still running and there are no output pages, + // we need to simulate a busy-loop to avoid returning null. 
+ while (true) { + try { + // For ShuffleMap Task, this will always return Optional.empty() + Optional pageOptional = nativeExecutionTask.pollResult(); + + if (pageOptional.isPresent()) { + return pageOptional; + } + + try { + Optional taskInfo = nativeExecutionTask.getTaskInfo(); + + // Case1: Task is still running + if (!taskInfo.isPresent() || !taskInfo.get().getTaskStatus().getState().isDone()) { + continue; + } + + // Case 2: Task finished with errors captured inside taskInfo + processTaskInfoForErrors(taskInfo.get()); + } + catch (RuntimeException ex) { + // For a failed task, if taskInfo is present we still want to log the metrics + completeTask(taskInfoCollectionAccumulator, nativeExecutionTask, taskInfoCodec); + throw executionExceptionFactory.toPrestoSparkExecutionException(ex); + } + + // Case3: Task terminated with success + break; + } + catch (InterruptedException e) { + log.error(e); + throw new RuntimeException(e); + } + } + + // Reaching here marks the end of task processing + completeTask(taskInfoCollectionAccumulator, nativeExecutionTask, taskInfoCodec); + return Optional.empty(); + } + + @Override + public Tuple2 next() + { + // Result Tasks only have outputType of PrestoSparkSerializedPage. 
+ checkArgument(outputType == PrestoSparkSerializedPage.class, + format("PrestoSparkNativeTaskExecutorFactory only outputType=PrestoSparkSerializedPage" + + "But tried to extract outputType=%s", outputType)); + return new Tuple2<>(new MutablePartitionId(), (T) toPrestoSparkSerializedPage(next.get())); + } + } +} diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java index 4c57dc43146fe..c97cb6a060025 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java @@ -696,6 +696,8 @@ public void close() } if (instanceId != null) { + instances.get(instanceId).getPrestoSparkService().getTaskExecutorFactory().close(); + instances.get(instanceId).getPrestoSparkService().getNativeTaskExecutorFactory().close(); instances.remove(instanceId); } } @@ -715,6 +717,12 @@ public IPrestoSparkTaskExecutorFactory get() { return instances.get(instanceId).getPrestoSparkService().getTaskExecutorFactory(); } + + @Override + public IPrestoSparkTaskExecutorFactory getNative() + { + return instances.get(instanceId).getPrestoSparkService().getNativeTaskExecutorFactory(); + } } private static Database createDatabaseMetastoreObject(String name) diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java index 57dc25eb02232..d64e3da7fd759 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java @@ -20,5 +20,7 @@ public interface 
IPrestoSparkService IPrestoSparkTaskExecutorFactory getTaskExecutorFactory(); + IPrestoSparkTaskExecutorFactory getNativeTaskExecutorFactory(); + void close(); } diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java index a6e0c7bdfa2c0..387eced2901e2 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java @@ -19,4 +19,6 @@ public interface PrestoSparkTaskExecutorFactoryProvider extends Serializable { IPrestoSparkTaskExecutorFactory get(); + + IPrestoSparkTaskExecutorFactory getNative(); } diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java index 6a4979a6b2604..288f9e291a373 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java @@ -79,7 +79,7 @@ public Iterator> process( Map shuffleReadDescriptors, Optional shuffleWriteDescriptor) { - return taskExecutorFactoryProvider.get().create( + return taskExecutorFactoryProvider.getNative().create( TaskContext.get().partitionId(), TaskContext.get().attemptNumber(), serializedTaskDescriptor, diff --git a/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java 
b/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java index 1802e5704a0f7..71f2fb3d72f0e 100644 --- a/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java +++ b/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java @@ -297,6 +297,14 @@ public IPrestoSparkTaskExecutorFactory get() return prestoSparkService.getTaskExecutorFactory(); } + @Override + public IPrestoSparkTaskExecutorFactory getNative() + { + checkState(TaskContext.get() != null, "this method is expected to be called only from the main task thread on the spark executor"); + IPrestoSparkService prestoSparkService = getOrCreatePrestoSparkService(); + return prestoSparkService.getNativeTaskExecutorFactory(); + } + private static IPrestoSparkService service; private static String currentPackagePath; private static Map currentConfigProperties; From f6499176cc4074bd512d63240248cef84b1f97c7 Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Thu, 8 Jun 2023 13:47:49 -0700 Subject: [PATCH 2/6] [native pos] Remove NativeExecutionNode plan rewrite Currently, for native execution, we re-write the plan fragment to add a NativeExecutionNode as the root to encapsulate the execution of native task. 
We do not need this anymore as the newly added PrestoSparkNativeTaskExecutorFactory will transparently forward the PlanFragment as is to CPP process --- .../sql/planner/BasePlanFragmenter.java | 31 ----- ...PrestoSparkNativeExecutionPlanRewrite.java | 121 ------------------ 2 files changed, 152 deletions(-) delete mode 100644 presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index 921cba5d43f3b..28fb50d56d0c1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -52,7 +52,6 @@ import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; @@ -78,7 +77,6 @@ import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; -import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; @@ -179,35 +177,6 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan tableWriterNodeIds); } - // Only delegate non-coordinatorOnly plan fragment to native engine - if 
(isNativeExecutionEnabled(session) && !properties.getPartitioningHandle().isCoordinatorOnly()) { - if (root instanceof OutputNode) { - // OutputNode is special in that it can have duplicate output variables since it - // does not get converted to a PhysicalOperation for execution. - // Regular plan nodes, including NativeExecutionNode, must have unique output variables. - // Check if OutputNode has duplicate output variables and remove these. - // This is safe because OutputNode variables are not used by the workers. - OutputNode outputNode = (OutputNode) root; - List outputVariables = outputNode.getOutputVariables(); - List newOutputVariables = new ArrayList<>(); - List newColumnNames = new ArrayList<>(); - Set uniqueOutputVariables = new HashSet<>(); - for (int i = 0; i < outputVariables.size(); ++i) { - VariableReferenceExpression variable = outputVariables.get(i); - if (uniqueOutputVariables.add(variable)) { - newOutputVariables.add(variable); - newColumnNames.add(outputNode.getColumnNames().get(i)); - } - } - - if (uniqueOutputVariables.size() < outputVariables.size()) { - root = new OutputNode(outputNode.getSourceLocation(), outputNode.getId(), outputNode.getSource(), newColumnNames, newOutputVariables); - } - } - root = new NativeExecutionNode(root); - schedulingOrder = scheduleOrder(root); - } - PlanFragment fragment = new PlanFragment( fragmentId, root, diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java deleted file mode 100644 index 96f61cc72fd86..0000000000000 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.spark; - -import com.facebook.presto.Session; -import com.facebook.presto.cost.PlanNodeStatsEstimate; -import com.facebook.presto.cost.StatsAndCosts; -import com.facebook.presto.metadata.Metadata; -import com.facebook.presto.metadata.MetadataManager; -import com.facebook.presto.spi.plan.PlanNode; -import com.facebook.presto.spi.plan.TableScanNode; -import com.facebook.presto.sql.planner.Plan; -import com.facebook.presto.sql.planner.PlanFragment; -import com.facebook.presto.sql.planner.SubPlan; -import com.facebook.presto.sql.planner.TypeProvider; -import com.facebook.presto.sql.planner.assertions.PlanAssert; -import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; -import com.facebook.presto.testing.QueryRunner; -import com.facebook.presto.tests.AbstractTestQueryFramework; -import org.testng.annotations.Test; - -import java.util.List; - -import static com.facebook.presto.SystemSessionProperties.NATIVE_EXECUTION_ENABLED; -import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; -import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -public class TestPrestoSparkNativeExecutionPlanRewrite - extends AbstractTestQueryFramework -{ - private static final Metadata METADATA = MetadataManager.createTestMetadataManager(); - - private void 
assertPlanMatch(Session session, PlanNode actual, PlanMatchPattern expected) - { - PlanAssert.assertPlan( - session, - METADATA, - (node, sourceStats, lookup, session2, types) -> PlanNodeStatsEstimate.unknown(), - new Plan(actual, TypeProvider.empty(), StatsAndCosts.empty()), - expected); - } - - private void assertPlanNotMatch(Session session, PlanNode actual, PlanMatchPattern expected) - { - PlanAssert.assertPlanDoesNotMatch( - session, - METADATA, - (node, sourceStats, lookup, session2, types) -> PlanNodeStatsEstimate.unknown(), - new Plan(actual, TypeProvider.empty(), StatsAndCosts.empty()), - expected); - } - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - return PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner(); - } - - @Test - public void testSingleStagePlanFragment() - { - Session session = Session.builder(getSession()) - .setSystemProperty(NATIVE_EXECUTION_ENABLED, "true") - .build(); - - SubPlan subPlan = subplan( - "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment FROM orders_bucketed", - session); - PlanFragment fragment = subPlan.getFragment(); - PlanNode root = fragment.getRoot(); - - assertEquals(1, fragment.getTableScanSchedulingOrder().size()); - assertTrue(fragment.getTableScanSchedulingOrder().contains(root.getId())); - assertEquals(1, subPlan.getAllFragments().size()); - assertTrue(root instanceof NativeExecutionNode); - assertPlanMatch(session, ((NativeExecutionNode) root).getSubPlan(), anyTree(node(TableScanNode.class))); - } - - @Test - public void testMultiStagePlanFragmentsWithCoordinatorOnlyFragment() - { - Session session = Session.builder(getSession()) - .setSystemProperty(NATIVE_EXECUTION_ENABLED, "true") - .setSystemProperty("table_writer_merge_operator_enabled", "false") - .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "false") - .build(); - - SubPlan subPlan = subplan("CREATE TABLE test_table_1 as SELECT 
orderkey, custkey FROM orders ", session); - List fragmentList = subPlan.getAllFragments(); - - assertEquals(2, fragmentList.size()); - - PlanFragment fragment1 = fragmentList.get(0); - assertTrue(fragment1.getPartitioning().isCoordinatorOnly()); - assertPlanNotMatch(session, fragment1.getRoot(), anyTree(node(NativeExecutionNode.class))); - - PlanFragment fragment2 = fragmentList.get(1); - PlanNode root = fragment2.getRoot(); - assertFalse(fragment2.getPartitioning().isCoordinatorOnly()); - assertEquals(1, fragment2.getTableScanSchedulingOrder().size()); - assertTrue(fragment2.getTableScanSchedulingOrder().contains(root.getId())); - assertTrue(root instanceof NativeExecutionNode); - assertPlanMatch(session, ((NativeExecutionNode) root).getSubPlan(), anyTree(node(TableScanNode.class))); - } -} From 985a588cc0dd6421baa065bc707d6ec6db44cc60 Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Thu, 8 Jun 2023 11:30:03 -0700 Subject: [PATCH 3/6] [native pos] Update SparkMetrics integration to work with NativeTaskExecutorFactory Current integration of metrics for presto-spark-native into spark metrics relies on extracting Operator level info from TaskInfo objects from Native Process. After the introduction of PrestoSparkNativeTaskExecutorFactory, we no longer use NativeExecutionOperator. So instead we can directly extract metrics from TaskInfo object. 
--- .../PrestoSparkNativeTaskExecutorFactory.java | 4 + .../util/PrestoSparkStatsCollectionUtils.java | 105 +++++------------- 2 files changed, 30 insertions(+), 79 deletions(-) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java index 58d32ea929d21..6da3d2abd1fce 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java @@ -46,6 +46,7 @@ import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource; import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo; import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; +import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.page.SerializedPage; import com.facebook.presto.spi.plan.OutputNode; @@ -272,6 +273,9 @@ private static void completeTask(CollectionAccumulator taskI } SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(serializeZstdCompressed(taskInfoCodec, taskInfoOptional.get())); taskInfoCollector.add(serializedTaskInfo); + + // Update Spark Accumulators for spark internal metrics + PrestoSparkStatsCollectionUtils.collectMetrics(taskInfoOptional.get()); } private static void processTaskInfoForErrors(TaskInfo taskInfo) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java index c6904ee262c3b..7b0d549a4e5d2 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java +++ 
b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java @@ -15,19 +15,12 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.common.RuntimeMetric; -import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.common.RuntimeUnit; import com.facebook.presto.execution.TaskInfo; -import com.facebook.presto.operator.NativeExecutionInfo; -import com.facebook.presto.operator.OperatorStats; -import com.facebook.presto.operator.PipelineStats; -import com.facebook.presto.operator.TaskStats; import org.apache.commons.text.CaseUtils; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.util.AccumulatorV2; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.TimeUnit; public class PrestoSparkStatsCollectionUtils @@ -41,38 +34,35 @@ private PrestoSparkStatsCollectionUtils() {} public static void collectMetrics(final TaskInfo taskInfo) { - int taskId = -1; - int stageId = -1; + if (taskInfo == null || taskInfo.getStats() == null) { + return; + } + try { - taskId = taskInfo.getTaskId().getId(); - stageId = taskInfo.getTaskId().getStageExecutionId().getStageId().getId(); - Set runtimeStatsSet = collectRuntimeStats(taskInfo); - collectMetrics(runtimeStatsSet); + taskInfo.getStats().getRuntimeStats().getMetrics() + .forEach(PrestoSparkStatsCollectionUtils::incSparkInternalAccumulator); } catch (Exception e) { - log.error("An error occurred while processing taskId=%s stageId=%s", taskId, stageId, e); + log.warn("An error occurred while updating Spark Internal metrics for task=%s", taskInfo, e); } } - public static void collectMetrics(Set runtimeStatsSet) + static void incSparkInternalAccumulator(final String prestoKey, final RuntimeMetric metric) { - runtimeStatsSet.forEach(runStats -> - { - runStats.getMetrics().entrySet().forEach(entry -> { - String prestoKey = entry.getKey(); - String sparkInternalAccumulatorKey = getSparkInternalAccumulatorKey(prestoKey); - 
collectMetric(sparkInternalAccumulatorKey, prestoKey, entry.getValue()); - }); - }); - } + TaskMetrics sparkTaskMetrics = org.apache.spark.TaskContext.get().taskMetrics(); + if (sparkTaskMetrics == null) { + return; + } - static void collectMetric(final String sparkInternalAccumulatorKey, - final String prestoKey, - final RuntimeMetric metric) - { - boolean isSparkUnitMs = sparkInternalAccumulatorKey.contains("Ms"); - long metricVal = getMetricValue(metric, isSparkUnitMs); - incSparkInternalAccumulator(sparkInternalAccumulatorKey, prestoKey, metricVal); + String sparkInternalAccumulatorName = getSparkInternalAccumulatorKey(prestoKey); + scala.Option accumulatorV2Optional = sparkTaskMetrics.nameToAccums().get(sparkInternalAccumulatorName); + if (accumulatorV2Optional.isEmpty()) { + return; + } + + AccumulatorV2 accumulatorV2 = (AccumulatorV2) accumulatorV2Optional.get(); + accumulatorV2.add( + getMetricLongValue(metric, sparkInternalAccumulatorName.contains("Ms"))); } static String getSparkInternalAccumulatorKey(final String prestoKey) @@ -81,12 +71,13 @@ static String getSparkInternalAccumulatorKey(final String prestoKey) int index = prestoKey.indexOf(PRESTO_NATIVE_OPERATOR_STATS_SEP); return prestoKey.substring(index); } - String[] strs = prestoKey.split("\\."); - if (strs == null || strs.length < 2) { + String[] prestoKeyParts = prestoKey.split("\\."); + int prestoKeyPartsLength = prestoKeyParts.length; + if (prestoKeyPartsLength < 2) { log.debug("Fail to build spark internal key for %s format not supported", prestoKey); return ""; } - String prestoNewKey = String.format("%1$s%2$s", strs[0], strs[strs.length - 1]); + String prestoNewKey = String.format("%1$s%2$s", prestoKeyParts[0], prestoKeyParts[prestoKeyPartsLength - 1]); if (prestoNewKey.contains("_")) { prestoNewKey = CaseUtils.toCamelCase(prestoKey, false, '_'); } @@ -94,35 +85,7 @@ static String getSparkInternalAccumulatorKey(final String prestoKey) PRESTO_NATIVE_OPERATOR_STATS_PREFIX, prestoNewKey); } - 
static Set collectRuntimeStats(TaskInfo taskInfo) - { - Set stats = new HashSet<>(); - if (taskInfo.getStats() == null) { - return stats; - } - for (PipelineStats pipelineStats : taskInfo.getStats().getPipelines()) { - if (pipelineStats != null) { - for (OperatorStats operatorStats : pipelineStats.getOperatorSummaries()) { - if (operatorStats != null) { - if (operatorStats.getOperatorType().equals("NativeExecutionOperator")) { - NativeExecutionInfo nativeExecutionInfo = (NativeExecutionInfo) operatorStats.getInfo(); - if (nativeExecutionInfo != null) { - for (TaskStats taskStats : nativeExecutionInfo.getTaskStats()) { - if (taskStats != null) { - RuntimeStats runtimeStat = taskStats.getRuntimeStats(); - stats.add(runtimeStat); - } - } - } - } - } - } - } - } - return stats; - } - - static long getMetricValue(RuntimeMetric metric, boolean isSparkUnitMs) + static long getMetricLongValue(RuntimeMetric metric, boolean isSparkUnitMs) { long sum = metric.getSum(); if (metric.getUnit().equals(RuntimeUnit.NANO) && isSparkUnitMs) { @@ -130,20 +93,4 @@ static long getMetricValue(RuntimeMetric metric, boolean isSparkUnitMs) } return sum; } - - static void incSparkInternalAccumulator(final String sparkInternalAccuName, final String prestoKey, final Object metric) - { - TaskMetrics tm = org.apache.spark.TaskContext.get().taskMetrics(); - if (tm != null) { - scala.Option acc2 = tm.nameToAccums().get(sparkInternalAccuName); - if (!acc2.isEmpty()) { - AccumulatorV2 acc = (AccumulatorV2) acc2.get(); - acc.add(metric); - } - else { - log.debug("Fail to find spark internal accumulator matching key:" + - " %s prestoKey = %s ", sparkInternalAccuName, prestoKey); - } - } - } } From 3eb40c1d9e4a01a7c3fff548b65901de20755e97 Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Thu, 8 Jun 2023 10:19:32 -0700 Subject: [PATCH 4/6] [native pos] Remove NativeExecutionNode and NativeExecutionOperator Currently, there are no usages of NativeExecutionNode and NativeExecutionOperator after the 
introduction of PrestoSparkNativeTaskExecutorFactory. This commit removes them and other dormant/unused code that handles these classes. --- .../presto/sql/planner/InputExtractor.java | 8 - .../presto/sql/planner/OutputExtractor.java | 8 - .../presto/sql/planner/PlanFragment.java | 6 - .../sql/planner/SchedulingOrderVisitor.java | 8 - .../sql/planner/SplitSourceFactory.java | 7 - .../presto/sql/planner/iterative/Plans.java | 8 - .../sql/planner/plan/InternalPlanVisitor.java | 5 - .../sql/planner/plan/NativeExecutionNode.java | 96 ---- .../sql/planner/planPrinter/PlanPrinter.java | 10 - .../facebook/presto/util/GraphvizPrinter.java | 8 - .../iterative/rule/test/PlanBuilder.java | 6 - .../planner/planPrinter/TestPlanPrinter.java | 39 -- .../PrestoSparkSourceStatsCollector.java | 8 - .../execution/PrestoSparkTaskExecution.java | 10 +- .../PrestoSparkTaskExecutorFactory.java | 157 +------ .../operator/NativeExecutionOperator.java | 428 ------------------ .../spark/planner/PrestoSparkRddFactory.java | 18 - .../task/TestPrestoSparkTaskExecution.java | 12 +- 18 files changed, 19 insertions(+), 823 deletions(-) delete mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java delete mode 100644 presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java index c63fe06f920ea..5ee8b49d4fe97 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java @@ -29,7 +29,6 @@ import com.facebook.presto.sql.planner.plan.IndexSourceNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.JoinNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; 
import com.facebook.presto.sql.planner.plan.SemiJoinNode; import com.facebook.presto.sql.planner.plan.SpatialJoinNode; import com.google.common.collect.ImmutableList; @@ -155,13 +154,6 @@ public Void visitIndexSource(IndexSourceNode node, Context context) return null; } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Context context) - { - node.getSubPlan().accept(this, context); - return null; - } - @Override public Void visitPlan(PlanNode node, Context context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java index e397b5cc487f3..6a6981110112d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java @@ -18,7 +18,6 @@ import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.google.common.base.VerifyException; @@ -62,13 +61,6 @@ public Void visitTableWriter(TableWriterNode node, Void context) return null; } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - node.getSubPlan().accept(this, context); - return null; - } - @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java index 5afb4e548a973..7ab4d09a16767 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java @@ -20,7 +20,6 @@ import com.facebook.presto.spi.plan.PlanNode; import 
com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.VariableReferenceExpression; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.fasterxml.jackson.annotation.JsonCreator; @@ -214,11 +213,6 @@ private static void findRemoteSourceNodes(PlanNode node, Builder schedulingOr schedulingOrder.accept(node.getId()); return null; } - - @Override - public Void visitNativeExecution(NativeExecutionNode node, Consumer schedulingOrder) - { - schedulingOrder.accept(node.getId()); - return null; - } } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java index 10838c01518b9..056caee1e720c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java @@ -48,7 +48,6 @@ import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.MergeJoinNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; import com.facebook.presto.sql.planner.plan.SampleNode; @@ -398,12 +397,6 @@ public Map visitExchange(ExchangeNode node, Context con return processSources(node.getSources(), context); } - @Override - public Map visitNativeExecution(NativeExecutionNode node, Context context) - { - return processSources(ImmutableList.of(node.getSubPlan()), context); - } - private Map processSources(List sources, Context context) { ImmutableMap.Builder result = ImmutableMap.builder(); diff --git 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java index 26e05f5aad4a9..b3a75707178cf 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java @@ -15,7 +15,6 @@ import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import java.util.List; import java.util.stream.Collectors; @@ -55,13 +54,6 @@ public PlanNode visitGroupReference(GroupReference node, Void context) { return lookup.resolve(node).accept(this, context); } - - @Override - public PlanNode visitNativeExecution(NativeExecutionNode node, Void context) - { - node.getSubPlan().accept(this, context); - return node; - } } private Plans() {} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java index f31ec701d1ede..f2828854f6fbc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java @@ -176,9 +176,4 @@ public R visitStatsEquivalentPlanNodeWithLimit(StatsEquivalentPlanNodeWithLimit { return visitPlan(node, context); } - - public R visitNativeExecution(NativeExecutionNode node, C context) - { - return visitPlan(node, context); - } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java deleted file mode 100644 index 9e10dc12d9429..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java +++ /dev/null @@ -1,96 +0,0 
@@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.sql.planner.plan; - -import com.facebook.presto.spi.SourceLocation; -import com.facebook.presto.spi.plan.PlanNode; -import com.facebook.presto.spi.plan.PlanNodeId; -import com.facebook.presto.spi.relation.VariableReferenceExpression; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import javax.annotation.concurrent.Immutable; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -/** - * The NativeExecutionNode is a wrapper node encapsulating the actual logical plan nodes in the subPlan field which will be eventually executed on the native engine. 
- */ -@Immutable -public class NativeExecutionNode - extends InternalPlanNode -{ - private final PlanNode subPlan; - - @JsonCreator - public NativeExecutionNode(Optional sourceLocation, @JsonProperty("id") PlanNodeId id, @JsonProperty("subPlan") PlanNode subPlan) - { - this(sourceLocation, id, Optional.empty(), subPlan); - } - - public NativeExecutionNode(Optional sourceLocation, PlanNodeId id, Optional statsEquivalentPlanNode, PlanNode subPlan) - { - super(sourceLocation, id, statsEquivalentPlanNode); - this.subPlan = requireNonNull(subPlan, "subPlan is null"); - } - - public NativeExecutionNode(PlanNode subPlan) - { - this(subPlan.getSourceLocation(), subPlan.getId(), subPlan.getStatsEquivalentPlanNode(), subPlan); - } - - /* - * Since NativeExecutionNode will hide its subPlan away from outside viewer, the getSources() intended to - * return an empty list to avoid any Vistor visiting the subPlan. - */ - @Override - public List getSources() - { - return Collections.emptyList(); - } - - @Override - public List getOutputVariables() - { - return subPlan.getOutputVariables(); - } - - @JsonProperty - public PlanNode getSubPlan() - { - return subPlan; - } - - @Override - public PlanNode replaceChildren(List newChildren) - { - throw new UnsupportedOperationException("replaceChildren is not supported by NativeExecutionNode"); - } - - @Override - public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalentPlanNode) - { - return new NativeExecutionNode(getSourceLocation(), getId(), statsEquivalentPlanNode, subPlan.assignStatsEquivalentPlanNode(statsEquivalentPlanNode)); - } - - @Override - public R accept(InternalPlanVisitor visitor, C context) - { - return visitor.visitNativeExecution(this, context); - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index e0c69c193f9a0..95d8784960fc2 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -78,7 +78,6 @@ import com.facebook.presto.sql.planner.plan.LateralJoinNode; import com.facebook.presto.sql.planner.plan.MergeJoinNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; @@ -1223,15 +1222,6 @@ public Void visitLateralJoin(LateralJoinNode node, Void context) return processChildren(node, context); } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - // Do not add 'node' as it shares the ID with the root node of the sub-plan. - node.getSubPlan().accept(this, context); - - return null; - } - @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java index 0acd96251da57..51ac3a467ee37 100644 --- a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java @@ -49,7 +49,6 @@ import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.LateralJoinNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; @@ -328,13 +327,6 @@ public Void visitMarkDistinct(MarkDistinctNode node, Void context) return node.getSource().accept(this, context); } - 
@Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - printNode(node, "NativeExecution", NODE_COLORS.get(NodeType.NATIVE_EXECUTION)); - return null; - } - @Override public Void visitWindow(WindowNode node, Void context) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index 2457f00d82b92..1abdbcb424ebe 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -69,7 +69,6 @@ import com.facebook.presto.sql.planner.plan.IndexSourceNode; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.LateralJoinNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.OffsetNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; @@ -934,11 +933,6 @@ public UnnestNode unnest(PlanNode source, List repl ordinalityVariable); } - public NativeExecutionNode nativeExecution(PlanNode subPlan) - { - return new NativeExecutionNode(subPlan); - } - public static Expression expression(String sql) { return ExpressionUtils.rewriteIdentifiersToSymbolReferences(new SqlParser().createExpression(sql)); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java index 1256fc941ae1c..6ada71d24df91 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java @@ -23,18 +23,13 @@ import 
com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.TableHandle; -import com.facebook.presto.spi.plan.Assignments; -import com.facebook.presto.spi.plan.LimitNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; -import com.facebook.presto.spi.plan.ProjectNode; import com.facebook.presto.spi.plan.TableScanNode; -import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.planner.Partitioning; import com.facebook.presto.sql.planner.PartitioningScheme; import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.testing.TestingHandle; import com.facebook.presto.testing.TestingMetadata; @@ -191,38 +186,4 @@ public void testDomainTextFormatting() Domain.onlyNull(VARCHAR), "[NULL]"); } - - @Test - public void testPrintNativeExecutionNode() - { - ImmutableMap map = ImmutableMap.of( - COLUMN_VARIABLE, - COLUMN_VARIABLE); - TableScanNode scan = PLAN_BUILDER.tableScan( - TABLE_HANDLE_WITH_LAYOUT, - ImmutableList.of(COLUMN_VARIABLE), - ImmutableMap.of(COLUMN_VARIABLE, COLUMN_HANDLE)); - LimitNode limit = PLAN_BUILDER.limit(10, scan); - ProjectNode project = PLAN_BUILDER.project(new Assignments(map), limit); - NativeExecutionNode nativeExecution = PLAN_BUILDER.nativeExecution(project); - - PlanFragment testFragment = new PlanFragment( - new PlanFragmentId(0), - nativeExecution, - ImmutableSet.of(), - SOURCE_DISTRIBUTION, - ImmutableList.of(nativeExecution.getId()), - new PartitioningScheme(Partitioning.create(SOURCE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of()), - StageExecutionDescriptor.ungroupedExecution(), - false, - StatsAndCosts.empty(), - Optional.empty()); - - String textPlan = 
PlanPrinter.textPlanFragment(testFragment, FUNCTION_AND_TYPE_MANAGER, TEST_SESSION, true); - assertTrue(textPlan.contains("Project[projectLocality = UNKNOWN] => [column:varchar]")); - assertTrue(textPlan.contains("Limit[10] => [column:varchar]")); - assertTrue( - textPlan.matches( - "(?s).*TableScan\\[TableHandle \\{connectorId='testConnector',(?s).*\\[column:varchar\\](?s).*")); - } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java index a762e22e7bed1..0e84bcf23c1f6 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java @@ -22,7 +22,6 @@ import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.google.common.collect.ImmutableList; import java.util.Iterator; @@ -97,13 +96,6 @@ public Void visitTableScan(TableScanNode node, Void context) return null; } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - node.getSubPlan().accept(this, context); - return null; - } - @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java index a8b52683563b7..543755245cef5 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java @@ -113,8 +113,7 @@ public PrestoSparkTaskExecution( 
TaskExecutor taskExecutor, SplitMonitor splitMonitor, Executor notificationExecutor, - ScheduledExecutorService memoryUpdateExecutor, - boolean isNativeTask) + ScheduledExecutorService memoryUpdateExecutor) { this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null"); this.taskId = taskStateMachine.getTaskId(); @@ -149,7 +148,7 @@ public PrestoSparkTaskExecution( checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(tableScanSources), "Fragment is partitioned, but not all partitioned drivers were found"); - taskHandle = createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, isNativeTask); + taskHandle = createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor); requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null"); memoryUpdateExecutor.schedule(taskContext::updatePeakMemory, 1, SECONDS); @@ -160,15 +159,14 @@ private static TaskHandle createTaskHandle( TaskStateMachine taskStateMachine, TaskContext taskContext, LocalExecutionPlan localExecutionPlan, - TaskExecutor taskExecutor, - boolean isNativeTask) + TaskExecutor taskExecutor) { TaskHandle taskHandle = taskExecutor.addTask( taskStateMachine.getTaskId(), () -> 0, getInitialSplitsPerNode(taskContext.getSession()), getSplitConcurrencyAdjustmentInterval(taskContext.getSession()), - isNativeTask ? 
OptionalInt.of(MAX_JAVA_DRIVERS_FOR_NATIVE_TASK) : getMaxDriversPerTask(taskContext.getSession())); + getMaxDriversPerTask(taskContext.getSession())); taskStateMachine.addStateChangeListener(state -> { if (state.isDone()) { taskExecutor.removeTask(taskHandle); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index 38e5cb34ec938..0b344fcaabef9 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -19,12 +19,9 @@ import com.facebook.airlift.stats.TestingGcMonitor; import com.facebook.presto.Session; import com.facebook.presto.common.block.BlockEncodingManager; -import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.io.DataOutput; import com.facebook.presto.event.SplitMonitor; import com.facebook.presto.execution.ExecutionFailureInfo; -import com.facebook.presto.execution.Lifespan; -import com.facebook.presto.execution.Location; import com.facebook.presto.execution.MemoryRevokingSchedulerUtils; import com.facebook.presto.execution.ScheduledSplit; import com.facebook.presto.execution.StageExecutionId; @@ -45,9 +42,7 @@ import com.facebook.presto.memory.TraversingQueryContextVisitor; import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; import com.facebook.presto.metadata.FunctionAndTypeManager; -import com.facebook.presto.metadata.RemoteTransactionHandle; import com.facebook.presto.metadata.SessionPropertyManager; -import com.facebook.presto.metadata.Split; import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.OutputFactory; import com.facebook.presto.operator.TaskContext; @@ -61,9 +56,7 @@ import 
com.facebook.presto.spark.classloader_interface.MutablePartitionId; import com.facebook.presto.spark.classloader_interface.PrestoSparkJavaExecutionTaskInputs; import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow; -import com.facebook.presto.spark.classloader_interface.PrestoSparkNativeTaskInputs; import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage; -import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleReadDescriptor; import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats; import com.facebook.presto.spark.classloader_interface.PrestoSparkStorageHandle; import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs; @@ -75,14 +68,10 @@ import com.facebook.presto.spark.execution.PrestoSparkRowBatch.RowTupleSupplier; import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator.PreDeterminedPartitionFunction; import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory; -import com.facebook.presto.spark.execution.operator.NativeExecutionOperator; -import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spi.page.PageDataOutput; -import com.facebook.presto.spi.plan.OutputNode; -import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.security.TokenAuthenticator; import com.facebook.presto.spi.storage.TempDataOperationContext; @@ -91,15 +80,12 @@ import com.facebook.presto.spi.storage.TempStorageHandle; import com.facebook.presto.spiller.NodeSpillConfig; import com.facebook.presto.spiller.SpillSpaceTracker; -import com.facebook.presto.split.RemoteSplit; import com.facebook.presto.sql.planner.LocalExecutionPlanner; import 
com.facebook.presto.sql.planner.LocalExecutionPlanner.LocalExecutionPlan; import com.facebook.presto.sql.planner.OutputPartitioning; import com.facebook.presto.sql.planner.PlanFragment; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; -import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.planPrinter.PlanPrinter; import com.facebook.presto.storage.TempStorageManager; import com.google.common.collect.ImmutableList; @@ -122,7 +108,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.OptionalInt; @@ -135,7 +120,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import java.util.zip.CRC32; import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalTotalMemoryLimit; @@ -146,14 +130,12 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxRevocableMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled; -import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.execution.TaskState.FAILED; import static com.facebook.presto.execution.TaskStatus.STARTING_VERSION; import static com.facebook.presto.execution.buffer.BufferState.FINISHED; import static 
com.facebook.presto.metadata.MetadataUpdates.DEFAULT_METADATA_UPDATES; -import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMemoryRevokingTarget; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMemoryRevokingThreshold; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getShuffleOutputTargetAverageRowSize; @@ -166,7 +148,6 @@ import static com.facebook.presto.spark.util.PrestoSparkUtils.toPrestoSparkSerializedPage; import static com.facebook.presto.spi.ErrorCause.UNKNOWN; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; -import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static com.facebook.presto.util.Failures.toFailures; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -220,11 +201,6 @@ public class PrestoSparkTaskExecutorFactory private final AtomicBoolean memoryRevokePending = new AtomicBoolean(); private final AtomicBoolean memoryRevokeRequestInProgress = new AtomicBoolean(); - private final BlockEncodingSerde blockEncodingSerde; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - private final PrestoSparkShuffleInfoTranslator shuffleInfoTranslator; - @Inject public PrestoSparkTaskExecutorFactory( SessionPropertyManager sessionPropertyManager, @@ -248,11 +224,7 @@ public PrestoSparkTaskExecutorFactory( NodeSpillConfig nodeSpillConfig, TempStorageManager tempStorageManager, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, - PrestoSparkConfig prestoSparkConfig, - BlockEncodingSerde blockEncodingSerde, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) + 
PrestoSparkConfig prestoSparkConfig) { this( sessionPropertyManager, @@ -280,11 +252,7 @@ public PrestoSparkTaskExecutorFactory( requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskAllocationTrackingEnabled(), tempStorageManager, requireNonNull(prestoSparkConfig, "prestoSparkConfig is null").getStorageBasedBroadcastJoinStorage(), - prestoSparkBroadcastTableCacheManager, - blockEncodingSerde, - processFactory, - taskFactory, - shuffleInfoTranslator); + prestoSparkBroadcastTableCacheManager); } public PrestoSparkTaskExecutorFactory( @@ -313,11 +281,7 @@ public PrestoSparkTaskExecutorFactory( boolean allocationTrackingEnabled, TempStorageManager tempStorageManager, String storageBasedBroadcastJoinStorage, - PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, - BlockEncodingSerde blockEncodingSerde, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) + PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager) { this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); this.blockEncodingManager = requireNonNull(blockEncodingManager, "blockEncodingManager is null"); @@ -346,10 +310,6 @@ public PrestoSparkTaskExecutorFactory( this.tempStorageManager = requireNonNull(tempStorageManager, "tempStorageManager is null"); this.storageBasedBroadcastJoinStorage = requireNonNull(storageBasedBroadcastJoinStorage, "storageBasedBroadcastJoinStorage is null"); this.prestoSparkBroadcastTableCacheManager = requireNonNull(prestoSparkBroadcastTableCacheManager, "prestoSparkBroadcastTableCacheManager is null"); - this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - this.shuffleInfoTranslator = 
requireNonNull(shuffleInfoTranslator, "shuffleInfoFactory is null"); } @Override @@ -406,18 +366,7 @@ public IPrestoSparkTaskExecutor doCreate( TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId, attemptNumber); - // TODO: Remove this once we can display the plan on Spark UI. - // Currently, `textPlanFragment` throws an exception if json-based UDFs are used in the query, which can only - // happen in native execution mode. To resolve this error, `JsonFileBasedFunctionNamespaceManager` must be - // loaded on the executors as well (which is actually not required for native execution). To do so, we need a - // mechanism to ship the JSON file containing the UDF metadata to workers, which does not exist as of today. - // TODO: Address this issue; more details in https://github.com/prestodb/presto/issues/19600 - if (isNativeExecutionEnabled(session)) { - log.info("Logging plan fragment is not supported for presto-on-spark native execution, yet"); - } - else { - log.info(PlanPrinter.textPlanFragment(fragment, functionAndTypeManager, session, true)); - } + log.info(PlanPrinter.textPlanFragment(fragment, functionAndTypeManager, session, true)); DataSize maxUserMemory = getQueryMaxMemoryPerNode(session); DataSize maxTotalMemory = getQueryMaxTotalMemoryPerNode(session); @@ -529,24 +478,12 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem List taskSources; Optional shuffleWriteInfo = Optional.empty(); - if (isNativeExecutionEnabled(session) && fragment.getRoot() instanceof NativeExecutionNode) { - checkArgument( - inputs instanceof PrestoSparkNativeTaskInputs, - format("PrestoSparkNativeTaskInputs is required for native execution, but %s is provided", inputs.getClass().getName())); - PrestoSparkNativeTaskInputs nativeInputs = (PrestoSparkNativeTaskInputs) inputs; - fillNativeExecutionTaskInputs(fragment, session, nativeInputs, shuffleReadInfos); - shuffleWriteInfo = needShuffleWriteInfo(nativeInputs, (NativeExecutionNode) 
fragment.getRoot()) ? - Optional.of(shuffleInfoTranslator.createShuffleWriteInfo(session, nativeInputs.getShuffleWriteDescriptor().get())) : Optional.empty(); - taskSources = getNativeExecutionShuffleSources(session, taskId, fragment, shuffleReadInfos.build(), getTaskSources(serializedTaskSources)); - } - else { - checkArgument( - inputs instanceof PrestoSparkJavaExecutionTaskInputs, - format("PrestoSparkJavaExecutionTaskInputs is required for java execution, but %s is provided", inputs.getClass().getName())); - PrestoSparkJavaExecutionTaskInputs taskInputs = (PrestoSparkJavaExecutionTaskInputs) inputs; - fillJavaExecutionTaskInputs(fragment, taskInputs, shuffleInputs, pageInputs, broadcastInputs); - taskSources = getTaskSources(serializedTaskSources); - } + checkArgument( + inputs instanceof PrestoSparkJavaExecutionTaskInputs, + format("PrestoSparkJavaExecutionTaskInputs is required for java execution, but %s is provided", inputs.getClass().getName())); + PrestoSparkJavaExecutionTaskInputs taskInputs = (PrestoSparkJavaExecutionTaskInputs) inputs; + fillJavaExecutionTaskInputs(fragment, taskInputs, shuffleInputs, pageInputs, broadcastInputs); + taskSources = getTaskSources(serializedTaskSources); OutputBufferMemoryManager memoryManager = new OutputBufferMemoryManager( sinkMaxBufferSize.toBytes(), @@ -600,13 +537,7 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem stageId), taskDescriptor.getTableWriteInfo(), true, - ImmutableList.of(new NativeExecutionOperator.NativeExecutionOperatorTranslator( - session, - fragment, - blockEncodingSerde, - processFactory, - taskFactory, - shuffleWriteInfo.map(shuffleInfoTranslator::createSerializedWriteInfo)))); + ImmutableList.of()); taskStateMachine.addStateChangeListener(state -> { if (state.isDone()) { @@ -621,8 +552,7 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem taskExecutor, splitMonitor, notificationExecutor, - memoryUpdateExecutor, - 
isNativeExecutionEnabled(session) && fragment.getRoot() instanceof NativeExecutionNode); + memoryUpdateExecutor); log.info("Task [%s] received %d splits.", taskId, @@ -688,27 +618,6 @@ private static OptionalLong computeAllSplitsSize(List taskSources) return OptionalLong.of(sum); } - private boolean needShuffleWriteInfo(PrestoSparkNativeTaskInputs nativeInputs, NativeExecutionNode node) - { - return nativeInputs.getShuffleWriteDescriptor().isPresent() && !findTableWriteNode(node).isPresent() && !(node.getSubPlan() instanceof OutputNode); - } - - private void fillNativeExecutionTaskInputs( - PlanFragment fragment, - Session session, - PrestoSparkNativeTaskInputs inputs, - ImmutableMap.Builder shuffleReadInfos) - { - for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) { - for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) { - PrestoSparkShuffleReadDescriptor shuffleReadDescriptor = inputs.getShuffleReadDescriptors().get(sourceFragmentId.toString()); - if (shuffleReadDescriptor != null) { - shuffleReadInfos.put(remoteSource.getId(), shuffleInfoTranslator.createShuffleReadInfo(session, shuffleReadDescriptor)); - } - } - } - } - private void fillJavaExecutionTaskInputs( PlanFragment fragment, PrestoSparkJavaExecutionTaskInputs inputs, @@ -778,48 +687,6 @@ private List getTaskSources(Iterator getNativeExecutionShuffleSources( - Session session, TaskId taskId, PlanFragment fragment, Map shuffleReadInfos, List taskSources) - { - ImmutableSet.Builder result = ImmutableSet.builder(); - PlanNode root = fragment.getRoot(); - AtomicLong nextSplitId = new AtomicLong(); - taskSources.stream() - .flatMap(source -> source.getSplits().stream()) - .mapToLong(split -> split.getSequenceId()) - .max() - .ifPresent(id -> nextSplitId.set(id + 1)); - shuffleReadInfos.forEach((planNodeId, info) -> - result.add(new ScheduledSplit(nextSplitId.getAndIncrement(), planNodeId, new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit( 
- new Location(format("batch://%s?shuffleInfo=%s", taskId, shuffleInfoTranslator.createSerializedReadInfo(info))), - taskId))))); - - List nativeExecutionSources = taskSources.stream().filter(taskSource -> taskSource.getPlanNodeId().equals(root)).collect(Collectors.toList()); - checkState(nativeExecutionSources.size() <= 1, "At most 1 taskSource is expected for NativeExecutionNode but got %s", nativeExecutionSources.size()); - if (!nativeExecutionSources.isEmpty()) { - // Append the shuffle splits with original splits - TaskSource nativeExecutionSource = nativeExecutionSources.get(0); - result.addAll(nativeExecutionSource.getSplits()); - } - - TaskSource newTaskSource = new TaskSource(root.getId(), result.build(), ImmutableSet.of(Lifespan.taskWide()), true); - ImmutableList.Builder newTaskSources = ImmutableList.builder(); - // Combine the shuffle read taskSource and original sources - newTaskSources.add(newTaskSource) - .addAll(taskSources.stream() - .filter(taskSource -> !taskSource.getPlanNodeId().equals(root)) - .collect(Collectors.toList())); - return newTaskSources.build(); - } - - private Optional findTableWriteNode(PlanNode node) - { - PlanNode root = node instanceof NativeExecutionNode ? 
((NativeExecutionNode) node).getSubPlan() : node; - return searchFrom(root) - .where(TableWriterNode.class::isInstance) - .findFirst(); - } - @SuppressWarnings("unchecked") private static Output configureOutput( Class outputType, diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java deleted file mode 100644 index f0f4fbb8ce2e1..0000000000000 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.spark.execution.operator; - -import com.facebook.airlift.log.Logger; -import com.facebook.presto.Session; -import com.facebook.presto.common.Page; -import com.facebook.presto.common.block.BlockEncodingSerde; -import com.facebook.presto.execution.ExecutionFailureInfo; -import com.facebook.presto.execution.ScheduledSplit; -import com.facebook.presto.execution.TaskInfo; -import com.facebook.presto.execution.TaskSource; -import com.facebook.presto.execution.TaskState; -import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.PagesSerdeFactory; -import com.facebook.presto.execution.scheduler.TableWriteInfo; -import com.facebook.presto.memory.context.LocalMemoryContext; -import com.facebook.presto.operator.DriverContext; -import com.facebook.presto.operator.NativeExecutionInfo; -import com.facebook.presto.operator.OperatorContext; -import com.facebook.presto.operator.OperatorFactory; -import com.facebook.presto.operator.SourceOperator; -import com.facebook.presto.operator.SourceOperatorFactory; -import com.facebook.presto.spark.execution.NativeExecutionProcess; -import com.facebook.presto.spark.execution.NativeExecutionProcessFactory; -import com.facebook.presto.spark.execution.NativeExecutionTask; -import com.facebook.presto.spark.execution.NativeExecutionTaskFactory; -import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.spi.UpdatablePageSource; -import com.facebook.presto.spi.page.PagesSerde; -import com.facebook.presto.spi.page.SerializedPage; -import com.facebook.presto.spi.plan.PlanNode; -import com.facebook.presto.spi.plan.PlanNodeId; -import com.facebook.presto.sql.planner.LocalExecutionPlanner; -import com.facebook.presto.sql.planner.PlanFragment; -import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; -import com.google.common.collect.ImmutableList; -import 
com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; -import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled; -import static com.facebook.presto.SystemSessionProperties.isExchangeCompressionEnabled; -import static com.facebook.presto.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION; -import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; -import static com.facebook.presto.sql.planner.SchedulingOrderVisitor.scheduleOrder; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -/** - * NativeExecutionOperator is responsible for launching the external native process and managing the communication - * between Java process and native process (by using the {@Link NativeExecutionTask}). The NativeExecutionOperator will send necessary meta information - * (e.g, plan fragment, session properties etc.) will be sent to native process and collect the execution results (data, metrics etc) back and propagate out as - * the operator output through the operator's getOutput method. - * The lifecycle of the NativeExecutionOperator is: - * 1. Launch the native engine external process when initializing the operator. - * 2. Serialize and pass the planFragment, tableWriteInfo and taskSource to the external process through {@link NativeExecutionTask} APIs. - * 3. Call {@link NativeExecutionTask}'s pollResult() to retrieve {@link SerializedPage} back from external process. 
- * 4. Deserialize {@link SerializedPage} to {@link Page} and return it back to driver from the getOutput method. - * 5. The close() will be called by the driver when {@link NativeExecutionTask} completes and pollResult() returns an empty result. - * 6. Shut down the external process upon calling of close() method - *

- */ -public class NativeExecutionOperator - implements SourceOperator -{ - private static final Logger log = Logger.get(NativeExecutionOperator.class); - private static final String NATIVE_EXECUTION_SERVER_URI = "http://127.0.0.1"; - - private final PlanNodeId sourceId; - private final OperatorContext operatorContext; - private final LocalMemoryContext systemMemoryContext; - private final PlanFragment planFragment; - private final TableWriteInfo tableWriteInfo; - private final Optional shuffleWriteInfo; - private final PagesSerde serde; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - - private NativeExecutionProcess process; - private NativeExecutionTask task; - private CompletableFuture taskStatusFuture; - private List taskSource = new ArrayList<>(); - private Map> splits = new HashMap<>(); - private boolean finished; - - private final AtomicReference info = new AtomicReference<>(null); - - public NativeExecutionOperator( - PlanNodeId sourceId, - OperatorContext operatorContext, - PlanFragment planFragment, - TableWriteInfo tableWriteInfo, - PagesSerde serde, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - Optional shuffleWriteInfo) - { - this.sourceId = requireNonNull(sourceId, "sourceId is null"); - this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); - this.systemMemoryContext = operatorContext.localSystemMemoryContext(); - this.planFragment = requireNonNull(planFragment, "planFragment is null"); - this.tableWriteInfo = requireNonNull(tableWriteInfo, "tableWriteInfo is null"); - this.shuffleWriteInfo = requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null"); - this.serde = requireNonNull(serde, "serde is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - - operatorContext.setInfoSupplier(info::get); 
- } - - @Override - public OperatorContext getOperatorContext() - { - return operatorContext; - } - - @Override - public boolean needsInput() - { - return false; - } - - @Override - public void addInput(Page page) - { - throw new UnsupportedOperationException(); - } - - /** - * The overall workflow of the getOutput method is: - * 1. Submit the plan to the external process - * 2. Call pollResult method to get latest buffered result. - * 3. Call getTaskInfo method to get the TaskInfo and propagate it - * 4. Deserialize the polled {@link SerializedPage} to {@link Page} and return it back - */ - @Override - public Page getOutput() - { - if (finished) { - return null; - } - - if (process == null) { - createProcess(); - checkState(process != null, "process is null"); - createTask(); - checkState(task != null, "task is null"); - TaskInfo taskInfo = task.start(); - if (processTaskInfo(taskInfo)) { - return null; - } - } - - try { - Optional page = task.pollResult(); - if (page.isPresent()) { - return processResult(page.get()); - } - - Optional taskInfo = task.getTaskInfo(); - if (taskInfo.isPresent() && processTaskInfo(taskInfo.get())) { - return null; - } - - return null; - } - catch (InterruptedException | RuntimeException e) { - String error = e.getMessage(); - if (!process.isAlive()) { - error = String.format("Native process has crashed. 
%s", e.getMessage()); - } - log.error(e); - throw new PrestoException(GENERIC_INTERNAL_ERROR, error, e); - } - } - - private boolean processTaskInfo(TaskInfo taskInfo) - { - TaskStatus taskStatus = taskInfo.getTaskStatus(); - if (!taskStatus.getState().isDone()) { - return false; - } - - if (taskStatus.getState() != TaskState.FINISHED) { - RuntimeException failure = taskStatus.getFailures().stream() - .findFirst() - .map(ExecutionFailureInfo::toException) - .orElse(new PrestoException(GENERIC_INTERNAL_ERROR, "Native task failed for an unknown reason")); - throw failure; - } - - info.set(new NativeExecutionInfo(ImmutableList.of(taskInfo.getStats()))); - finished = true; - return true; - } - - private void createProcess() - { - try { - this.process = processFactory.getNativeExecutionProcess( - operatorContext.getSession(), - URI.create(NATIVE_EXECUTION_SERVER_URI)); - log.info("Starting native execution process of task" + getOperatorContext().getDriverContext().getTaskId().toString()); - process.start(); - } - catch (ExecutionException | InterruptedException | IOException e) { - throw new RuntimeException(e); - } - } - - private void createTask() - { - checkState(taskSource != null, "taskSource is null"); - checkState(taskStatusFuture == null, "taskStatusFuture has already been set"); - checkState(task == null, "task has already been set"); - checkState(process != null, "process is null"); - this.task = taskFactory.createNativeExecutionTask( - operatorContext.getSession(), - uriBuilderFrom(URI.create(NATIVE_EXECUTION_SERVER_URI)).port(process.getPort()).build(), - operatorContext.getDriverContext().getTaskId(), - planFragment, - ImmutableList.copyOf(taskSource), - tableWriteInfo, - shuffleWriteInfo); - } - - private Page processResult(SerializedPage page) - { - operatorContext.recordRawInput(page.getSizeInBytes(), page.getPositionCount()); - Page deserializedPage = serde.deserialize(page); - operatorContext.recordProcessedInput(deserializedPage.getSizeInBytes(), 
page.getPositionCount()); - return deserializedPage; - } - - @Override - public void finish() {} - - @Override - public boolean isFinished() - { - return finished; - } - - @Override - public PlanNodeId getSourceId() - { - return sourceId; - } - - @Override - public Supplier> addSplit(ScheduledSplit split) - { - requireNonNull(split, "split is null"); - - if (finished) { - return Optional::empty; - } - splits.computeIfAbsent(split.getPlanNodeId(), key -> new ArrayList<>()).add(split); - - return Optional::empty; - } - - @Override - public void noMoreSplits() - { - // all splits belonging to a single planNodeId should be within a single taskSource - splits.forEach((planNodeId, split) -> taskSource.add(new TaskSource(planNodeId, ImmutableSet.copyOf(split), true))); - - // When joining bucketed table with a non-bucketed table with a filter on "$bucket", - // some tasks may not have splits for the bucketed table. In this case we still need - // to send no-more-splits message to Velox. - Set tableScanIds = Sets.newHashSet(scheduleOrder(planFragment.getRoot())); - tableScanIds.stream() - .filter(id -> !splits.containsKey(id)) - .forEach(id -> taskSource.add(new TaskSource(id, ImmutableSet.of(), true))); - } - - @Override - public void close() - { - systemMemoryContext.setBytes(0); - if (task != null) { - task.stop(); - } - } - - public static class NativeExecutionOperatorFactory - implements SourceOperatorFactory - { - private final int operatorId; - private final PlanNodeId planNodeId; - private final PlanFragment planFragment; - private final TableWriteInfo tableWriteInfo; - private final Optional shuffleWriteInfo; - private final PagesSerdeFactory serdeFactory; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - private boolean closed; - - public NativeExecutionOperatorFactory( - int operatorId, - PlanNodeId planNodeId, - PlanFragment planFragment, - TableWriteInfo tableWriteInfo, - PagesSerdeFactory 
serdeFactory, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - Optional shuffleWriteInfo) - { - this.operatorId = operatorId; - this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); - this.planFragment = requireNonNull(planFragment, "planFragment is null"); - this.tableWriteInfo = requireNonNull(tableWriteInfo, "tableWriteInfo is null"); - this.shuffleWriteInfo = requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null"); - this.serdeFactory = requireNonNull(serdeFactory, "serdeFactory is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - } - - @Override - public PlanNodeId getSourceId() - { - return planNodeId; - } - - @Override - public SourceOperator createOperator(DriverContext driverContext) - { - checkState(!closed, "operator factory is closed"); - OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, NativeExecutionOperator.class.getSimpleName()); - return new NativeExecutionOperator( - planNodeId, - operatorContext, - planFragment, - tableWriteInfo, - serdeFactory.createPagesSerde(), - processFactory, - taskFactory, - shuffleWriteInfo); - } - - @Override - public void noMoreOperators() - { - closed = true; - } - - public PlanFragment getPlanFragment() - { - return planFragment; - } - } - - public static class NativeExecutionOperatorTranslator - extends LocalExecutionPlanner.CustomPlanTranslator - { - private final PlanFragment fragment; - private final Session session; - private final Optional shuffleWriteInfo; - private final BlockEncodingSerde blockEncodingSerde; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - - public NativeExecutionOperatorTranslator( - Session session, - PlanFragment fragment, - BlockEncodingSerde blockEncodingSerde, - NativeExecutionProcessFactory 
processFactory, - NativeExecutionTaskFactory taskFactory, - Optional shuffleWriteInfo) - { - this.fragment = requireNonNull(fragment, "fragment is null"); - this.session = requireNonNull(session, "session is null"); - this.shuffleWriteInfo = requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null"); - this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - } - - @Override - public Optional translate( - PlanNode node, - LocalExecutionPlanner.LocalExecutionPlanContext context, - InternalPlanVisitor visitor) - { - if (node instanceof NativeExecutionNode) { - OperatorFactory operatorFactory = new NativeExecutionOperator.NativeExecutionOperatorFactory( - context.getNextOperatorId(), - node.getId(), - fragment.withSubPlan(((NativeExecutionNode) node).getSubPlan()), - context.getTableWriteInfo(), - new PagesSerdeFactory(blockEncodingSerde, isExchangeCompressionEnabled(session), isExchangeChecksumEnabled(session)), - processFactory, - taskFactory, - shuffleWriteInfo); - return Optional.of( - new LocalExecutionPlanner.PhysicalOperation(operatorFactory, makeLayout(node), context, UNGROUPED_EXECUTION)); - } - return Optional.empty(); - } - } -} diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java index 5598efa067810..2605ce57ef767 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java @@ -45,7 +45,6 @@ import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.sql.planner.SplitSourceFactory; -import 
com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.google.common.collect.ArrayListMultimap; @@ -364,24 +363,7 @@ private ListMultimap createTaskSources return result; } - /** - * If native execution is enabled, the task sources need to be associated with {@link NativeExecutionNode} rather than the original {@link TableScanNode} to allow the - * driver dispatching the task sources to the native process. To achieve that, in this method, we'll store the PlanNodeId from {@link NativeExecutionNode} - * as task source id when we encountered the {@link NativeExecutionNode},otherwise the PlanNodeId from {@link TableScanNode} will be used as the task source id. - */ private static List findTableScanNodes(PlanNode node) - { - return node instanceof NativeExecutionNode ? findTableScanNodesInternal((NativeExecutionNode) node) : findTableScanNodesInternal(node); - } - - private static List findTableScanNodesInternal(NativeExecutionNode node) - { - return searchFrom(node.getSubPlan()) - .where(TableScanNode.class::isInstance) - .findAll().stream().map(t -> new PrestoSparkSource(node.getId(), t)).collect(Collectors.toList()); - } - - private static List findTableScanNodesInternal(PlanNode node) { return searchFrom(node) .where(TableScanNode.class::isInstance) diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java index 1c0d942f2e70e..1c394d451a6b4 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java @@ -113,23 +113,17 @@ public void tearDown() taskNotificationExecutor.shutdown(); } - @Test - public void testNativeDriverInstanceCount() 
- { - testDriverCount(nativeTestSession, true, 1); - } - @Test public void testJavaDriverInstanceCount() { - testDriverCount(nonNativeTestSession, false, 3); + testDriverCount(nonNativeTestSession, 3); } - private void testDriverCount(Session session, boolean isNative, int expectedDriverCount) + private void testDriverCount(Session session, int expectedDriverCount) { TaskContext taskContext = TestingTaskContext.createTaskContext(taskNotificationExecutor, scheduledExecutor, session, new DataSize(2, GIGABYTE)); taskExecutor.start(); - PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, TaskTestUtils.createTestSplitMonitor(), taskNotificationExecutor, scheduledExecutor, isNative); + PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, TaskTestUtils.createTestSplitMonitor(), taskNotificationExecutor, scheduledExecutor); taskExecution.start(ImmutableList.of(new TaskSource(TABLE_SCAN_NODE_ID, splits, true))); assertEquals(taskContext.getPipelineContexts().get(0).getPipelineStats().getDrivers().size(), expectedDriverCount); } From 1186e5e05cb5dcbd366210ff8b297b54b410d22f Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Thu, 8 Jun 2023 11:50:17 -0700 Subject: [PATCH 5/6] [native pos] Remove NativeExecutionInfo special handling We no longer use NativeExecutionInfo as we have all the task stats captured correctly as part of TaskInfo objects returned from Cpp process. 
This commit removes this class --- .../presto/execution/StageExecutionInfo.java | 17 +---- .../presto/operator/NativeExecutionInfo.java | 63 ------------------- .../presto/operator/OperatorInfo.java | 1 - .../planPrinter/PlanNodeStatsSummarizer.java | 7 --- 4 files changed, 1 insertion(+), 87 deletions(-) delete mode 100644 presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java b/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java index ad38d00d03b3e..f59ceabb7b4a8 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java @@ -16,7 +16,6 @@ import com.facebook.airlift.stats.Distribution.DistributionSnapshot; import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.operator.BlockedReason; -import com.facebook.presto.operator.NativeExecutionInfo; import com.facebook.presto.operator.OperatorStats; import com.facebook.presto.operator.PipelineStats; import com.facebook.presto.operator.TaskStats; @@ -131,21 +130,7 @@ public static StageExecutionInfo create( } TaskStats taskStats = taskInfo.getStats(); - - boolean isNativeTask = false; - for (PipelineStats pipeline : taskStats.getPipelines()) { - for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) { - if (operatorStats.getInfo() instanceof NativeExecutionInfo) { - isNativeTask = true; - allTaskStats.addAll(((NativeExecutionInfo) operatorStats.getInfo()).getTaskStats()); - } - } - } - - // Prefer statistics from the native process. 
- if (!isNativeTask) { - allTaskStats.add(taskStats); - } + allTaskStats.add(taskStats); if (state == FINISHED && taskInfo.getTaskStatus().getState() == TaskState.FAILED) { retriedCpuTime += taskStats.getTotalCpuTimeInNanos(); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java b/presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java deleted file mode 100644 index e30c23fa5f969..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.operator; - -import com.facebook.drift.annotations.ThriftConstructor; -import com.facebook.drift.annotations.ThriftField; -import com.facebook.drift.annotations.ThriftStruct; -import com.facebook.presto.util.Mergeable; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; - -import java.util.List; - -import static java.util.Objects.requireNonNull; - -@ThriftStruct -public class NativeExecutionInfo - implements Mergeable, OperatorInfo -{ - /// Runtime statistics received from native process. 
- private final List taskStats; - - @JsonCreator - @ThriftConstructor - public NativeExecutionInfo(@JsonProperty("taskStats") List taskStats) - { - this.taskStats = requireNonNull(taskStats); - } - - @JsonProperty - @ThriftField(1) - public List getTaskStats() - { - return taskStats; - } - - @Override - public NativeExecutionInfo mergeWith(NativeExecutionInfo other) - { - return new NativeExecutionInfo(new ImmutableList.Builder() - .addAll(taskStats) - .addAll(other.taskStats) - .build()); - } - - @Override - public boolean isFinal() - { - return true; - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java index e516cef235700..d377ea874876a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java @@ -26,7 +26,6 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = ExchangeClientStatus.class, name = "exchangeClientStatus"), @JsonSubTypes.Type(value = LocalExchangeBufferInfo.class, name = "localExchangeBuffer"), - @JsonSubTypes.Type(value = NativeExecutionInfo.class, name = "NativeExecutionInfo"), @JsonSubTypes.Type(value = TableFinishInfo.class, name = "tableFinish"), @JsonSubTypes.Type(value = SplitOperatorInfo.class, name = "splitOperator"), @JsonSubTypes.Type(value = HashCollisionsInfo.class, name = "hashCollisionsInfo"), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java index cd5fb5e67a89c..515e9bdfea129 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java @@ -16,7 +16,6 @@ import com.facebook.presto.execution.StageInfo; import 
com.facebook.presto.execution.TaskInfo; import com.facebook.presto.operator.HashCollisionsInfo; -import com.facebook.presto.operator.NativeExecutionInfo; import com.facebook.presto.operator.OperatorStats; import com.facebook.presto.operator.PipelineStats; import com.facebook.presto.operator.TaskStats; @@ -144,12 +143,6 @@ private static List getPlanNodeStats(TaskStats taskStats) windowNodeStats.merge(planNodeId, WindowOperatorStats.create(windowInfo), (left, right) -> left.mergeWith(right)); } - if (operatorStats.getInfo() instanceof NativeExecutionInfo) { - NativeExecutionInfo info = (NativeExecutionInfo) operatorStats.getInfo(); - nativeTaskStats.addAll(info.getTaskStats()); - nativePlanNodeIds.add(planNodeId); - } - planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); From b5cf2197e3821c6dabeafa33f558ffcd5db33ddf Mon Sep 17 00:00:00 2001 From: Shrinidhi Joshi Date: Thu, 8 Jun 2023 16:10:22 -0700 Subject: [PATCH 6/6] [native pos] Add close method to IPrestoSparkTaskExecutorFactory This helps provide a hook for PrestoSparkNativeTaskExecutorFactory to shutdown the native process --- .../com/facebook/presto/spark/PrestoSparkService.java | 2 ++ .../execution/PrestoSparkNativeTaskExecutorFactory.java | 8 ++++++++ .../spark/execution/PrestoSparkTaskExecutorFactory.java | 3 +++ .../IPrestoSparkTaskExecutorFactory.java | 2 ++ 4 files changed, 15 insertions(+) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java index 982acb8693bbe..1de2e6172235f 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java @@ -67,5 +67,7 @@ public IPrestoSparkTaskExecutorFactory getNativeTaskExecutorFactory() public 
void close() { lifeCycleManager.stop(); + prestoSparkNativeTaskExecutorFactory.close(); + taskExecutorFactory.close(); } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java index 6da3d2abd1fce..bdd9a2defcbbb 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java @@ -260,6 +260,14 @@ public IPrestoSparkTaskExecutor doCreate( return new PrestoSparkNativeTaskOutputIterator<>(task, outputType, taskInfoCollector, taskInfoCodec, executionExceptionFactory); } + @Override + public void close() + { + if (nativeExecutionProcess != null) { + nativeExecutionProcess.close(); + } + } + private static void completeTask(CollectionAccumulator taskInfoCollector, NativeExecutionTask task, Codec taskInfoCodec) { // stop the task diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index 0b344fcaabef9..d9fd85aeb5c87 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -339,6 +339,9 @@ public IPrestoSparkTaskExecutor create( } } + @Override + public void close() {} + public IPrestoSparkTaskExecutor doCreate( int partitionId, int attemptNumber, diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java 
b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java index 7b67f86fd76e1..20aff04ff7b32 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java @@ -27,4 +27,6 @@ IPrestoSparkTaskExecutor create( CollectionAccumulator taskInfoCollector, CollectionAccumulator shuffleStatsCollector, Class outputType); + + public void close(); }