From 44e6fd1f6f72b1926b23e47fe1714a3c86aabf29 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Fri, 22 Apr 2022 16:58:12 -0400 Subject: [PATCH 1/4] Expose fault tolerant execution statistics via JMX --- .../io/trino/execution/SqlQueryExecution.java | 9 + .../FaultTolerantStageScheduler.java | 4 + .../scheduler/SqlQueryScheduler.java | 10 +- .../scheduler/TaskExecutionStats.java | 209 ++++++++++++++++++ .../io/trino/server/CoordinatorModule.java | 4 + .../TestFaultTolerantStageScheduler.java | 1 + 6 files changed, 235 insertions(+), 2 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/TaskExecutionStats.java diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index d7231a11943f..85b936124c38 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -31,6 +31,7 @@ import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.SqlQueryScheduler; import io.trino.execution.scheduler.TaskDescriptorStorage; +import io.trino.execution.scheduler.TaskExecutionStats; import io.trino.execution.scheduler.TaskSourceFactory; import io.trino.execution.scheduler.policy.ExecutionPolicy; import io.trino.execution.warnings.WarningCollector; @@ -103,6 +104,7 @@ public class SqlQueryExecution private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; + private final TaskExecutionStats taskExecutionStats; private final List planOptimizers; private final PlanFragmenter planFragmenter; private final RemoteTaskFactory remoteTaskFactory; @@ -138,6 +140,7 @@ private SqlQueryExecution( NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, + TaskExecutionStats taskExecutionStats, List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, @@ -167,6 +170,7 @@ private SqlQueryExecution( this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); + this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null"); this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null"); this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null"); @@ -507,6 +511,7 @@ private void planDistribution(PlanRoot plan) nodeScheduler, nodeAllocatorService, partitionMemoryEstimatorFactory, + taskExecutionStats, remoteTaskFactory, plan.isSummarizeTaskInfos(), scheduleSplitBatchSize, @@ -710,6 +715,7 @@ public static class SqlQueryExecutionFactory private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; + private final TaskExecutionStats taskExecutionStats; private final List planOptimizers; private final PlanFragmenter planFragmenter; private final RemoteTaskFactory remoteTaskFactory; @@ -738,6 +744,7 @@ public static class SqlQueryExecutionFactory NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, + TaskExecutionStats taskExecutionStats, PlanOptimizersFactory planOptimizersFactory, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, @@ -767,6 +774,7 @@ public static class SqlQueryExecutionFactory this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); + this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null"); this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"); this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null"); @@ -808,6 +816,7 @@ public QueryExecution createQueryExecution( nodeScheduler, nodeAllocatorService, partitionMemoryEstimatorFactory, + taskExecutionStats, planOptimizers, planFragmenter, remoteTaskFactory, diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index f57dd8b1f7b1..9d5b17cc0cd4 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -112,6 +112,7 @@ public class FaultTolerantStageScheduler private final NodeAllocator nodeAllocator; private final TaskDescriptorStorage taskDescriptorStorage; private final PartitionMemoryEstimator partitionMemoryEstimator; + private final TaskExecutionStats taskExecutionStats; private final int maxRetryAttemptsPerTask; private final int maxTasksWaitingForNodePerStage; @@ -181,6 +182,7 @@ public FaultTolerantStageScheduler( NodeAllocator nodeAllocator, TaskDescriptorStorage taskDescriptorStorage, PartitionMemoryEstimator partitionMemoryEstimator, + TaskExecutionStats taskExecutionStats, TaskLifecycleListener taskLifecycleListener, DelayedFutureCompletor futureCompletor, Ticker ticker, @@ -202,6 +204,7 @@ public FaultTolerantStageScheduler( this.nodeAllocator = requireNonNull(nodeAllocator, "nodeAllocator is null"); this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null"); this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null"); + this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.taskLifecycleListener = requireNonNull(taskLifecycleListener, "taskLifecycleListener is null"); this.futureCompletor = requireNonNull(futureCompletor, "futureCompletor is null"); this.sinkExchange = requireNonNull(sinkExchange, "sinkExchange is null"); @@ -410,6 +413,7 @@ private void startTask(int partition, NodeAllocator.NodeLease nodeLease) taskLifecycleListener.taskCreated(stage.getFragment().getId(), task); task.addStateChangeListener(taskStatus -> updateTaskStatus(taskStatus, exchangeSinkInstanceHandle)); + task.addFinalTaskInfoListener(taskExecutionStats::update); task.start(); } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index a216a4cd6a03..b9dd5d8d8953 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -181,6 +181,7 @@ public class SqlQueryScheduler private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; + private final TaskExecutionStats taskExecutionStats; private final int splitBatchSize; private final ExecutorService executor; private final ScheduledExecutorService schedulerExecutor; @@ -222,6 +223,7 @@ public SqlQueryScheduler( NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, + TaskExecutionStats taskExecutionStats, RemoteTaskFactory remoteTaskFactory, boolean summarizeTaskInfo, int splitBatchSize, @@ -245,6 +247,7 @@ public SqlQueryScheduler( this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); + this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.splitBatchSize = splitBatchSize; this.executor = requireNonNull(queryExecutor, "queryExecutor is null"); this.schedulerExecutor = requireNonNull(schedulerExecutor, "schedulerExecutor is null"); @@ -363,7 +366,8 @@ private synchronized Optional createDistributedStage schedulerExecutor, schedulerStats, nodeAllocatorService, - partitionMemoryEstimatorFactory); + partitionMemoryEstimatorFactory, + taskExecutionStats); break; case QUERY: case NONE: @@ -1766,7 +1770,8 @@ public static FaultTolerantDistributedStagesScheduler create( ScheduledExecutorService scheduledExecutorService, SplitSchedulerStats schedulerStats, NodeAllocatorService nodeAllocatorService, - PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory) + PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, + TaskExecutionStats taskExecutionStats) { taskDescriptorStorage.initialize(queryStateMachine.getQueryId()); queryStateMachine.addStateChangeListener(state -> { @@ -1831,6 +1836,7 @@ public static FaultTolerantDistributedStagesScheduler create( nodeAllocator, taskDescriptorStorage, partitionMemoryEstimatorFactory.createPartitionMemoryEstimator(), + taskExecutionStats, taskLifecycleListener, (future, delay) -> scheduledExecutorService.schedule(() -> future.set(null), delay.toMillis(), MILLISECONDS), systemTicker(), diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskExecutionStats.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskExecutionStats.java new file mode 100644 index 000000000000..d2f955108ad5 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskExecutionStats.java @@ -0,0 +1,209 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler; + +import io.airlift.log.Logger; +import io.airlift.stats.DistributionStat; +import io.airlift.stats.TimeStat; +import io.trino.execution.ExecutionFailureInfo; +import io.trino.execution.TaskInfo; +import io.trino.execution.TaskState; +import io.trino.operator.TaskStats; +import io.trino.spi.ErrorCode; +import io.trino.spi.ErrorType; +import io.trino.spi.TrinoException; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +import java.util.Optional; + +import static io.trino.spi.ErrorType.INTERNAL_ERROR; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.trino.util.Failures.toFailure; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class TaskExecutionStats +{ + private static final Logger log = Logger.get(TaskExecutionStats.class); + + private final ExecutionStats finishedTasks = new ExecutionStats(); + private final ExecutionStats abortedTasks = new ExecutionStats(); + private final FailedTasksStats failedTasks = new FailedTasksStats(); + + public void update(TaskInfo info) + { + TaskState state = info.getTaskStatus().getState(); + switch (state) { + case FINISHED: + finishedTasks.update(info.getStats()); + break; + case FAILED: + failedTasks.update(info); + break; + case CANCELED: + case ABORTED: + abortedTasks.update(info.getStats()); + break; + case PLANNED: + case RUNNING: + case FLUSHING: + default: + log.error("Unexpected task state: %s", state); + } + } + + @Managed + @Nested + public ExecutionStats getFinishedTasks() + { + return finishedTasks; + } + + @Managed + @Nested + public ExecutionStats getAbortedTasks() + { + return abortedTasks; + } + + @Managed + @Nested + public FailedTasksStats getFailedTasks() + { + return failedTasks; + } + + public static class ExecutionStats + { + private final TimeStat elapsedTime = new TimeStat(MILLISECONDS); + private final TimeStat scheduledTime = new TimeStat(MILLISECONDS); + private final TimeStat cpuTime = new TimeStat(MILLISECONDS); + private final TimeStat inputBlockedTime = new TimeStat(MILLISECONDS); + private final TimeStat outputBlockedTime = new TimeStat(MILLISECONDS); + private final DistributionStat peakMemoryReservationInBytes = new DistributionStat(); + + public void update(TaskStats stats) + { + elapsedTime.add(stats.getElapsedTime()); + scheduledTime.add(stats.getTotalScheduledTime()); + cpuTime.add(stats.getTotalCpuTime()); + inputBlockedTime.add(stats.getInputBlockedTime()); + outputBlockedTime.add(stats.getOutputBlockedTime()); + peakMemoryReservationInBytes.add(stats.getPeakUserMemoryReservation().toBytes()); + } + + @Managed + @Nested + public TimeStat getElapsedTime() + { + return elapsedTime; + } + + @Managed + @Nested + public TimeStat getScheduledTime() + { + return scheduledTime; + } + + @Managed + @Nested + public TimeStat getCpuTime() + { + return cpuTime; + } + + @Managed + @Nested + public TimeStat getInputBlockedTime() + { + return inputBlockedTime; + } + + @Managed + @Nested + public TimeStat getOutputBlockedTime() + { + return outputBlockedTime; + } + + @Managed + @Nested + public DistributionStat getPeakMemoryReservationInBytes() + { + return peakMemoryReservationInBytes; + } + } + + public static class FailedTasksStats + { + private final ExecutionStats userError = new ExecutionStats(); + private final ExecutionStats internalError = new ExecutionStats(); + private final ExecutionStats externalError = new ExecutionStats(); + private final ExecutionStats insufficientResources = new ExecutionStats(); + + public void update(TaskInfo info) + { + ExecutionFailureInfo failureInfo = info.getTaskStatus().getFailures().stream() + .findFirst() + .orElse(toFailure(new TrinoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"))); + ErrorType errorType = Optional.ofNullable(failureInfo.getErrorCode()).map(ErrorCode::getType).orElse(INTERNAL_ERROR); + TaskStats stats = info.getStats(); + switch (errorType) { + case USER_ERROR: + userError.update(stats); + break; + case INTERNAL_ERROR: + internalError.update(stats); + break; + case EXTERNAL: + externalError.update(stats); + break; + case INSUFFICIENT_RESOURCES: + insufficientResources.update(stats); + break; + default: + log.error("Unexpected error type: %s", errorType); + } + } + + @Managed + @Nested + public ExecutionStats getUserError() + { + return userError; + } + + @Managed + @Nested + public ExecutionStats getInternalError() + { + return internalError; + } + + @Managed + @Nested + public ExecutionStats getExternalError() + { + return externalError; + } + + @Managed + @Nested + public ExecutionStats getInsufficientResources() + { + return insufficientResources; + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index dd537fc0ec3b..4e6c7422622e 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -71,6 +71,7 @@ import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.StageTaskSourceFactory; import io.trino.execution.scheduler.TaskDescriptorStorage; +import io.trino.execution.scheduler.TaskExecutionStats; import io.trino.execution.scheduler.TaskSourceFactory; import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy; import io.trino.execution.scheduler.policy.ExecutionPolicy; @@ -326,6 +327,9 @@ protected void setup(Binder binder) binder.bind(TaskDescriptorStorage.class).in(Scopes.SINGLETON); newExporter(binder).export(TaskDescriptorStorage.class).withGeneratedName(); + binder.bind(TaskExecutionStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(TaskExecutionStats.class).withGeneratedName(); + MapBinder executionPolicyBinder = newMapBinder(binder, String.class, ExecutionPolicy.class); executionPolicyBinder.addBinding("all-at-once").to(AllAtOnceExecutionPolicy.class); executionPolicyBinder.addBinding("legacy-phased").to(LegacyPhasedExecutionPolicy.class); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java index a54a9cc4f2d7..248451b8eaa0 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java @@ -927,6 +927,7 @@ private FaultTolerantStageScheduler createFaultTolerantTaskScheduler( nodeAllocator, taskDescriptorStorage, new ConstantPartitionMemoryEstimator(), + new TaskExecutionStats(), taskLifecycleListener, futureCompletor, ticker, From 220ee87a72da706e60fed52988d053e0242b2a38 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 25 Apr 2022 13:28:57 -0400 Subject: [PATCH 2/4] Expose filesystem exchange execution statistics via JMX --- plugin/trino-exchange/pom.xml | 15 ++ .../trino/plugin/exchange/ExecutionStats.java | 96 +++++++++++ .../plugin/exchange/FileSystemExchange.java | 17 +- .../exchange/FileSystemExchangeManager.java | 24 ++- .../FileSystemExchangeManagerFactory.java | 9 +- .../exchange/FileSystemExchangeModule.java | 7 + .../exchange/FileSystemExchangeSink.java | 22 ++- .../exchange/FileSystemExchangeSource.java | 23 ++- .../exchange/FileSystemExchangeStats.java | 94 +++++++++++ .../s3/S3FileSystemExchangeStorage.java | 96 +++++++---- .../s3/S3FileSystemExchangeStorageStats.java | 159 ++++++++++++++++++ 11 files changed, 513 insertions(+), 49 deletions(-) create mode 100644 plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java create mode 100644 plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java create mode 100644 plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java diff --git a/plugin/trino-exchange/pom.xml b/plugin/trino-exchange/pom.xml index 7327069e7a09..d078bcbd5f03 100644 --- a/plugin/trino-exchange/pom.xml +++ b/plugin/trino-exchange/pom.xml @@ -30,6 +30,11 @@ + + io.trino + trino-plugin-toolkit + + io.airlift bootstrap @@ -50,6 +55,11 @@ log + + io.airlift + stats + + io.airlift units @@ -91,6 +101,11 @@ 1.0.3 + + org.weakref + jmxutils + + software.amazon.awssdk auth diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java new file mode 100644 index 000000000000..4670ea6c8be6 --- /dev/null +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.exchange; + +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.stats.TimeStat; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class ExecutionStats +{ + private final TimeStat finished = new TimeStat(MILLISECONDS); + private final TimeStat failed = new TimeStat(MILLISECONDS); + + public T record(Supplier call) + { + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + T result = call.get(); + finished.add(stopwatch.elapsed(MILLISECONDS), MILLISECONDS); + return result; + } + catch (Throwable t) { + failed.add(stopwatch.elapsed(MILLISECONDS), MILLISECONDS); + throw t; + } + } + + public CompletableFuture record(CompletableFuture future) + { + Stopwatch stopwatch = Stopwatch.createStarted(); + future.whenComplete((value, failure) -> { + if (failure == null) { + finished.add(stopwatch.elapsed(MILLISECONDS), MILLISECONDS); + } + else { + failed.add(stopwatch.elapsed(MILLISECONDS), MILLISECONDS); + } + }); + return future; + } + + public ListenableFuture record(ListenableFuture future) + { + Stopwatch stopwatch = Stopwatch.createStarted(); + Futures.addCallback(future, new FutureCallback() + { + @Override + public void onSuccess(T result) + { + finished.add(stopwatch.elapsed(MILLISECONDS), MILLISECONDS); + } + + @Override + public void onFailure(Throwable t) + { + failed.add(stopwatch.elapsed(MILLISECONDS), MILLISECONDS); + } + }, directExecutor()); + return future; + } + + @Managed + @Nested + public TimeStat getFinished() + { + return finished; + } + + @Managed + @Nested + public TimeStat getFailed() + { + return failed; + } +} diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java index adf7d371fb43..9bdd22e40123 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java @@ -17,6 +17,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeContext; import io.trino.spi.exchange.ExchangeSinkHandle; @@ -66,6 +68,7 @@ public class FileSystemExchange private final List baseDirectories; private final FileSystemExchangeStorage exchangeStorage; + private final FileSystemExchangeStats stats; private final ExchangeContext exchangeContext; private final int outputPartitionCount; private final Optional secretKey; @@ -87,6 +90,7 @@ public class FileSystemExchange public FileSystemExchange( List baseDirectories, FileSystemExchangeStorage exchangeStorage, + FileSystemExchangeStats stats, ExchangeContext exchangeContext, int outputPartitionCount, Optional secretKey, @@ -97,6 +101,7 @@ public FileSystemExchange( this.baseDirectories = ImmutableList.copyOf(directories); this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); + this.stats = requireNonNull(stats, "stats is null"); this.exchangeContext = requireNonNull(exchangeContext, "exchangeContext is null"); this.outputPartitionCount = outputPartitionCount; this.secretKey = requireNonNull(secretKey, "secretKey is null"); @@ -157,7 +162,9 @@ private void checkInputReady() if (noMoreSinks && finishedSinks.containsAll(allSinks)) { // input is ready, create exchange source handles exchangeSourceHandlesCreationStarted = true; - exchangeSourceHandlesCreationFuture = supplyAsync(this::createExchangeSourceHandles, executor); + exchangeSourceHandlesCreationFuture = supplyAsync( + () -> stats.getCreateExchangeSourceHandles().record(this::createExchangeSourceHandles), + executor); } } if (exchangeSourceHandlesCreationFuture != null) { @@ -180,8 +187,8 @@ private List createExchangeSourceHandles() finishedTaskPartitions = ImmutableList.copyOf(finishedSinks); } for (Integer taskPartition : finishedTaskPartitions) { - URI committedAttemptPath = getCommittedAttemptPath(taskPartition); - Multimap partitions = getCommittedPartitions(committedAttemptPath); + URI committedAttemptPath = stats.getGetCommittedAttemptPath().record(() -> getCommittedAttemptPath(taskPartition)); + Multimap partitions = stats.getGetCommittedPartitions().record(() -> getCommittedPartitions(committedAttemptPath)); partitions.forEach(partitionFiles::put); } @@ -298,8 +305,10 @@ public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle @Override public void close() { + ImmutableList.Builder> futures = ImmutableList.builder(); for (Integer taskPartitionId : allSinks) { - exchangeStorage.deleteRecursively(getTaskOutputDirectory(taskPartitionId)); + futures.add(exchangeStorage.deleteRecursively(getTaskOutputDirectory(taskPartitionId))); } + stats.getCloseExchange().record(Futures.allAsList(futures.build())); } } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java index 28e603ac7f13..26022337ba91 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java @@ -50,6 +50,7 @@ public class FileSystemExchangeManager private static final int KEY_BITS = 256; private final FileSystemExchangeStorage exchangeStorage; + private final FileSystemExchangeStats stats; private final List baseDirectories; private final boolean exchangeEncryptionEnabled; private final int maxPageStorageSizeInBytes; @@ -60,11 +61,15 @@ public class FileSystemExchangeManager private final ExecutorService executor; @Inject - public FileSystemExchangeManager(FileSystemExchangeStorage exchangeStorage, FileSystemExchangeConfig fileSystemExchangeConfig) + public FileSystemExchangeManager( + FileSystemExchangeStorage exchangeStorage, + FileSystemExchangeStats stats, + FileSystemExchangeConfig fileSystemExchangeConfig) { requireNonNull(fileSystemExchangeConfig, "fileSystemExchangeConfig is null"); this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); + this.stats = requireNonNull(stats, "stats is null"); this.baseDirectories = ImmutableList.copyOf(requireNonNull(fileSystemExchangeConfig.getBaseDirectories(), "baseDirectories is null")); this.exchangeEncryptionEnabled = fileSystemExchangeConfig.isExchangeEncryptionEnabled(); this.maxPageStorageSizeInBytes = toIntExact(fileSystemExchangeConfig.getMaxPageStorageSize().toBytes()); @@ -89,7 +94,14 @@ public Exchange createExchange(ExchangeContext context, int outputPartitionCount throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generate new secret key: " + e.getMessage(), e); } } - return new FileSystemExchange(baseDirectories, exchangeStorage, context, outputPartitionCount, secretKey, executor); + return new FileSystemExchange( + baseDirectories, + exchangeStorage, + stats, + context, + outputPartitionCount, + secretKey, + executor); } @Override @@ -98,6 +110,7 @@ public ExchangeSink createSink(ExchangeSinkInstanceHandle handle, boolean preser FileSystemExchangeSinkInstanceHandle instanceHandle = (FileSystemExchangeSinkInstanceHandle) handle; return new FileSystemExchangeSink( exchangeStorage, + stats, instanceHandle.getOutputDirectory(), instanceHandle.getOutputPartitionCount(), instanceHandle.getSinkHandle().getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES")), @@ -123,6 +136,11 @@ public ExchangeSource createSource(List handles) entry.getValue(), fileStatus.getFileSize()))) .collect(toImmutableList()); - return new FileSystemExchangeSource(exchangeStorage, sourceFiles, maxPageStorageSizeInBytes, exchangeSourceConcurrentReaders); + return new FileSystemExchangeSource( + exchangeStorage, + stats, + sourceFiles, + maxPageStorageSizeInBytes, + exchangeSourceConcurrentReaders); } } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java index e68956f101f4..2a506cf043c5 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java @@ -15,11 +15,14 @@ import com.google.inject.Injector; import io.airlift.bootstrap.Bootstrap; +import io.trino.plugin.base.jmx.MBeanServerModule; +import io.trino.plugin.base.jmx.PrefixObjectNameGeneratorModule; import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeManagerFactory; import io.trino.spi.exchange.ExchangeManagerHandleResolver; import io.trino.spi.exchange.ExchangeSinkInstanceHandle; import io.trino.spi.exchange.ExchangeSourceHandle; +import org.weakref.jmx.guice.MBeanModule; import java.util.Map; @@ -39,7 +42,11 @@ public ExchangeManager create(Map config) { requireNonNull(config, "config is null"); - Bootstrap app = new Bootstrap(new FileSystemExchangeModule()); + Bootstrap app = new Bootstrap( + new MBeanModule(), + new MBeanServerModule(), + new PrefixObjectNameGeneratorModule("io.trino.plugin.exchange", "trino.plugin.exchange"), + new FileSystemExchangeModule()); Injector injector = app .doNotInitializeLogging() diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java index d2b411fb5d79..521b8753eaf1 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java @@ -20,6 +20,7 @@ import io.trino.plugin.exchange.local.LocalFileSystemExchangeStorage; import io.trino.plugin.exchange.s3.ExchangeS3Config; import io.trino.plugin.exchange.s3.S3FileSystemExchangeStorage; +import io.trino.plugin.exchange.s3.S3FileSystemExchangeStorageStats; import io.trino.spi.TrinoException; import java.net.URI; @@ -29,6 +30,7 @@ import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; +import static org.weakref.jmx.guice.ExportBinder.newExporter; public class FileSystemExchangeModule extends AbstractConfigurationAwareModule @@ -36,6 +38,9 @@ public class FileSystemExchangeModule @Override protected void setup(Binder binder) { + binder.bind(FileSystemExchangeStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(FileSystemExchangeStats.class).withGeneratedName(); + binder.bind(FileSystemExchangeManager.class).in(Scopes.SINGLETON); List baseDirectories = buildConfigObject(FileSystemExchangeConfig.class).getBaseDirectories(); @@ -47,6 +52,8 @@ protected void setup(Binder binder) binder.bind(FileSystemExchangeStorage.class).to(LocalFileSystemExchangeStorage.class).in(Scopes.SINGLETON); } else if (ImmutableSet.of("s3", "s3a", "s3n").contains(scheme)) { + binder.bind(S3FileSystemExchangeStorageStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(S3FileSystemExchangeStorageStats.class).withGeneratedName(); binder.bind(FileSystemExchangeStorage.class).to(S3FileSystemExchangeStorage.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(ExchangeS3Config.class); } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java index 9da6d3fe94cd..e301590ed5bc 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java @@ -69,6 +69,7 @@ public class FileSystemExchangeSink private static final int INSTANCE_SIZE = ClassLayout.parseClass(FileSystemExchangeSink.class).instanceSize(); private final FileSystemExchangeStorage exchangeStorage; + private final FileSystemExchangeStats stats; private final URI outputDirectory; private final int outputPartitionCount; private final Optional secretKey; @@ -83,6 +84,7 @@ public class FileSystemExchangeSink public FileSystemExchangeSink( FileSystemExchangeStorage exchangeStorage, + FileSystemExchangeStats stats, URI outputDirectory, int outputPartitionCount, Optional secretKey, @@ -96,6 +98,7 @@ public FileSystemExchangeSink( format("maxPageStorageSizeInBytes %s exceeded maxFileSizeInBytes %s", succinctBytes(maxPageStorageSizeInBytes), succinctBytes(maxFileSizeInBytes))); this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); + this.stats = requireNonNull(stats, "stats is null"); this.outputDirectory = requireNonNull(outputDirectory, "outputDirectory is null"); this.outputPartitionCount = outputPartitionCount; this.secretKey = requireNonNull(secretKey, "secretKey is null"); @@ -103,7 +106,7 @@ public FileSystemExchangeSink( this.maxPageStorageSizeInBytes = maxPageStorageSizeInBytes; this.maxFileSizeInBytes = maxFileSizeInBytes; // buffer pooling to overlap computation and I/O - this.bufferPool = new BufferPool(max(outputPartitionCount * exchangeSinkBuffersPerPartition, exchangeSinkBufferPoolMinSize), exchangeStorage.getWriteBufferSize()); + this.bufferPool = new BufferPool(stats, max(outputPartitionCount * exchangeSinkBuffersPerPartition, exchangeSinkBufferPoolMinSize), exchangeStorage.getWriteBufferSize()); } // The future returned by {@link #isBlocked()} should only be considered as a best-effort hint. @@ -135,6 +138,7 @@ private BufferedStorageWriter createWriter(int partitionId) { return new BufferedStorageWriter( exchangeStorage, + stats, outputDirectory, secretKey, preserveRecordsOrder, @@ -182,7 +186,7 @@ public void onFailure(Throwable ignored) } }, directExecutor()); - return toCompletableFuture(finishFuture); + return stats.getExchangeSinkFinish().record(toCompletableFuture(finishFuture)); } @Override @@ -197,10 +201,10 @@ public synchronized CompletableFuture abort() writersMap.values().stream().map(BufferedStorageWriter::abort).collect(toImmutableList()))); addSuccessCallback(abortFuture, this::destroy); - return toCompletableFuture(Futures.transformAsync( + return stats.getExchangeSinkAbort().record(toCompletableFuture(Futures.transformAsync( abortFuture, ignored -> exchangeStorage.deleteRecursively(outputDirectory), - directExecutor())); + directExecutor()))); } private void throwIfFailed() @@ -224,6 +228,7 @@ private static class BufferedStorageWriter private static final int INSTANCE_SIZE = ClassLayout.parseClass(BufferedStorageWriter.class).instanceSize(); private final FileSystemExchangeStorage exchangeStorage; + private final FileSystemExchangeStats stats; private final URI outputDirectory; private final Optional secretKey; private final boolean preserveRecordsOrder; @@ -246,6 +251,7 @@ private static class BufferedStorageWriter public BufferedStorageWriter( FileSystemExchangeStorage exchangeStorage, + FileSystemExchangeStats stats, URI outputDirectory, Optional secretKey, boolean preserveRecordsOrder, @@ -256,6 +262,7 @@ public BufferedStorageWriter( long maxFileSizeInBytes) { this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); + this.stats = requireNonNull(stats, "stats is null"); this.outputDirectory = requireNonNull(outputDirectory, "outputDirectory is null"); this.secretKey = requireNonNull(secretKey, "secretKey is null"); this.preserveRecordsOrder = preserveRecordsOrder; @@ -280,6 +287,7 @@ public synchronized void write(Slice data) } if (currentFileSize + requiredPageStorageSize > maxFileSizeInBytes && !preserveRecordsOrder) { + stats.getFileSizeInBytes().add(currentFileSize); flushIfNeeded(true); setupWriterForNextPart(); currentFileSize = 0; @@ -298,6 +306,7 @@ public synchronized ListenableFuture finish() return immediateFailedFuture(new IllegalStateException("BufferedStorageWriter has already closed")); } + stats.getFileSizeInBytes().add(currentFileSize); flushIfNeeded(true); if (writers.size() == 1) { return currentWriter.finish(); @@ -368,6 +377,7 @@ private static class BufferPool { private static final int INSTANCE_SIZE = ClassLayout.parseClass(BufferPool.class).instanceSize(); + private final FileSystemExchangeStats stats; private final int numBuffers; private final long bufferRetainedSize; @GuardedBy("this") @@ -377,8 +387,9 @@ private static class BufferPool @GuardedBy("this") private boolean closed; - public BufferPool(int numBuffers, int writeBufferSize) + public BufferPool(FileSystemExchangeStats stats, int numBuffers, int writeBufferSize) { + this.stats = requireNonNull(stats, "stats is null"); checkArgument(numBuffers >= 1, "numBuffers must be at least one"); this.numBuffers = numBuffers; @@ -394,6 +405,7 @@ public synchronized CompletableFuture isBlocked() if (freeBuffersQueue.isEmpty()) { if (blockedFuture.isDone()) { blockedFuture = new CompletableFuture<>(); + stats.getExchangeSinkBlocked().record(blockedFuture); } return blockedFuture; } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java index d1855b4d2d59..91de349cd2bc 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java @@ -36,16 +36,20 @@ public class FileSystemExchangeSource implements ExchangeSource { + private final FileSystemExchangeStats stats; private final List readers; + private volatile CompletableFuture blocked; private volatile boolean closed; public FileSystemExchangeSource( FileSystemExchangeStorage exchangeStorage, + FileSystemExchangeStats stats, List sourceFiles, int maxPageStorageSize, int exchangeSourceConcurrentReaders) { requireNonNull(exchangeStorage, "exchangeStorage is null"); + this.stats = requireNonNull(stats, "stats is null"); Queue sourceFileQueue = new ArrayBlockingQueue<>(sourceFiles.size()); sourceFileQueue.addAll(sourceFiles); @@ -61,16 +65,25 @@ public FileSystemExchangeSource( @Override public CompletableFuture isBlocked() { + CompletableFuture blocked = this.blocked; + if (blocked != null && !blocked.isDone()) { + return blocked; + } for (ExchangeStorageReader reader : readers) { if (reader.isBlocked().isDone()) { return NOT_BLOCKED; } } - return toCompletableFuture( - nonCancellationPropagating( - whenAnyComplete(readers.stream() - .map(ExchangeStorageReader::isBlocked) - .collect(toImmutableList())))); + synchronized (this) { + if (this.blocked == null || this.blocked.isDone()) { + this.blocked = stats.getExchangeSourceBlocked().record(toCompletableFuture( + nonCancellationPropagating( + whenAnyComplete(readers.stream() + .map(ExchangeStorageReader::isBlocked) + .collect(toImmutableList()))))); + } + return this.blocked; + } } @Override diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java new file mode 100644 index 000000000000..f3dd5a3ceb7f --- /dev/null +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java @@ -0,0 +1,94 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.exchange; + +import io.airlift.stats.DistributionStat; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +public class FileSystemExchangeStats +{ + private final ExecutionStats createExchangeSourceHandles = new ExecutionStats(); + private final ExecutionStats getCommittedAttemptPath = new ExecutionStats(); + private final ExecutionStats getCommittedPartitions = new ExecutionStats(); + private final ExecutionStats closeExchange = new ExecutionStats(); + private final ExecutionStats exchangeSinkBlocked = new ExecutionStats(); + private final ExecutionStats exchangeSinkFinish = new ExecutionStats(); + private final ExecutionStats exchangeSinkAbort = new ExecutionStats(); + private final ExecutionStats exchangeSourceBlocked = new ExecutionStats(); + private final DistributionStat fileSizeInBytes = new DistributionStat(); + + @Managed + @Nested + public ExecutionStats getCreateExchangeSourceHandles() + { + return createExchangeSourceHandles; + } + + @Managed + @Nested + public ExecutionStats getGetCommittedAttemptPath() + { + return getCommittedAttemptPath; + } + + @Managed + @Nested + public ExecutionStats getGetCommittedPartitions() + { + return getCommittedPartitions; + } + + @Managed + @Nested + public ExecutionStats getCloseExchange() + { + return closeExchange; + } + + @Managed + @Nested + public ExecutionStats getExchangeSinkBlocked() + { + return exchangeSinkBlocked; + } + + @Managed + @Nested + public ExecutionStats getExchangeSinkFinish() + { + return exchangeSinkFinish; + } + + @Managed + @Nested + public ExecutionStats getExchangeSinkAbort() + { + return exchangeSinkAbort; + } + + @Managed + @Nested + public ExecutionStats getExchangeSourceBlocked() + { + return exchangeSourceBlocked; + } + + @Managed + @Nested + public DistributionStat getFileSizeInBytes() + { + return fileSizeInBytes; + } +} diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java index b869bb3dd356..db46a2f2171f 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java @@ -109,6 +109,7 @@ public class S3FileSystemExchangeStorage { private static final String DIRECTORY_SUFFIX = "_$folder$"; + private final S3FileSystemExchangeStorageStats stats; private final Optional region; private final Optional endpoint; private final int multiUploadPartSize; @@ -117,8 +118,9 @@ public class S3FileSystemExchangeStorage private final StorageClass storageClass; @Inject - public S3FileSystemExchangeStorage(ExchangeS3Config config) + public S3FileSystemExchangeStorage(S3FileSystemExchangeStorageStats stats, ExchangeS3Config config) { + this.stats = requireNonNull(stats, "stats is null"); requireNonNull(config, "config is null"); this.region = config.getS3Region(); this.endpoint = config.getS3Endpoint(); @@ -154,7 +156,7 @@ public void createDirectories(URI dir) @Override public ExchangeStorageReader createExchangeStorageReader(Queue sourceFiles, int maxPageStorageSize) { - return new S3ExchangeStorageReader(s3AsyncClient, sourceFiles, multiUploadPartSize, maxPageStorageSize); + return new S3ExchangeStorageReader(stats, s3AsyncClient, sourceFiles, multiUploadPartSize, maxPageStorageSize); } @Override @@ -163,7 +165,7 @@ public ExchangeStorageWriter createExchangeStorageWriter(URI file, Optional createEmptyFile(URI file) .key(keyFromUri(file)) .build(); - return transformFuture(toListenableFuture(s3AsyncClient.putObject(request, AsyncRequestBody.empty()))); + return stats.getCreateEmptyFile().record(transformFuture(toListenableFuture(s3AsyncClient.putObject(request, AsyncRequestBody.empty())))); } @Override @@ -191,14 +193,14 @@ public ListenableFuture deleteRecursively(URI uri) checkArgument(isDirectory(uri), "deleteRecursively called on file uri"); ImmutableList.Builder keys = ImmutableList.builder(); - return transformFuture(Futures.transformAsync( + return stats.getDeleteRecursively().record(transformFuture(Futures.transformAsync( toListenableFuture((listObjectsRecursively(uri).subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream().map(S3Object::key).forEach(keys::add)))), ignored -> { keys.add(keyFromUri(uri) + DIRECTORY_SUFFIX); return deleteObjects(getBucketName(uri), keys.build()); }, - directExecutor())); + directExecutor()))); } @Override @@ -207,18 +209,23 @@ public List listFiles(URI dir) { ImmutableList.Builder builder = ImmutableList.builder(); try { - for (S3Object object : listObjects(dir).contents()) { - builder.add(new FileStatus( - new URI(dir.getScheme(), dir.getHost(), PATH_SEPARATOR + object.key(), dir.getFragment()).toString(), - object.size())); - } + stats.getListFiles().record(() -> { + for (S3Object object : listObjects(dir).contents()) { + URI uri; + try { + uri = new URI(dir.getScheme(), dir.getHost(), PATH_SEPARATOR + object.key(), dir.getFragment()); + } + catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + builder.add(new FileStatus(uri.toString(), object.size())); + } + return null; + }); } catch (RuntimeException e) { throw new IOException(e); } - catch (URISyntaxException e) { - throw new IllegalArgumentException(e); - } return builder.build(); } @@ -228,16 +235,23 @@ public List listDirectories(URI dir) { ImmutableList.Builder builder = ImmutableList.builder(); try { - for (CommonPrefix prefix : listObjects(dir).commonPrefixes()) { - builder.add(new URI(dir.getScheme(), dir.getHost(), PATH_SEPARATOR + prefix.prefix(), dir.getFragment())); - } + stats.getListDirectories().record(() -> { + for (CommonPrefix prefix : listObjects(dir).commonPrefixes()) { + URI uri; + try { + uri = new URI(dir.getScheme(), dir.getHost(), PATH_SEPARATOR + prefix.prefix(), dir.getFragment()); + } + catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + builder.add(uri); + } + return null; + }); } catch (RuntimeException e) { throw new IOException(e); } - catch (URISyntaxException e) { - throw new IllegalArgumentException(e); - } return builder.build(); } @@ -267,7 +281,7 @@ private HeadObjectResponse headObject(URI uri, Optional secretKey) configureEncryption(secretKey, headObjectRequestBuilder); try { - return s3Client.headObject(headObjectRequestBuilder.build()); + return stats.getHeadObject().record(() -> s3Client.headObject(headObjectRequestBuilder.build())); } catch (RuntimeException e) { if (e instanceof NoSuchKeyException) { @@ -306,13 +320,14 @@ private ListObjectsV2Publisher listObjectsRecursively(URI dir) private ListenableFuture> deleteObjects(String bucketName, List keys) { List> subList = Lists.partition(keys, 1000); // deleteObjects has a limit of 1000 - return Futures.allAsList(subList.stream().map(list -> { + stats.getDeleteObjectsEntriesCount().add(keys.size()); + return stats.getDeleteObjects().record(Futures.allAsList(subList.stream().map(list -> { DeleteObjectsRequest request = DeleteObjectsRequest.builder() .bucket(bucketName) .delete(Delete.builder().objects(list.stream().map(key -> ObjectIdentifier.builder().key(key).build()).collect(toImmutableList())).build()) .build(); return toListenableFuture(s3AsyncClient.deleteObjects(request)); - }).collect(toImmutableList())); + }).collect(toImmutableList()))); } /** @@ -420,6 +435,7 @@ private static class S3ExchangeStorageReader { private static final int INSTANCE_SIZE = ClassLayout.parseClass(S3ExchangeStorageReader.class).instanceSize(); + private final S3FileSystemExchangeStorageStats stats; private final S3AsyncClient s3AsyncClient; private final Queue sourceFiles; private final int partSize; @@ -438,11 +454,13 @@ private static class S3ExchangeStorageReader private volatile ListenableFuture inProgressReadFuture = immediateVoidFuture(); public S3ExchangeStorageReader( + S3FileSystemExchangeStorageStats stats, S3AsyncClient s3AsyncClient, Queue sourceFiles, int partSize, int maxPageStorageSize) { + this.stats = requireNonNull(stats, "stats is null"); this.s3AsyncClient = requireNonNull(s3AsyncClient, "s3AsyncClient is null"); this.sourceFiles = requireNonNull(sourceFiles, "sourceFiles is null"); this.partSize = partSize; @@ -567,8 +585,11 @@ private void fillBuffer() .partNumber(partNumber); configureEncryption(secretKey, getObjectRequestBuilder); - getObjectFutures.add(toListenableFuture(s3AsyncClient.getObject(getObjectRequestBuilder.build(), - BufferWriteAsyncResponseTransformer.toBufferWrite(buffer, bufferFill)))); + ListenableFuture getObjectFuture = toListenableFuture(s3AsyncClient.getObject(getObjectRequestBuilder.build(), + BufferWriteAsyncResponseTransformer.toBufferWrite(buffer, bufferFill))); + stats.getGetObject().record(getObjectFuture); + stats.getGetObjectDataSizeInBytes().add(length); + getObjectFutures.add(getObjectFuture); bufferFill += length; fileOffset += length; } @@ -594,6 +615,7 @@ private static class S3ExchangeStorageWriter { private static final int INSTANCE_SIZE = ClassLayout.parseClass(S3ExchangeStorageWriter.class).instanceSize(); + private final S3FileSystemExchangeStorageStats stats; private final S3AsyncClient s3AsyncClient; private final String bucketName; private final String key; @@ -607,8 +629,16 @@ private static class S3ExchangeStorageWriter private final List> multiPartUploadFutures = new ArrayList<>(); private volatile boolean closed; - public S3ExchangeStorageWriter(S3AsyncClient s3AsyncClient, String bucketName, String key, int partSize, Optional secretKey, StorageClass storageClass) + public S3ExchangeStorageWriter( + S3FileSystemExchangeStorageStats stats, + S3AsyncClient s3AsyncClient, + String bucketName, + String key, + int partSize, + Optional secretKey, + StorageClass storageClass) { + this.stats = requireNonNull(stats, "stats is null"); this.s3AsyncClient = requireNonNull(s3AsyncClient, "s3AsyncClient is null"); this.bucketName = requireNonNull(bucketName, "bucketName is null"); this.key = requireNonNull(key, "key is null"); @@ -635,6 +665,8 @@ public ListenableFuture write(Slice slice) configureEncryption(secretKey, putObjectRequestBuilder); directUploadFuture = transformFuture(toListenableFuture(s3AsyncClient.putObject(putObjectRequestBuilder.build(), ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer())))); + stats.getPutObject().record(directUploadFuture); + stats.getPutObjectDataSizeInBytes().add(slice.length()); return directUploadFuture; } @@ -713,7 +745,7 @@ private ListenableFuture createMultipartUpload() .key(key) .storageClass(storageClass); configureEncryption(secretKey, createMultipartUploadRequestBuilder); - return toListenableFuture(s3AsyncClient.createMultipartUpload(createMultipartUploadRequestBuilder.build())); + return stats.getCreateMultipartUpload().record(toListenableFuture(s3AsyncClient.createMultipartUpload(createMultipartUploadRequestBuilder.build()))); } private ListenableFuture uploadPart(String uploadId, Slice slice, int partNumber) @@ -725,8 +757,9 @@ private ListenableFuture uploadPart(String uploadId, Slice slice, .partNumber(partNumber); configureEncryption(secretKey, uploadPartRequestBuilder); UploadPartRequest uploadPartRequest = uploadPartRequestBuilder.build(); - return Futures.transform(toListenableFuture(s3AsyncClient.uploadPart(uploadPartRequest, ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer()))), - uploadPartResponse -> CompletedPart.builder().eTag(uploadPartResponse.eTag()).partNumber(partNumber).build(), directExecutor()); + stats.getUploadPartDataSizeInBytes().add(slice.length()); + return stats.getUploadPart().record(Futures.transform(toListenableFuture(s3AsyncClient.uploadPart(uploadPartRequest, ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer()))), + uploadPartResponse -> CompletedPart.builder().eTag(uploadPartResponse.eTag()).partNumber(partNumber).build(), directExecutor())); } private ListenableFuture completeMultipartUpload(String uploadId, List completedParts) @@ -740,7 +773,8 @@ private ListenableFuture completeMultipartUploa .uploadId(uploadId) .multipartUpload(completedMultipartUpload) .build(); - return toListenableFuture(s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest)); + stats.getCompleteMultipartUploadPartsCount().add(completedParts.size()); + return stats.getCompleteMultipartUpload().record(toListenableFuture(s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest))); } private ListenableFuture abortMultipartUpload(String uploadId) @@ -750,7 +784,7 @@ private ListenableFuture abortMultipartUpload(Stri .key(key) .uploadId(uploadId) .build(); - return toListenableFuture(s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest)); + return stats.getAbortMultipartUpload().record(toListenableFuture(s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest))); } } } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java new file mode 100644 index 000000000000..a319e0df7ba1 --- /dev/null +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java @@ -0,0 +1,159 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.exchange.s3; + +import io.airlift.stats.DistributionStat; +import io.trino.plugin.exchange.ExecutionStats; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +public class S3FileSystemExchangeStorageStats +{ + private final ExecutionStats headObject = new ExecutionStats(); + private final ExecutionStats createEmptyFile = new ExecutionStats(); + private final ExecutionStats deleteRecursively = new ExecutionStats(); + private final ExecutionStats listFiles = new ExecutionStats(); + private final ExecutionStats listDirectories = new ExecutionStats(); + private final ExecutionStats deleteObjects = new ExecutionStats(); + private final DistributionStat deleteObjectsEntriesCount = new DistributionStat(); + private final ExecutionStats getObject = new ExecutionStats(); + private final DistributionStat getObjectDataSizeInBytes = new DistributionStat(); + private final ExecutionStats putObject = new ExecutionStats(); + private final DistributionStat putObjectDataSizeInBytes = new DistributionStat(); + private final ExecutionStats createMultipartUpload = new ExecutionStats(); + private final ExecutionStats uploadPart = new ExecutionStats(); + private final DistributionStat uploadPartDataSizeInBytes = new DistributionStat(); + private final ExecutionStats completeMultipartUpload = new ExecutionStats(); + private final DistributionStat completeMultipartUploadPartsCount = new DistributionStat(); + private final ExecutionStats abortMultipartUpload = new ExecutionStats(); + + @Managed + @Nested + public ExecutionStats getHeadObject() + { + return headObject; + } + + @Managed + @Nested + public ExecutionStats getCreateEmptyFile() + { + return createEmptyFile; + } + + @Managed + @Nested + public ExecutionStats getDeleteRecursively() + { + return deleteRecursively; + } + + @Managed + @Nested + public ExecutionStats getListFiles() + { + return listFiles; + } + + @Managed + @Nested + public ExecutionStats getListDirectories() + { + return listDirectories; + } + + @Managed + @Nested + public ExecutionStats getDeleteObjects() + { + return deleteObjects; + } + + @Managed + @Nested + public DistributionStat getDeleteObjectsEntriesCount() + { + return deleteObjectsEntriesCount; + } + + @Managed + @Nested + public ExecutionStats getGetObject() + { + return getObject; + } + + @Managed + @Nested + public DistributionStat getGetObjectDataSizeInBytes() + { + return getObjectDataSizeInBytes; + } + + @Managed + @Nested + public ExecutionStats getPutObject() + { + return putObject; + } + + @Managed + @Nested + public DistributionStat getPutObjectDataSizeInBytes() + { + return putObjectDataSizeInBytes; + } + + @Managed + @Nested + public ExecutionStats getCreateMultipartUpload() + { + return createMultipartUpload; + } + + @Managed + @Nested + public ExecutionStats getUploadPart() + { + return uploadPart; + } + + @Managed + @Nested + public DistributionStat getUploadPartDataSizeInBytes() + { + return uploadPartDataSizeInBytes; + } + + @Managed + @Nested + public ExecutionStats getCompleteMultipartUpload() + { + return completeMultipartUpload; + } + + @Managed + @Nested + public DistributionStat getCompleteMultipartUploadPartsCount() + { + return completeMultipartUploadPartsCount; + } + + @Managed + @Nested + public ExecutionStats getAbortMultipartUpload() + { + return abortMultipartUpload; + } +} From 6cee5b9969a1fdb1040eb94ad5372d6704cd92f8 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 25 Apr 2022 13:37:19 -0400 Subject: [PATCH 3/4] Change package name for filesystem exchange module Move from `io.trino.plugin.exchange` to `io.trino.plugin.exchange.filesystem` --- .../TestDeduplicatingDirectExchangeBuffer.java | 2 +- .../TestDeltaTaskFailureRecoveryTest.java | 6 +++--- .../{ => filesystem}/ExchangeSourceFile.java | 2 +- .../{ => filesystem}/ExchangeStorageReader.java | 2 +- .../{ => filesystem}/ExchangeStorageWriter.java | 2 +- .../{ => filesystem}/ExecutionStats.java | 2 +- .../exchange/{ => filesystem}/FileStatus.java | 2 +- .../{ => filesystem}/FileSystemExchange.java | 8 ++++---- .../FileSystemExchangeConfig.java | 4 ++-- .../FileSystemExchangeManager.java | 2 +- .../FileSystemExchangeManagerFactory.java | 4 ++-- .../FileSystemExchangeModule.java | 10 +++++----- .../FileSystemExchangePlugin.java | 2 +- .../{ => filesystem}/FileSystemExchangeSink.java | 2 +- .../FileSystemExchangeSinkHandle.java | 2 +- .../FileSystemExchangeSinkInstanceHandle.java | 2 +- .../FileSystemExchangeSource.java | 2 +- .../FileSystemExchangeSourceHandle.java | 2 +- .../FileSystemExchangeStats.java | 2 +- .../FileSystemExchangeStorage.java | 2 +- .../local/LocalFileSystemExchangeStorage.java | 12 ++++++------ .../s3/BufferWriteAsyncResponseTransformer.java | 2 +- .../s3/ByteBufferAsyncRequestBody.java | 2 +- .../{ => filesystem}/s3/ExchangeS3Config.java | 2 +- .../s3/S3FileSystemExchangeStorage.java | 16 ++++++++-------- .../s3/S3FileSystemExchangeStorageStats.java | 4 ++-- .../{ => filesystem}/s3/S3RequestUtil.java | 2 +- .../AbstractTestExchangeManager.java | 2 +- .../TestFileSystemExchangeConfig.java | 2 +- .../containers/MinioStorage.java | 2 +- .../TestLocalFileSystemExchangeManager.java | 6 +++--- .../s3/TestExchangeS3Config.java | 2 +- .../s3/TestS3FileSystemExchangeManager.java | 10 +++++----- .../trino/plugin/hive/BaseHiveConnectorTest.java | 2 +- ...stHiveFaultTolerantExecutionAggregations.java | 6 +++--- ...tHiveFaultTolerantExecutionConnectorTest.java | 4 ++-- ...estHiveFaultTolerantExecutionJoinQueries.java | 6 +++--- ...HiveFaultTolerantExecutionOrderByQueries.java | 6 +++--- ...tHiveFaultTolerantExecutionWindowQueries.java | 6 +++--- .../hive/TestHiveQueryFailureRecoveryTest.java | 6 +++--- .../hive/TestHiveTaskFailureRecoveryTest.java | 6 +++--- .../TestIcebergQueryFailureRecoveryTest.java | 6 +++--- .../TestIcebergTaskFailureRecoveryTest.java | 6 +++--- ...istributedFaultTolerantEngineOnlyQueries.java | 2 +- 44 files changed, 91 insertions(+), 91 deletions(-) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/ExchangeSourceFile.java (96%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/ExchangeStorageReader.java (95%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/ExchangeStorageWriter.java (95%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/ExecutionStats.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileStatus.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchange.java (97%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeConfig.java (97%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeManager.java (99%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeManagerFactory.java (95%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeModule.java (88%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangePlugin.java (95%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeSink.java (99%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeSinkHandle.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeSinkInstanceHandle.java (97%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeSource.java (99%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeSourceHandle.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeStats.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/FileSystemExchangeStorage.java (97%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/local/LocalFileSystemExchangeStorage.java (96%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/s3/BufferWriteAsyncResponseTransformer.java (99%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/s3/ByteBufferAsyncRequestBody.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/s3/ExchangeS3Config.java (99%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/s3/S3FileSystemExchangeStorage.java (98%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/s3/S3FileSystemExchangeStorageStats.java (97%) rename plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/{ => filesystem}/s3/S3RequestUtil.java (98%) rename plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/{ => filesystem}/AbstractTestExchangeManager.java (99%) rename plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/{ => filesystem}/TestFileSystemExchangeConfig.java (98%) rename plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/{ => filesystem}/containers/MinioStorage.java (98%) rename plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/{ => filesystem}/local/TestLocalFileSystemExchangeManager.java (87%) rename plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/{ => filesystem}/s3/TestExchangeS3Config.java (98%) rename plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/{ => filesystem}/s3/TestS3FileSystemExchangeManager.java (78%) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java index 1d5e3c218a33..e51b6cd46e7d 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java @@ -26,7 +26,7 @@ import io.trino.execution.StageId; import io.trino.execution.TaskId; import io.trino.metadata.ExchangeHandleResolver; -import io.trino.plugin.exchange.FileSystemExchangeManagerFactory; +import io.trino.plugin.exchange.filesystem.FileSystemExchangeManagerFactory; import io.trino.spi.QueryId; import io.trino.spi.TrinoException; import org.testng.annotations.AfterClass; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java index eecda77f78fc..d282e55131ff 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java @@ -16,8 +16,8 @@ import com.google.common.collect.ImmutableMap; import io.trino.operator.RetryPolicy; import io.trino.plugin.deltalake.util.DockerizedMinioDataLake; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; @@ -27,7 +27,7 @@ import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeSourceFile.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java similarity index 96% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeSourceFile.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java index 96a08b0bad1f..c8f049d94060 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeSourceFile.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import javax.annotation.concurrent.Immutable; import javax.crypto.SecretKey; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeStorageReader.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java similarity index 95% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeStorageReader.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java index 2805d09ffe7e..4d2a777870ea 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeStorageReader.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeStorageWriter.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java similarity index 95% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeStorageWriter.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java index c45c26b9dabc..54273c38c728 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeStorageWriter.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java index 4670ea6c8be6..4d8d695bdef1 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExecutionStats.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.FutureCallback; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileStatus.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileStatus.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java index 64dfc77636bf..d9fb97289875 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileStatus.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java similarity index 97% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index 9bdd22e40123..cf0d77e12807 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; @@ -53,9 +53,9 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.exchange.FileSystemExchangeManager.PATH_SEPARATOR; -import static io.trino.plugin.exchange.FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME; -import static io.trino.plugin.exchange.FileSystemExchangeSink.DATA_FILE_SUFFIX; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.DATA_FILE_SUFFIX; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java similarity index 97% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java index aaabba1f8263..a8fab4a5cb12 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; @@ -28,7 +28,7 @@ import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; -import static io.trino.plugin.exchange.FileSystemExchangeManager.PATH_SEPARATOR; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR; public class FileSystemExchangeConfig { diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java similarity index 99% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java index 26022337ba91..bb0f8430f462 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableList; import io.trino.spi.TrinoException; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java similarity index 95% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java index 2a506cf043c5..2eaa0204a8b3 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManagerFactory.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.inject.Injector; import io.airlift.bootstrap.Bootstrap; @@ -45,7 +45,7 @@ public ExchangeManager create(Map config) Bootstrap app = new Bootstrap( new MBeanModule(), new MBeanServerModule(), - new PrefixObjectNameGeneratorModule("io.trino.plugin.exchange", "trino.plugin.exchange"), + new PrefixObjectNameGeneratorModule("io.trino.plugin.exchange.filesystem", "trino.plugin.exchange.filesystem"), new FileSystemExchangeModule()); Injector injector = app diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java similarity index 88% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java index 521b8753eaf1..c4c0bcbc8939 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java @@ -11,16 +11,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableSet; import com.google.inject.Binder; import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; -import io.trino.plugin.exchange.local.LocalFileSystemExchangeStorage; -import io.trino.plugin.exchange.s3.ExchangeS3Config; -import io.trino.plugin.exchange.s3.S3FileSystemExchangeStorage; -import io.trino.plugin.exchange.s3.S3FileSystemExchangeStorageStats; +import io.trino.plugin.exchange.filesystem.local.LocalFileSystemExchangeStorage; +import io.trino.plugin.exchange.filesystem.s3.ExchangeS3Config; +import io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage; +import io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorageStats; import io.trino.spi.TrinoException; import java.net.URI; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangePlugin.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java similarity index 95% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangePlugin.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java index 04e5b08dc19c..01663afe5c54 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangePlugin.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableList; import io.trino.spi.Plugin; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java similarity index 99% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java index e301590ed5bc..100bc756752d 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSinkHandle.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSinkHandle.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java index 3704227f3dfe..0a3588ae6362 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSinkHandle.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSinkInstanceHandle.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java similarity index 97% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSinkInstanceHandle.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java index 0335c5716cb7..ade2d4009008 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSinkInstanceHandle.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java similarity index 99% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java index 91de349cd2bc..dd32ea4f7220 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSourceHandle.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSourceHandle.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java index 0ae44e542f4b..cd45fda00343 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSourceHandle.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java index f3dd5a3ceb7f..c805288b2a85 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStats.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import io.airlift.stats.DistributionStat; import org.weakref.jmx.Managed; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStorage.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java similarity index 97% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStorage.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java index 61487b63fef6..a974d0bf398d 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStorage.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.util.concurrent.ListenableFuture; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/local/LocalFileSystemExchangeStorage.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java similarity index 96% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/local/LocalFileSystemExchangeStorage.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java index 25d671cb1efe..7b2953efe49a 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/local/LocalFileSystemExchangeStorage.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.local; +package io.trino.plugin.exchange.filesystem.local; import com.google.common.collect.ImmutableList; import com.google.common.io.MoreFiles; @@ -19,11 +19,11 @@ import io.airlift.slice.InputStreamSliceInput; import io.airlift.slice.Slice; import io.airlift.units.DataSize; -import io.trino.plugin.exchange.ExchangeSourceFile; -import io.trino.plugin.exchange.ExchangeStorageReader; -import io.trino.plugin.exchange.ExchangeStorageWriter; -import io.trino.plugin.exchange.FileStatus; -import io.trino.plugin.exchange.FileSystemExchangeStorage; +import io.trino.plugin.exchange.filesystem.ExchangeSourceFile; +import io.trino.plugin.exchange.filesystem.ExchangeStorageReader; +import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter; +import io.trino.plugin.exchange.filesystem.FileStatus; +import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage; import io.trino.spi.TrinoException; import org.openjdk.jol.info.ClassLayout; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/BufferWriteAsyncResponseTransformer.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java similarity index 99% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/BufferWriteAsyncResponseTransformer.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java index 4c27872511ba..46bf8e5517e3 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/BufferWriteAsyncResponseTransformer.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ByteBufferAsyncRequestBody.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ByteBufferAsyncRequestBody.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java index 5d492e031590..cf141169323c 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ByteBufferAsyncRequestBody.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import io.airlift.log.Logger; import org.reactivestreams.Subscriber; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java similarity index 99% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java index 085655eb767a..ba108f8757d7 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java index db46a2f2171f..5da9fd099871 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -23,11 +23,11 @@ import io.airlift.slice.SliceInput; import io.airlift.slice.Slices; import io.airlift.units.Duration; -import io.trino.plugin.exchange.ExchangeSourceFile; -import io.trino.plugin.exchange.ExchangeStorageReader; -import io.trino.plugin.exchange.ExchangeStorageWriter; -import io.trino.plugin.exchange.FileStatus; -import io.trino.plugin.exchange.FileSystemExchangeStorage; +import io.trino.plugin.exchange.filesystem.ExchangeSourceFile; +import io.trino.plugin.exchange.filesystem.ExchangeStorageReader; +import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter; +import io.trino.plugin.exchange.filesystem.FileStatus; +import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage; import org.openjdk.jol.info.ClassLayout; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -95,8 +95,8 @@ import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; -import static io.trino.plugin.exchange.FileSystemExchangeManager.PATH_SEPARATOR; -import static io.trino.plugin.exchange.s3.S3RequestUtil.configureEncryption; +import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR; +import static io.trino.plugin.exchange.filesystem.s3.S3RequestUtil.configureEncryption; import static java.lang.Math.min; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java similarity index 97% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java index a319e0df7ba1..30bc52f48059 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorageStats.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java @@ -11,10 +11,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import io.airlift.stats.DistributionStat; -import io.trino.plugin.exchange.ExecutionStats; +import io.trino.plugin.exchange.filesystem.ExecutionStats; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3RequestUtil.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java similarity index 98% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3RequestUtil.java rename to plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java index 7ab840a9952e..52aa7b875f57 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3RequestUtil.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/AbstractTestExchangeManager.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java similarity index 99% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/AbstractTestExchangeManager.java rename to plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java index b5a3d8bf1d7b..3610364e3652 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/AbstractTestExchangeManager.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java similarity index 98% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java rename to plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java index 72a68bb01ae8..375b84d6bbe8 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange; +package io.trino.plugin.exchange.filesystem; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java similarity index 98% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java rename to plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java index ea1135ae797e..942a38a05da1 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.containers; +package io.trino.plugin.exchange.filesystem.containers; import com.google.common.collect.ImmutableMap; import io.trino.testing.containers.Minio; diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java similarity index 87% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java rename to plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java index eca4a8ed47b0..afebeafb86f3 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java @@ -11,11 +11,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.local; +package io.trino.plugin.exchange.filesystem.local; import com.google.common.collect.ImmutableMap; -import io.trino.plugin.exchange.AbstractTestExchangeManager; -import io.trino.plugin.exchange.FileSystemExchangeManagerFactory; +import io.trino.plugin.exchange.filesystem.AbstractTestExchangeManager; +import io.trino.plugin.exchange.filesystem.FileSystemExchangeManagerFactory; import io.trino.spi.exchange.ExchangeManager; public class TestLocalFileSystemExchangeManager diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java similarity index 98% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java rename to plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java index 7a21c26c0522..227175350c47 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestS3FileSystemExchangeManager.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java similarity index 78% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestS3FileSystemExchangeManager.java rename to plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java index 824f42f78d90..7b4b5b8c8520 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestS3FileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java @@ -11,15 +11,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.exchange.s3; +package io.trino.plugin.exchange.filesystem.s3; -import io.trino.plugin.exchange.AbstractTestExchangeManager; -import io.trino.plugin.exchange.FileSystemExchangeManagerFactory; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.AbstractTestExchangeManager; +import io.trino.plugin.exchange.filesystem.FileSystemExchangeManagerFactory; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.spi.exchange.ExchangeManager; import org.testng.annotations.AfterClass; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static java.util.UUID.randomUUID; public class TestS3FileSystemExchangeManager diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 326f3e8f59d6..a9cbcb5072ac 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -31,7 +31,7 @@ import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.TableHandle; import io.trino.metadata.TableMetadata; -import io.trino.plugin.exchange.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionAggregations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionAggregations.java index b6898bd16853..54531a82f6a2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionAggregations.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionAggregations.java @@ -13,15 +13,15 @@ */ package io.trino.plugin.hive; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.AbstractTestFaultTolerantExecutionAggregations; import io.trino.testing.QueryRunner; import org.testng.annotations.AfterClass; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static io.trino.tpch.TpchTable.getTables; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionConnectorTest.java index cae950c79741..94505bb5f3fc 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionConnectorTest.java @@ -13,11 +13,11 @@ */ package io.trino.plugin.hive; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.QueryRunner; import org.testng.annotations.AfterClass; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.FaultTolerantExecutionConnectorTestHelper.getExtraProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionJoinQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionJoinQueries.java index 0cab98859436..3928bc1be890 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionJoinQueries.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionJoinQueries.java @@ -13,15 +13,15 @@ */ package io.trino.plugin.hive; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.AbstractTestFaultTolerantExecutionJoinQueries; import io.trino.testing.QueryRunner; import org.testng.annotations.AfterClass; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static io.trino.tpch.TpchTable.getTables; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionOrderByQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionOrderByQueries.java index 4aef1282b0e0..723b3a56f9b0 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionOrderByQueries.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionOrderByQueries.java @@ -13,15 +13,15 @@ */ package io.trino.plugin.hive; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.AbstractTestFaultTolerantExecutionOrderByQueries; import io.trino.testing.QueryRunner; import org.testng.annotations.AfterClass; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static io.trino.tpch.TpchTable.getTables; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionWindowQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionWindowQueries.java index f3b873463096..652c91fc2143 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionWindowQueries.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionWindowQueries.java @@ -13,15 +13,15 @@ */ package io.trino.plugin.hive; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.AbstractTestFaultTolerantExecutionWindowQueries; import io.trino.testing.QueryRunner; import org.testng.annotations.AfterClass; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static io.trino.tpch.TpchTable.getTables; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveQueryFailureRecoveryTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveQueryFailureRecoveryTest.java index db280b91e55a..07205c4adb27 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveQueryFailureRecoveryTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveQueryFailureRecoveryTest.java @@ -15,8 +15,8 @@ import com.google.common.collect.ImmutableMap; import io.trino.operator.RetryPolicy; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.plugin.hive.containers.HiveHadoop; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.plugin.hive.s3.S3HiveQueryRunner; @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; public class TestHiveQueryFailureRecoveryTest diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveTaskFailureRecoveryTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveTaskFailureRecoveryTest.java index 97005d6aacbf..b38db7986558 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveTaskFailureRecoveryTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveTaskFailureRecoveryTest.java @@ -15,8 +15,8 @@ import com.google.common.collect.ImmutableMap; import io.trino.operator.RetryPolicy; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.plugin.hive.containers.HiveHadoop; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.plugin.hive.s3.S3HiveQueryRunner; @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergQueryFailureRecoveryTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergQueryFailureRecoveryTest.java index 34f3bd7da089..7d744698bf03 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergQueryFailureRecoveryTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergQueryFailureRecoveryTest.java @@ -14,8 +14,8 @@ package io.trino.plugin.iceberg; import io.trino.operator.RetryPolicy; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; import org.testng.annotations.AfterClass; @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; public class TestIcebergQueryFailureRecoveryTest diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTaskFailureRecoveryTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTaskFailureRecoveryTest.java index e6bc5fd53f16..eec62f86508b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTaskFailureRecoveryTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTaskFailureRecoveryTest.java @@ -14,8 +14,8 @@ package io.trino.plugin.iceberg; import io.trino.operator.RetryPolicy; -import io.trino.plugin.exchange.FileSystemExchangePlugin; -import io.trino.plugin.exchange.containers.MinioStorage; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; import org.testng.annotations.AfterClass; @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; -import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties; +import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java index c4dc11cbe45a..b6b3035c06a0 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java @@ -16,7 +16,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.connector.MockConnectorFactory; import io.trino.connector.MockConnectorPlugin; -import io.trino.plugin.exchange.FileSystemExchangePlugin; +import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; import io.trino.plugin.memory.MemoryQueryRunner; import io.trino.testing.AbstractDistributedEngineOnlyQueries; import io.trino.testing.DistributedQueryRunner; From 6f8d936f4200e17721b4f6a1ce8bcaefed583c54 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 25 Apr 2022 13:46:11 -0400 Subject: [PATCH 4/4] Rename filesystem exchange module From trino-exchange to trino-exchange-filesystem --- core/trino-main/pom.xml | 2 +- core/trino-server/src/main/provisio/trino.xml | 4 ++-- plugin/trino-delta-lake/pom.xml | 8 ++++---- .../{trino-exchange => trino-exchange-filesystem}/pom.xml | 2 +- .../plugin/exchange/filesystem/ExchangeSourceFile.java | 0 .../plugin/exchange/filesystem/ExchangeStorageReader.java | 0 .../plugin/exchange/filesystem/ExchangeStorageWriter.java | 0 .../trino/plugin/exchange/filesystem/ExecutionStats.java | 0 .../io/trino/plugin/exchange/filesystem/FileStatus.java | 0 .../plugin/exchange/filesystem/FileSystemExchange.java | 0 .../exchange/filesystem/FileSystemExchangeConfig.java | 0 .../exchange/filesystem/FileSystemExchangeManager.java | 0 .../filesystem/FileSystemExchangeManagerFactory.java | 0 .../exchange/filesystem/FileSystemExchangeModule.java | 0 .../exchange/filesystem/FileSystemExchangePlugin.java | 0 .../exchange/filesystem/FileSystemExchangeSink.java | 0 .../exchange/filesystem/FileSystemExchangeSinkHandle.java | 0 .../filesystem/FileSystemExchangeSinkInstanceHandle.java | 0 .../exchange/filesystem/FileSystemExchangeSource.java | 0 .../filesystem/FileSystemExchangeSourceHandle.java | 0 .../exchange/filesystem/FileSystemExchangeStats.java | 0 .../exchange/filesystem/FileSystemExchangeStorage.java | 0 .../filesystem/local/LocalFileSystemExchangeStorage.java | 0 .../s3/BufferWriteAsyncResponseTransformer.java | 0 .../filesystem/s3/ByteBufferAsyncRequestBody.java | 0 .../plugin/exchange/filesystem/s3/ExchangeS3Config.java | 0 .../filesystem/s3/S3FileSystemExchangeStorage.java | 0 .../filesystem/s3/S3FileSystemExchangeStorageStats.java | 0 .../plugin/exchange/filesystem/s3/S3RequestUtil.java | 0 .../exchange/filesystem/AbstractTestExchangeManager.java | 0 .../exchange/filesystem/TestFileSystemExchangeConfig.java | 0 .../exchange/filesystem/containers/MinioStorage.java | 0 .../local/TestLocalFileSystemExchangeManager.java | 0 .../exchange/filesystem/s3/TestExchangeS3Config.java | 0 .../filesystem/s3/TestS3FileSystemExchangeManager.java | 0 plugin/trino-hive/pom.xml | 4 ++-- plugin/trino-iceberg/pom.xml | 4 ++-- pom.xml | 6 +++--- testing/trino-server-dev/etc/config.properties | 2 +- testing/trino-tests/pom.xml | 2 +- 40 files changed, 17 insertions(+), 17 deletions(-) rename plugin/{trino-exchange => trino-exchange-filesystem}/pom.xml (99%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java (100%) rename plugin/{trino-exchange => trino-exchange-filesystem}/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java (100%) diff --git a/core/trino-main/pom.xml b/core/trino-main/pom.xml index c08b56ad67a6..4fdaabe06b65 100644 --- a/core/trino-main/pom.xml +++ b/core/trino-main/pom.xml @@ -378,7 +378,7 @@ io.trino - trino-exchange + trino-exchange-filesystem test diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index 5860fec127da..f7f8c165bc02 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -283,8 +283,8 @@ - - + + diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 39e77f4e13d6..369148dd8f92 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -199,13 +199,13 @@ io.trino - trino-exchange + trino-exchange-filesystem test io.trino - trino-exchange + trino-exchange-filesystem test-jar test @@ -309,12 +309,12 @@ test - + io.netty netty-transport-classes-epoll - + io.netty netty-transport-native-epoll diff --git a/plugin/trino-exchange/pom.xml b/plugin/trino-exchange-filesystem/pom.xml similarity index 99% rename from plugin/trino-exchange/pom.xml rename to plugin/trino-exchange-filesystem/pom.xml index d078bcbd5f03..397dc126bce3 100644 --- a/plugin/trino-exchange/pom.xml +++ b/plugin/trino-exchange-filesystem/pom.xml @@ -8,7 +8,7 @@ ../../pom.xml - trino-exchange + trino-exchange-filesystem Trino - Exchange trino-plugin diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageReader.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeStorageWriter.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExecutionStats.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileStatus.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeConfig.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangePlugin.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkHandle.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSinkInstanceHandle.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStats.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeStorage.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/BufferWriteAsyncResponseTransformer.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ExchangeS3Config.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorageStats.java diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java similarity index 100% rename from plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java rename to plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3RequestUtil.java diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java similarity index 100% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java rename to plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.java diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java similarity index 100% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java rename to plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/TestFileSystemExchangeConfig.java diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java similarity index 100% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java rename to plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java similarity index 100% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java rename to plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/local/TestLocalFileSystemExchangeManager.java diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java similarity index 100% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java rename to plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestExchangeS3Config.java diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java similarity index 100% rename from plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java rename to plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/s3/TestS3FileSystemExchangeManager.java diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index ab5e3f3899b9..199d846aac55 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -321,13 +321,13 @@ io.trino - trino-exchange + trino-exchange-filesystem test io.trino - trino-exchange + trino-exchange-filesystem test-jar test diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 2423bea5b762..dfacd8e97407 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -275,13 +275,13 @@ io.trino - trino-exchange + trino-exchange-filesystem test io.trino - trino-exchange + trino-exchange-filesystem test-jar test diff --git a/pom.xml b/pom.xml index 6f67c0b10a79..9a255e8b6cd0 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ plugin/trino-druid plugin/trino-elasticsearch plugin/trino-example-http - plugin/trino-exchange + plugin/trino-exchange-filesystem plugin/trino-geospatial plugin/trino-google-sheets plugin/trino-hive @@ -265,13 +265,13 @@ io.trino - trino-exchange + trino-exchange-filesystem ${project.version} io.trino - trino-exchange + trino-exchange-filesystem test-jar ${project.version} diff --git a/testing/trino-server-dev/etc/config.properties b/testing/trino-server-dev/etc/config.properties index dbd46a7374b1..3be40b7fab8d 100644 --- a/testing/trino-server-dev/etc/config.properties +++ b/testing/trino-server-dev/etc/config.properties @@ -53,6 +53,6 @@ plugin.bundles=\ ../../plugin/trino-druid/pom.xml, \ ../../plugin/trino-geospatial/pom.xml, \ ../../plugin/trino-http-event-listener/pom.xml, \ - ../../plugin/trino-exchange/pom.xml + ../../plugin/trino-exchange-filesystem/pom.xml node-scheduler.include-coordinator=true diff --git a/testing/trino-tests/pom.xml b/testing/trino-tests/pom.xml index 4a903cb0cfc3..85bc4c4c551f 100644 --- a/testing/trino-tests/pom.xml +++ b/testing/trino-tests/pom.xml @@ -141,7 +141,7 @@ io.trino - trino-exchange + trino-exchange-filesystem test