diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 2f56a09680cc..c08a13a91391 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -53,6 +53,7 @@ import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey; import static java.lang.Math.min; import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; public final class SystemSessionProperties implements SystemSessionPropertiesProvider @@ -207,6 +208,8 @@ public final class SystemSessionProperties public static final String USE_COST_BASED_PARTITIONING = "use_cost_based_partitioning"; public static final String FORCE_SPILLING_JOIN = "force_spilling_join"; public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size"; + public static final String IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD = "idle_writer_min_data_size_threshold"; + public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration"; private final List> sessionProperties; @@ -1058,6 +1061,14 @@ public SystemSessionProperties( integerProperty(PAGE_PARTITIONING_BUFFER_POOL_SIZE, "Maximum number of free buffers in the per task partitioned page buffer pool. 
Setting this to zero effectively disables the pool", taskManagerConfig.getPagePartitioningBufferPoolSize(), + true), + dataSizeProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD, + "Minimum amount of data written by a writer operator on average before it tries to close the idle writers", + DataSize.of(256, MEGABYTE), + true), + durationProperty(CLOSE_IDLE_WRITERS_TRIGGER_DURATION, + "The duration after which the writer operator tries to close the idle writers", + new Duration(5, SECONDS), true)); } @@ -1896,4 +1907,14 @@ public static int getPagePartitioningBufferPoolSize(Session session) { return session.getSystemProperty(PAGE_PARTITIONING_BUFFER_POOL_SIZE, Integer.class); } + + public static DataSize getIdleWriterMinDataSizeThreshold(Session session) + { + return session.getSystemProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD, DataSize.class); + } + + public static Duration getCloseIdleWritersTriggerDuration(Session session) + { + return session.getSystemProperty(CLOSE_IDLE_WRITERS_TRIGGER_DURATION, Duration.class); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java index f4c8fe43ab19..12d3a4a2a01e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java @@ -123,6 +123,7 @@ public class SqlTaskManager private final ScheduledExecutorService taskManagementExecutor; private final ScheduledExecutorService driverYieldExecutor; + private final ScheduledExecutorService driverTimeoutExecutor; private final Duration infoCacheTime; private final Duration clientTimeout; @@ -216,6 +217,7 @@ public SqlTaskManager( this.taskManagementExecutor = taskManagementExecutor.getExecutor(); this.driverYieldExecutor = newScheduledThreadPool(config.getTaskYieldThreads(), threadsNamed("task-yield-%s")); + this.driverTimeoutExecutor = newScheduledThreadPool(config.getDriverTimeoutThreads(), 
threadsNamed("task-driver-timeout-%s")); SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskNotificationExecutor, taskExecutor, planner, splitMonitor, tracer, config); @@ -269,6 +271,7 @@ private QueryContext createQueryContext( gcMonitor, taskNotificationExecutor, driverYieldExecutor, + driverTimeoutExecutor, maxQuerySpillPerNode, localSpillManager.getSpillSpaceTracker()); } diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java index 4d584c36034a..709e2d03c861 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java @@ -100,6 +100,7 @@ public class TaskManagerConfig private int taskNotificationThreads = 5; private int taskYieldThreads = 3; + private int driverTimeoutThreads = 5; private BigDecimal levelTimeMultiplier = new BigDecimal(2.0); @@ -569,6 +570,20 @@ public TaskManagerConfig setTaskYieldThreads(int taskYieldThreads) return this; } + @Min(1) + public int getDriverTimeoutThreads() + { + return driverTimeoutThreads; + } + + @Config("task.driver-timeout-threads") + @ConfigDescription("Number of threads used for timing out blocked drivers if the timeout is set") + public TaskManagerConfig setDriverTimeoutThreads(int driverTimeoutThreads) + { + this.driverTimeoutThreads = driverTimeoutThreads; + return this; + } + public boolean isInterruptStuckSplitTasksEnabled() { return interruptStuckSplitTasksEnabled; diff --git a/core/trino-main/src/main/java/io/trino/memory/QueryContext.java b/core/trino-main/src/main/java/io/trino/memory/QueryContext.java index 571771987181..3331e60d539d 100644 --- a/core/trino-main/src/main/java/io/trino/memory/QueryContext.java +++ b/core/trino-main/src/main/java/io/trino/memory/QueryContext.java @@ -63,6 +63,7 @@ public class QueryContext private final GcMonitor gcMonitor; private final Executor 
notificationExecutor; private final ScheduledExecutorService yieldExecutor; + private final ScheduledExecutorService timeoutExecutor; private final long maxSpill; private final SpillSpaceTracker spillSpaceTracker; private final Map taskContexts = new ConcurrentHashMap<>(); @@ -86,6 +87,7 @@ public QueryContext( GcMonitor gcMonitor, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, + ScheduledExecutorService timeoutExecutor, DataSize maxSpill, SpillSpaceTracker spillSpaceTracker) { @@ -97,6 +99,7 @@ public QueryContext( gcMonitor, notificationExecutor, yieldExecutor, + timeoutExecutor, maxSpill, spillSpaceTracker); } @@ -109,6 +112,7 @@ public QueryContext( GcMonitor gcMonitor, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, + ScheduledExecutorService timeoutExecutor, DataSize maxSpill, SpillSpaceTracker spillSpaceTracker) { @@ -118,6 +122,7 @@ public QueryContext( this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null"); this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); + this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null"); this.maxSpill = maxSpill.toBytes(); this.spillSpaceTracker = requireNonNull(spillSpaceTracker, "spillSpaceTracker is null"); this.guaranteedMemory = guaranteedMemory; @@ -257,6 +262,7 @@ public TaskContext addTaskContext( gcMonitor, notificationExecutor, yieldExecutor, + timeoutExecutor, session, taskMemoryContext, notifyStatusChanged, diff --git a/core/trino-main/src/main/java/io/trino/operator/Driver.java b/core/trino-main/src/main/java/io/trino/operator/Driver.java index f6949c4bb204..0033344d7fe5 100644 --- a/core/trino-main/src/main/java/io/trino/operator/Driver.java +++ b/core/trino-main/src/main/java/io/trino/operator/Driver.java @@ -46,12 +46,15 @@ import static com.google.common.base.Preconditions.checkState; import static 
com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; +import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; +import static com.google.common.util.concurrent.Futures.withTimeout; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.operator.Operator.NOT_BLOCKED; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.lang.Boolean.TRUE; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; // @@ -455,6 +458,13 @@ private ListenableFuture processInternal(OperationTimer operationTimer) // unblock when the first future is complete ListenableFuture blocked = firstFinishedFuture(blockedFutures); + if (driverContext.getBlockedTimeout().isPresent()) { + blocked = withTimeout( + nonCancellationPropagating(blocked), + driverContext.getBlockedTimeout().get().toMillis(), + MILLISECONDS, + driverContext.getTimeoutExecutor()); + } // driver records serial blocked time driverContext.recordBlocked(blocked); // each blocked operator is responsible for blocking the execution diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java index 0ea5d3a5d70e..67998f0a5e45 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java @@ -28,6 +28,7 @@ import org.joda.time.DateTime; import java.util.List; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -54,6 +55,7 @@ public class DriverContext private final PipelineContext pipelineContext; private final Executor 
notificationExecutor; private final ScheduledExecutorService yieldExecutor; + private final ScheduledExecutorService timeoutExecutor; private final AtomicBoolean finished = new AtomicBoolean(); @@ -70,6 +72,7 @@ public class DriverContext private final AtomicReference executionStartTime = new AtomicReference<>(); private final AtomicReference executionEndTime = new AtomicReference<>(); + private final AtomicReference> blockedTimeout = new AtomicReference<>(Optional.empty()); private final MemoryTrackingContext driverMemoryContext; @@ -82,12 +85,14 @@ public DriverContext( PipelineContext pipelineContext, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, + ScheduledExecutorService timeoutExecutor, MemoryTrackingContext driverMemoryContext, long splitWeight) { this.pipelineContext = requireNonNull(pipelineContext, "pipelineContext is null"); this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); + this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null"); this.driverMemoryContext = requireNonNull(driverMemoryContext, "driverMemoryContext is null"); this.yieldSignal = new DriverYieldSignal(); this.splitWeight = splitWeight; @@ -447,6 +452,21 @@ public ScheduledExecutorService getYieldExecutor() return yieldExecutor; } + public ScheduledExecutorService getTimeoutExecutor() + { + return timeoutExecutor; + } + + public void setBlockedTimeout(Duration duration) + { + this.blockedTimeout.set(Optional.of(duration)); + } + + public Optional getBlockedTimeout() + { + return blockedTimeout.get(); + } + private static long nanosBetween(long start, long end) { return max(0, end - start); diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java index 605d970564e1..1f64fd5fb76d 100644 --- 
a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java @@ -55,6 +55,7 @@ public class PipelineContext private final TaskContext taskContext; private final Executor notificationExecutor; private final ScheduledExecutorService yieldExecutor; + private final ScheduledExecutorService timeoutExecutor; private final int pipelineId; private final boolean inputPipeline; @@ -105,7 +106,7 @@ public class PipelineContext private final MemoryTrackingContext pipelineMemoryContext; - public PipelineContext(int pipelineId, TaskContext taskContext, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, MemoryTrackingContext pipelineMemoryContext, boolean inputPipeline, boolean outputPipeline, boolean partitioned) + public PipelineContext(int pipelineId, TaskContext taskContext, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, ScheduledExecutorService timeoutExecutor, MemoryTrackingContext pipelineMemoryContext, boolean inputPipeline, boolean outputPipeline, boolean partitioned) { this.pipelineId = pipelineId; this.inputPipeline = inputPipeline; @@ -114,6 +115,7 @@ public PipelineContext(int pipelineId, TaskContext taskContext, Executor notific this.taskContext = requireNonNull(taskContext, "taskContext is null"); this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); + this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null"); this.pipelineMemoryContext = requireNonNull(pipelineMemoryContext, "pipelineMemoryContext is null"); // Initialize the local memory contexts with the ExchangeOperator tag as ExchangeOperator will do the local memory allocations pipelineMemoryContext.initializeLocalMemoryContexts(ExchangeOperator.class.getSimpleName()); @@ -156,6 +158,7 @@ public DriverContext addDriverContext(long 
splitWeight) this, notificationExecutor, yieldExecutor, + timeoutExecutor, pipelineMemoryContext.newMemoryTrackingContext(), splitWeight); drivers.add(driverContext); diff --git a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java index 37619858e1e4..ac4efe5b6307 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java @@ -20,7 +20,9 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.log.Logger; import io.airlift.slice.Slice; +import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.Session; import io.trino.memory.context.LocalMemoryContext; @@ -42,6 +44,7 @@ import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -53,6 +56,8 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; +import static io.trino.SystemSessionProperties.getCloseIdleWritersTriggerDuration; +import static io.trino.SystemSessionProperties.getIdleWriterMinDataSizeThreshold; import static io.trino.SystemSessionProperties.isStatisticsCpuTimerEnabled; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarbinaryType.VARBINARY; @@ -64,6 +69,7 @@ public class TableWriterOperator implements Operator { + private static final Logger LOG = Logger.get(TableWriterOperator.class); public static final int ROW_COUNT_CHANNEL = 0; public static final int FRAGMENT_CHANNEL = 1; public static final int STATS_START_CHANNEL = 2; 
@@ -111,10 +117,22 @@ public TableWriterOperatorFactory( public Operator createOperator(DriverContext driverContext) { checkState(!closed, "Factory is already closed"); + // Driver should call getOutput() periodically on TableWriterOperator to close idle writers which will essentially + // decrease the memory usage even if no pages were added to that writer thread. + if (getCloseIdleWritersTriggerDuration(session).toMillis() > 0) { + driverContext.setBlockedTimeout(getCloseIdleWritersTriggerDuration(session)); + } OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableWriterOperator.class.getSimpleName()); Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext); boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session); - return new TableWriterOperator(context, createPageSink(driverContext), columnChannels, statisticsAggregationOperator, types, statisticsCpuTimerEnabled); + return new TableWriterOperator( + context, + createPageSink(driverContext), + columnChannels, + statisticsAggregationOperator, + types, + statisticsCpuTimerEnabled, + getIdleWriterMinDataSizeThreshold(session)); } private ConnectorPageSink createPageSink(DriverContext driverContext) @@ -159,6 +177,7 @@ private enum State private final AtomicLong pageSinkPeakMemoryUsage = new AtomicLong(); private final Operator statisticAggregationOperator; private final List types; + private final DataSize idleWriterMinDataSizeThreshold; private ListenableFuture blocked = NOT_BLOCKED; private CompletableFuture> finishFuture; @@ -170,8 +189,10 @@ private enum State private final OperationTiming statisticsTiming = new OperationTiming(); private final boolean statisticsCpuTimerEnabled; - private final Supplier tableWriterInfoSupplier; + // This records the last physical written data size when connector closeIdleWriters is triggered. 
+ private long lastPhysicalWrittenDataSize; + private boolean newPagesAdded; public TableWriterOperator( OperatorContext operatorContext, @@ -179,7 +200,8 @@ public TableWriterOperator( List columnChannels, Operator statisticAggregationOperator, List types, - boolean statisticsCpuTimerEnabled) + boolean statisticsCpuTimerEnabled, + DataSize idleWriterMinDataSizeThreshold) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); this.pageSinkMemoryContext = operatorContext.newLocalUserMemoryContext(TableWriterOperator.class.getSimpleName()); @@ -188,6 +210,7 @@ public TableWriterOperator( this.statisticAggregationOperator = requireNonNull(statisticAggregationOperator, "statisticAggregationOperator is null"); this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled; + this.idleWriterMinDataSizeThreshold = requireNonNull(idleWriterMinDataSizeThreshold, "idleWriterMinDataSizeThreshold is null"); this.tableWriterInfoSupplier = createTableWriterInfoSupplier(pageSinkPeakMemoryUsage, statisticsTiming, pageSink); this.operatorContext.setInfoSupplier(tableWriterInfoSupplier); } @@ -259,14 +282,20 @@ public void addInput(Page page) rowCount += page.getPositionCount(); updateWrittenBytes(); operatorContext.recordWriterInputDataSize(page.getSizeInBytes()); + newPagesAdded = true; } @Override public Page getOutput() { - if (!blocked.isDone()) { + tryClosingIdleWriters(); + // This method could be called even when new pages have not been added. In that case, we don't have to + // try to get the output from the aggregation operator. It could be expensive since getOutput() is + // called quite frequently. 
+ if (!(blocked.isDone() && (newPagesAdded || state != State.RUNNING))) { return null; } + newPagesAdded = false; if (!statisticAggregationOperator.isFinished()) { OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled); @@ -365,6 +394,24 @@ private void updateWrittenBytes() writtenBytes = current; } + private void tryClosingIdleWriters() + { + long physicalWrittenDataSize = getTaskContext().getPhysicalWrittenDataSize(); + Optional writerCount = getTaskContext().getMaxWriterCount(); + if (writerCount.isEmpty() || physicalWrittenDataSize - lastPhysicalWrittenDataSize <= idleWriterMinDataSizeThreshold.toBytes() * writerCount.get()) { + return; + } + pageSink.closeIdleWriters(); + updateMemoryUsage(); + updateWrittenBytes(); + lastPhysicalWrittenDataSize = physicalWrittenDataSize; + } + + private TaskContext getTaskContext() + { + return operatorContext.getDriverContext().getPipelineContext().getTaskContext(); + } + private void updateMemoryUsage() { long pageSinkMemoryUsage = pageSink.getMemoryUsage(); diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index 3cc4a93e0b2a..e72670b2f4c6 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -69,6 +69,7 @@ public class TaskContext private final GcMonitor gcMonitor; private final Executor notificationExecutor; private final ScheduledExecutorService yieldExecutor; + private final ScheduledExecutorService timeoutExecutor; private final Session session; private final long createNanos = System.nanoTime(); @@ -117,6 +118,7 @@ public static TaskContext createTaskContext( GcMonitor gcMonitor, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, + ScheduledExecutorService timeoutExecutor, Session session, MemoryTrackingContext taskMemoryContext, Runnable notifyStatusChanged, @@ -129,6 +131,7 @@ public static 
TaskContext createTaskContext( gcMonitor, notificationExecutor, yieldExecutor, + timeoutExecutor, session, taskMemoryContext, notifyStatusChanged, @@ -144,6 +147,7 @@ private TaskContext( GcMonitor gcMonitor, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, + ScheduledExecutorService timeoutExecutor, Session session, MemoryTrackingContext taskMemoryContext, Runnable notifyStatusChanged, @@ -155,6 +159,7 @@ private TaskContext( this.queryContext = requireNonNull(queryContext, "queryContext is null"); this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); + this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null"); this.session = session; this.taskMemoryContext = requireNonNull(taskMemoryContext, "taskMemoryContext is null"); @@ -186,6 +191,7 @@ public PipelineContext addPipelineContext(int pipelineId, boolean inputPipeline, this, notificationExecutor, yieldExecutor, + timeoutExecutor, taskMemoryContext.newMemoryTrackingContext(), inputPipeline, outputPipeline, diff --git a/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java b/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java index 04d4fb8d0090..f5cb6160b271 100644 --- a/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java +++ b/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java @@ -39,21 +39,21 @@ public final class TestingTaskContext private TestingTaskContext() {} - public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session) + public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService scheduledExecutor, Session session) { - return builder(notificationExecutor, yieldExecutor, session).build(); + return builder(notificationExecutor, scheduledExecutor, 
session).build(); } - public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session, DataSize maxMemory) + public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService scheduledExecutor, Session session, DataSize maxMemory) { - return builder(notificationExecutor, yieldExecutor, session) + return builder(notificationExecutor, scheduledExecutor, session) .setQueryMaxMemory(maxMemory) .build(); } - public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session, TaskStateMachine taskStateMachine) + public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService scheduledExecutor, Session session, TaskStateMachine taskStateMachine) { - return builder(notificationExecutor, yieldExecutor, session) + return builder(notificationExecutor, scheduledExecutor, session) .setTaskStateMachine(taskStateMachine) .build(); } @@ -73,15 +73,15 @@ private static TaskContext createTaskContext(QueryContext queryContext, Session true); } - public static Builder builder(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session) + public static Builder builder(Executor notificationExecutor, ScheduledExecutorService scheduledExecutor, Session session) { - return new Builder(notificationExecutor, yieldExecutor, session); + return new Builder(notificationExecutor, scheduledExecutor, session); } public static class Builder { private final Executor notificationExecutor; - private final ScheduledExecutorService yieldExecutor; + private final ScheduledExecutorService scheduledExecutor; private final Session session; private QueryId queryId = new QueryId("test_query"); private TaskStateMachine taskStateMachine; @@ -90,10 +90,10 @@ public static class Builder private DataSize maxSpillSize = DataSize.of(1, GIGABYTE); private DataSize queryMaxSpillSize = 
DataSize.of(1, GIGABYTE); - private Builder(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session) + private Builder(Executor notificationExecutor, ScheduledExecutorService scheduledExecutor, Session session) { this.notificationExecutor = notificationExecutor; - this.yieldExecutor = yieldExecutor; + this.scheduledExecutor = scheduledExecutor; this.session = session; } @@ -148,7 +148,8 @@ public TaskContext build() 0L, GC_MONITOR, notificationExecutor, - yieldExecutor, + scheduledExecutor, + scheduledExecutor, queryMaxSpillSize, spillSpaceTracker); diff --git a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java index 681afda39efe..b0fde885e1b1 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java @@ -212,6 +212,7 @@ public MockRemoteTask( new TestingGcMonitor(), executor, scheduledExecutor, + scheduledExecutor, DataSize.of(1, MEGABYTE), spillSpaceTracker); this.taskContext = queryContext.addTaskContext(taskStateMachine, TEST_SESSION, () -> {}, true, true); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java index c5855316582e..1c9d4bbf5680 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java @@ -292,6 +292,7 @@ private QueryContext getOrCreateQueryContext(QueryId queryId) new TestingGcMonitor(), executor, scheduledExecutor, + scheduledExecutor, DataSize.of(1, GIGABYTE), spillSpaceTracker)); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java index 
d865468401e4..48e4ea9d6090 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java @@ -92,6 +92,7 @@ public class TestSqlTask private TaskExecutor taskExecutor; private ScheduledExecutorService taskNotificationExecutor; private ScheduledExecutorService driverYieldExecutor; + private ScheduledExecutorService driverTimeoutExecutor; private SqlTaskExecutionFactory sqlTaskExecutionFactory; private final AtomicInteger nextTaskId = new AtomicInteger(); @@ -104,7 +105,7 @@ public void setUp() taskNotificationExecutor = newScheduledThreadPool(10, threadsNamed("task-notification-%s")); driverYieldExecutor = newScheduledThreadPool(2, threadsNamed("driver-yield-%s")); - + driverTimeoutExecutor = newScheduledThreadPool(2, threadsNamed("driver-timeout-%s")); LocalExecutionPlanner planner = createTestingPlanner(); sqlTaskExecutionFactory = new SqlTaskExecutionFactory( @@ -123,6 +124,7 @@ public void destroy() taskExecutor = null; taskNotificationExecutor.shutdownNow(); driverYieldExecutor.shutdown(); + driverTimeoutExecutor.shutdown(); sqlTaskExecutionFactory = null; } @@ -435,6 +437,7 @@ private SqlTask createInitialTask() new TestingGcMonitor(), taskNotificationExecutor, driverYieldExecutor, + driverTimeoutExecutor, DataSize.of(1, MEGABYTE), new SpillSpaceTracker(DataSize.of(1, GIGABYTE))); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java index 282c65442b80..aae9ad8661ac 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java @@ -98,6 +98,7 @@ public void testSimple() { ScheduledExecutorService taskNotificationExecutor = newScheduledThreadPool(10, threadsNamed("task-notification-%s")); ScheduledExecutorService driverYieldExecutor = newScheduledThreadPool(2, 
threadsNamed("driver-yield-%s")); + ScheduledExecutorService driverTimeoutExecutor = newScheduledThreadPool(2, threadsNamed("driver-timeout-%s")); TaskExecutor taskExecutor = new TimeSharingTaskExecutor(5, 10, 3, 4, Ticker.systemTicker()); taskExecutor.start(); @@ -131,7 +132,7 @@ public void testSimple() ImmutableList.of(testingScanOperatorFactory, taskOutputOperatorFactory), OptionalInt.empty())), ImmutableList.of(TABLE_SCAN_NODE_ID)); - TaskContext taskContext = newTestingTaskContext(taskNotificationExecutor, driverYieldExecutor, taskStateMachine); + TaskContext taskContext = newTestingTaskContext(taskNotificationExecutor, driverYieldExecutor, driverTimeoutExecutor, taskStateMachine); SqlTaskExecution sqlTaskExecution = new SqlTaskExecution( taskStateMachine, taskContext, @@ -197,10 +198,11 @@ public void testSimple() taskExecutor.stop(); taskNotificationExecutor.shutdownNow(); driverYieldExecutor.shutdown(); + driverTimeoutExecutor.shutdown(); } } - private TaskContext newTestingTaskContext(ScheduledExecutorService taskNotificationExecutor, ScheduledExecutorService driverYieldExecutor, TaskStateMachine taskStateMachine) + private TaskContext newTestingTaskContext(ScheduledExecutorService taskNotificationExecutor, ScheduledExecutorService driverYieldExecutor, ScheduledExecutorService driverTimeoutExecutor, TaskStateMachine taskStateMachine) { QueryContext queryContext = new QueryContext( new QueryId("queryid"), @@ -209,6 +211,7 @@ private TaskContext newTestingTaskContext(ScheduledExecutorService taskNotificat new TestingGcMonitor(), taskNotificationExecutor, driverYieldExecutor, + driverTimeoutExecutor, DataSize.of(1, MEGABYTE), new SpillSpaceTracker(DataSize.of(1, GIGABYTE))); return queryContext.addTaskContext(taskStateMachine, TEST_SESSION, () -> {}, false, false); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java index 
73685aa8f09b..965ee6e277ce 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java @@ -71,6 +71,7 @@ public void testDefaults() .setHttpTimeoutThreads(3) .setTaskNotificationThreads(5) .setTaskYieldThreads(3) + .setDriverTimeoutThreads(5) .setLevelTimeMultiplier(new BigDecimal("2")) .setStatisticsCpuTimerEnabled(true) .setInterruptStuckSplitTasksEnabled(true) @@ -116,6 +117,7 @@ public void testExplicitPropertyMappings() .put("task.http-timeout-threads", "10") .put("task.task-notification-threads", "13") .put("task.task-yield-threads", "8") + .put("task.driver-timeout-threads", "10") .put("task.level-time-multiplier", "2.1") .put("task.statistics-cpu-timer-enabled", "false") .put("task.interrupt-stuck-split-tasks-enabled", "false") @@ -156,6 +158,7 @@ public void testExplicitPropertyMappings() .setHttpTimeoutThreads(10) .setTaskNotificationThreads(13) .setTaskYieldThreads(8) + .setDriverTimeoutThreads(10) .setLevelTimeMultiplier(new BigDecimal("2.1")) .setStatisticsCpuTimerEnabled(false) .setInterruptStuckSplitTasksEnabled(false) diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java index fde67f51b1dd..0b2b3cb1f049 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java @@ -97,6 +97,7 @@ private void setUp(Supplier> driversSupplier) new TestingGcMonitor(), localQueryRunner.getExecutor(), localQueryRunner.getScheduler(), + localQueryRunner.getScheduler(), TEN_MEGABYTES, spillSpaceTracker); taskContext = createTaskContext(queryContext, localQueryRunner.getExecutor(), localQueryRunner.getDefaultSession()); diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java index 
6069671e6611..4f39206b198b 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java @@ -68,11 +68,13 @@ public class TestMemoryTracking private MemoryPool memoryPool; private ExecutorService notificationExecutor; private ScheduledExecutorService yieldExecutor; + private ScheduledExecutorService timeoutExecutor; @AfterEach public void tearDown() { notificationExecutor.shutdownNow(); + timeoutExecutor.shutdownNow(); yieldExecutor.shutdownNow(); queryContext = null; taskContext = null; @@ -87,6 +89,7 @@ public void setUpTest() { notificationExecutor = newCachedThreadPool(daemonThreadsNamed("local-query-runner-executor-%s")); yieldExecutor = newScheduledThreadPool(2, daemonThreadsNamed("local-query-runner-scheduler-%s")); + timeoutExecutor = newScheduledThreadPool(2, daemonThreadsNamed("local-query-runner-driver-timeout-%s")); memoryPool = new MemoryPool(memoryPoolSize); queryContext = new QueryContext( @@ -96,6 +99,7 @@ public void setUpTest() new TestingGcMonitor(), notificationExecutor, yieldExecutor, + timeoutExecutor, queryMaxSpillSize, spillSpaceTracker); taskContext = queryContext.addTaskContext( diff --git a/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java b/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java index 298ca2f30931..a02624fb6a38 100644 --- a/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java +++ b/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java @@ -87,6 +87,7 @@ public static GroupByHashYieldResult finishOperatorWithYieldingGroupByHash(List< new TestingGcMonitor(), EXECUTOR, SCHEDULED_EXECUTOR, + SCHEDULED_EXECUTOR, DataSize.of(512, MEGABYTE), new SpillSpaceTracker(DataSize.of(512, MEGABYTE))); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java 
index d76f0ad55f5a..8b3313b2c0d0 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java @@ -62,8 +62,10 @@ import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; import static io.trino.testing.TestingHandles.TEST_TABLE_HANDLE; import static io.trino.testing.TestingTaskContext.createTaskContext; +import static java.lang.Thread.sleep; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD; @@ -285,6 +287,53 @@ public void testUnblocksOnFinish() assertThat(blocked.isDone()).isTrue(); } + @Test + public void testUnblocksOnTimeout() + throws InterruptedException + { + List types = ImmutableList.of(VARCHAR, BIGINT, BIGINT); + driverContext.setBlockedTimeout(new Duration(70, MILLISECONDS)); + // Create driver with 3 operators, one of which is blocked such that it will not move any page and + // return a blocked timeout future. 
+ Operator operator1 = createSinkOperator(types, 1, "test1"); + BlockedOperator operator2 = createBlockedOperator(types, 2, "test2"); + Operator operator3 = createSinkOperator(types, 3, "test3"); + Operator operator4 = createSinkOperator(types, 4, "test3"); + Driver driver = Driver.createDriver(driverContext, operator1, operator2, operator3, operator4); + + ListenableFuture blocked = driver.processForDuration(new Duration(200, MILLISECONDS)); + assertThat(blocked.isDone()).isFalse(); + // wait for the blocked future to be timed out + sleep(100); + assertThat(blocked.isDone()).isTrue(); + // verify that the blocked operator is not cancelled or done due to timeout + assertThat(operator2.isCancelled()).isFalse(); + assertThat(operator2.isDone()).isFalse(); + } + + @Test + public void testUnblocksWhenBlockedOperatorIsUnblockedAndTimeoutIsSet() + { + List types = ImmutableList.of(VARCHAR, BIGINT, BIGINT); + driverContext.setBlockedTimeout(new Duration(100, MILLISECONDS)); + // Create driver with 3 operators, one of which is blocked such that it will not move any page and + // return a blocked timeout future. 
+ Operator operator1 = createSinkOperator(types, 1, "test1"); + BlockedOperator operator2 = createBlockedOperator(types, 2, "test2"); + Operator operator3 = createSinkOperator(types, 3, "test3"); + Operator operator4 = createSinkOperator(types, 4, "test3"); + Driver driver = Driver.createDriver(driverContext, operator1, operator2, operator3, operator4); + + ListenableFuture blocked = driver.processForDuration(new Duration(200, MILLISECONDS)); + assertThat(blocked.isDone()).isFalse(); + // unblock the blocked operator + operator2.setDone(); + // verify that the blocked future is done but is not cancelled + assertThat(operator2.isDone()).isTrue(); + assertThat(blocked.isDone()).isTrue(); + assertThat(operator2.isCancelled()).isFalse(); + } + @Test public void testBrokenOperatorAddSource() { @@ -336,10 +385,22 @@ private static Split newMockSplit() } private PageConsumerOperator createSinkOperator(List types) + { + return createSinkOperator(types, 1, "test"); + } + + private PageConsumerOperator createSinkOperator(List types, int operatorId, String planNodeId) + { + // materialize the output to catch some type errors + MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(driverContext.getSession(), types); + return new PageConsumerOperator(driverContext.addOperatorContext(operatorId, new PlanNodeId(planNodeId), "sink"), resultBuilder::page, Function.identity()); + } + + private BlockedOperator createBlockedOperator(List types, int operatorId, String planNodeId) { // materialize the output to catch some type errors MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(driverContext.getSession(), types); - return new PageConsumerOperator(driverContext.addOperatorContext(1, new PlanNodeId("test"), "sink"), resultBuilder::page, Function.identity()); + return new BlockedOperator(driverContext.addOperatorContext(operatorId, new PlanNodeId(planNodeId), "sink"), resultBuilder::page, Function.identity()); } private static class 
BrokenOperator @@ -477,6 +538,41 @@ void setFinished() } } + private static class BlockedOperator + extends PageConsumerOperator + { + private final SettableFuture blocked = SettableFuture.create(); + + public BlockedOperator( + OperatorContext operatorContext, + Consumer pageConsumer, + Function pagePreprocessor) + { + super(operatorContext, pageConsumer, pagePreprocessor); + } + + @Override + public ListenableFuture isBlocked() + { + return blocked; + } + + private void setDone() + { + blocked.set(null); + } + + private boolean isDone() + { + return blocked.isDone(); + } + + private boolean isCancelled() + { + return blocked.isCancelled(); + } + } + private static class AlwaysBlockedTableScanOperator extends TableScanOperator { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestingOperatorContext.java b/core/trino-main/src/test/java/io/trino/operator/TestingOperatorContext.java index db2930852237..677e71e6540c 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestingOperatorContext.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestingOperatorContext.java @@ -42,6 +42,7 @@ public static OperatorContext create(ScheduledExecutorService scheduledExecutor) taskContext, executor, scheduledExecutor, + scheduledExecutor, pipelineMemoryContext, false, false, @@ -51,6 +52,7 @@ public static OperatorContext create(ScheduledExecutorService scheduledExecutor) pipelineContext, executor, scheduledExecutor, + scheduledExecutor, pipelineMemoryContext, 0L); diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSink.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSink.java index b7fc9e2897ad..952eef2aece4 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSink.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSink.java @@ -61,6 +61,14 @@ default long getValidationCpuNanos() */ CompletableFuture appendPage(Page page); + /** + * Closes the idle 
partition writers that have not received any data since the last time this + * method is called. This method is called periodically based on some + * data written threshold by the TableWriterOperator. It is needed to avoid high memory + * usage due to stale partitions kept in memory during partitioned writes. + */ + default void closeIdleWriters() {} + /** * Notifies the connector that no more pages will be appended and returns * connector-specific information that will be sent to the coordinator to diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java index 6b399d2d40de..a4f76d74b3d2 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java @@ -69,6 +69,14 @@ public CompletableFuture appendPage(Page page) } } + @Override + public void closeIdleWriters() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.closeIdleWriters(); + } + } + @Override public CompletableFuture> finish() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java index b4f6c3e86c4b..e5b2e8aef0b0 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java @@ -100,11 +100,12 @@ public abstract class AbstractDeltaLakePageSink private final DeltaLakeWriterStats stats; private final String trinoVersion; private final long targetMaxFileSize; - + private final long idleWriterMinFileSize; private long 
writtenBytes; private long memoryUsage; private final List closedWriterRollbackActions = new ArrayList<>(); + private final List activeWriters = new ArrayList<>(); protected final ImmutableList.Builder dataFileInfos = ImmutableList.builder(); private final DeltaLakeParquetSchemaMapping parquetSchemaMapping; @@ -190,6 +191,7 @@ public AbstractDeltaLakePageSink( this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.targetMaxFileSize = DeltaLakeSessionProperties.getTargetMaxFileSize(session); + this.idleWriterMinFileSize = DeltaLakeSessionProperties.getIdleWriterMinFileSize(session); } protected abstract void processSynthesizedColumn(DeltaLakeColumnHandle column); @@ -312,6 +314,7 @@ private void writePage(Page page) } DeltaLakeWriter writer = writers.get(index); + verify(writer != null, "Expected writer at index %s", index); long currentWritten = writer.getWrittenBytes(); long currentMemory = writer.getMemoryUsage(); @@ -320,6 +323,22 @@ private void writePage(Page page) writtenBytes += writer.getWrittenBytes() - currentWritten; memoryUsage += writer.getMemoryUsage() - currentMemory; + // Mark this writer as active (i.e. 
not idle) + activeWriters.set(index, true); + } + } + + @Override + public void closeIdleWriters() + { + for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) { + DeltaLakeWriter writer = writers.get(writerIndex); + if (activeWriters.get(writerIndex) || writer == null || writer.getWrittenBytes() <= idleWriterMinFileSize) { + activeWriters.set(writerIndex, false); + continue; + } + LOG.debug("Closing writer %s with %s bytes written", writerIndex, writer.getWrittenBytes()); + closeWriter(writerIndex); } } @@ -334,6 +353,7 @@ private int[] getWriterIndexes(Page page) // expand writers list to new size while (writers.size() <= pageIndexer.getMaxIndex()) { writers.add(null); + activeWriters.add(false); } // create missing writers for (int position = 0; position < page.getPositionCount(); position++) { @@ -374,7 +394,6 @@ private int[] getWriterIndexes(Page page) memoryUsage += writer.getMemoryUsage(); } verify(writers.size() == pageIndexer.getMaxIndex() + 1); - verify(!writers.contains(null)); return writerIndexes; } @@ -387,6 +406,9 @@ private String getRelativeFilePath(Optional partitionName, String fileNa protected void closeWriter(int writerIndex) { DeltaLakeWriter writer = writers.get(writerIndex); + if (writer == null) { + return; + } long currentWritten = writer.getWrittenBytes(); long currentMemory = writer.getMemoryUsage(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 87ff4190d60f..020077cc1059 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -76,6 +76,7 @@ public class DeltaLakeConfig private boolean deleteSchemaLocationsFallback; private String parquetTimeZone = TimeZone.getDefault().getID(); private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); + 
private DataSize idleWriterMinFileSize = DataSize.of(16, MEGABYTE); private boolean uniqueTableLocation = true; private boolean registerTableProcedureEnabled; private boolean projectionPushdownEnabled = true; @@ -450,6 +451,20 @@ public DeltaLakeConfig setTargetMaxFileSize(DataSize targetMaxFileSize) return this; } + @NotNull + public DataSize getIdleWriterMinFileSize() + { + return idleWriterMinFileSize; + } + + @Config("delta.idle-writer-min-file-size") + @ConfigDescription("Minimum data written by a single partition writer before it can be consider as 'idle' and could be closed by the engine") + public DeltaLakeConfig setIdleWriterMinFileSize(DataSize idleWriterMinFileSize) + { + this.idleWriterMinFileSize = idleWriterMinFileSize; + return this; + } + public boolean isUniqueTableLocation() { return uniqueTableLocation; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 8cfb02a3988b..707d3aad66bd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -59,6 +59,7 @@ public final class DeltaLakeSessionProperties private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size"; + private static final String IDLE_WRITER_MIN_FILE_SIZE = "idle_writer_min_file_size"; private static final String COMPRESSION_CODEC = "compression_codec"; // This property is not supported by Delta Lake and exists solely for technical reasons. 
@Deprecated @@ -150,6 +151,11 @@ public DeltaLakeSessionProperties( "Target maximum size of written files; the actual size may be larger", deltaLakeConfig.getTargetMaxFileSize(), false), + dataSizeProperty( + IDLE_WRITER_MIN_FILE_SIZE, + "Minimum data written by a single partition writer before it can be consider as 'idle' and could be closed by the engine", + deltaLakeConfig.getIdleWriterMinFileSize(), + false), enumProperty( TIMESTAMP_PRECISION, "Internal Delta Lake connector property", @@ -266,6 +272,11 @@ public static long getTargetMaxFileSize(ConnectorSession session) return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class).toBytes(); } + public static long getIdleWriterMinFileSize(ConnectorSession session) + { + return session.getProperty(IDLE_WRITER_MIN_FILE_SIZE, DataSize.class).toBytes(); + } + public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session) { return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestCloseIdleWriters.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestCloseIdleWriters.java new file mode 100644 index 000000000000..865bdf9d55eb --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestCloseIdleWriters.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Test; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static io.trino.SystemSessionProperties.IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD; +import static io.trino.SystemSessionProperties.SCALE_WRITERS; +import static io.trino.SystemSessionProperties.TASK_MAX_WRITER_COUNT; +import static io.trino.SystemSessionProperties.TASK_MIN_WRITER_COUNT; +import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCloseIdleWriters + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Path metastoreDirectory = Files.createTempDirectory(DELTA_CATALOG); + metastoreDirectory.toFile().deleteOnExit(); + DistributedQueryRunner queryRunner = DeltaLakeQueryRunner.builder() + .setCatalogName(DELTA_CATALOG) + .setNodeCount(1) + // Set the target max file size to 100GB so that we don't close writers due to file size in append + // page. + .setDeltaProperties(ImmutableMap.of( + "hive.metastore", "file", + "hive.metastore.catalog.dir", metastoreDirectory.toUri().toString(), + "delta.target-max-file-size", "100GB", + "delta.idle-writer-min-file-size", "0.1MB")) + .build(); + queryRunner.execute("CREATE SCHEMA IF NOT EXISTS tpch"); + return queryRunner; + } + + @Test + public void testCloseIdleWriters() + { + String tableName = "task_close_idle_writers_" + randomNameSuffix(); + try { + // Create a table with two partitions (0 and 1). 
Using the order by trick we will write the partitions in + // this order 0, 1, and then again 0. This way we are sure that during partition 1 write there will + // be an idle writer for partition 0. Additionally, during second partition 0 write, there will be an idle + // writer for partition 1. + @Language("SQL") String createTableSql = """ + CREATE TABLE %s WITH (partitioned_by = ARRAY['shipmodeVal']) + AS SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, + discount, tax, returnflag, linestatus, commitdate, receiptdate, shipinstruct, + comment, shipdate, + CASE + WHEN shipmode IN ('AIR', 'FOB', 'SHIP', 'TRUCK') THEN 0 + WHEN shipmode IN ('MAIL', 'RAIL', 'REG AIR') THEN 1 + ELSE 2 + END AS shipmodeVal + FROM tpch.tiny.lineitem + ORDER BY shipmode + LIMIT 60174 + """.formatted(tableName); + + // Disable all kind of scaling and set idle writer threshold to 10MB + assertUpdate( + Session.builder(getSession()) + .setSystemProperty(SCALE_WRITERS, "false") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") + .setSystemProperty(TASK_MAX_WRITER_COUNT, "1") + .setSystemProperty(TASK_MIN_WRITER_COUNT, "1") + .setSystemProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD, "0.1MB") + .build(), + createTableSql, + 60174); + long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM " + tableName); + // There should more than 2 files since we triggered close idle writers. 
+ assertThat(files).isGreaterThan(2); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 4a6306dc524f..7c5bdb6a22f9 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -27,6 +27,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.HOURS; @@ -66,6 +67,7 @@ public void testDefaults() .setParquetTimeZone(TimeZone.getDefault().getID()) .setPerTransactionMetastoreCacheMaximumSize(1000) .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) + .setIdleWriterMinFileSize(DataSize.of(16, MEGABYTE)) .setUniqueTableLocation(true) .setRegisterTableProcedureEnabled(false) .setProjectionPushdownEnabled(true) @@ -103,6 +105,7 @@ public void testExplicitPropertyMappings() .put("delta.delete-schema-locations-fallback", "true") .put("delta.parquet.time-zone", nonDefaultTimeZone().getID()) .put("delta.target-max-file-size", "2 GB") + .put("delta.idle-writer-min-file-size", "1MB") .put("delta.unique-table-location", "false") .put("delta.register-table-procedure.enabled", "true") .put("delta.projection-pushdown-enabled", "false") @@ -137,6 +140,7 @@ public void testExplicitPropertyMappings() .setParquetTimeZone(nonDefaultTimeZone().getID()) .setPerTransactionMetastoreCacheMaximumSize(500) .setTargetMaxFileSize(DataSize.of(2, 
GIGABYTE)) + .setIdleWriterMinFileSize(DataSize.of(1, MEGABYTE)) .setUniqueTableLocation(false) .setRegisterTableProcedureEnabled(true) .setProjectionPushdownEnabled(false) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 0f54cc3af1ec..1f25d05def31 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -168,6 +168,7 @@ public class HiveConfig private Optional hudiCatalogName = Optional.empty(); private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); + private DataSize idleWriterMinFileSize = DataSize.of(16, MEGABYTE); private boolean sizeBasedSplitWeightsEnabled = true; private double minimumAssignedSplitWeight = 0.05; @@ -270,6 +271,19 @@ public HiveConfig setTargetMaxFileSize(DataSize targetMaxFileSize) return this; } + public DataSize getIdleWriterMinFileSize() + { + return idleWriterMinFileSize; + } + + @Config("hive.idle-writer-min-file-size") + @ConfigDescription("Minimum data written by a single partition writer before it can be consider as 'idle' and could be closed by the engine") + public HiveConfig setIdleWriterMinFileSize(DataSize idleWriterMinFileSize) + { + this.idleWriterMinFileSize = idleWriterMinFileSize; + return this; + } + public boolean isForceLocalScheduling() { return forceLocalScheduling; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java index c703ea951feb..f1e688fb7143 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import io.airlift.json.JsonCodec; +import 
io.airlift.log.Logger; import io.airlift.slice.Slice; import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion; import io.trino.spi.Page; @@ -64,6 +65,7 @@ public class HivePageSink implements ConnectorPageSink, ConnectorMergeSink { + private static final Logger LOG = Logger.get(HivePageSink.class); private static final int MAX_PAGE_POSITIONS = 4096; private final HiveWriterFactory writerFactory; @@ -85,9 +87,11 @@ public class HivePageSink private final List writers = new ArrayList<>(); private final long targetMaxFileSize; + private final long idleWriterMinFileSize; private final List closedWriterRollbackActions = new ArrayList<>(); private final List partitionUpdates = new ArrayList<>(); private final List> verificationTasks = new ArrayList<>(); + private final List activeWriters = new ArrayList<>(); private final boolean isMergeSink; private long writtenBytes; @@ -161,6 +165,7 @@ public HivePageSink( } this.targetMaxFileSize = HiveSessionProperties.getTargetMaxFileSize(session).toBytes(); + this.idleWriterMinFileSize = HiveSessionProperties.getIdleWriterMinFileSize(session).toBytes(); } @Override @@ -191,6 +196,9 @@ private ListenableFuture> doMergeSinkFinish() { ImmutableList.Builder resultSlices = ImmutableList.builder(); for (HiveWriter writer : writers) { + if (writer == null) { + continue; + } writer.commit(); MergeFileWriter mergeFileWriter = (MergeFileWriter) writer.getFileWriter(); PartitionUpdateAndMergeResults results = mergeFileWriter.getPartitionUpdateAndMergeResults(writer.getPartitionUpdate()); @@ -198,6 +206,7 @@ private ListenableFuture> doMergeSinkFinish() } List result = resultSlices.build(); writtenBytes = writers.stream() + .filter(Objects::nonNull) .mapToLong(HiveWriter::getWrittenBytes) .sum(); return Futures.immediateFuture(result); @@ -308,6 +317,7 @@ private void writePage(Page page) } HiveWriter writer = writers.get(index); + verify(writer != null, "Expected writer at index %s", index); long currentWritten = 
writer.getWrittenBytes(); long currentMemory = writer.getMemoryUsage(); @@ -316,12 +326,17 @@ private void writePage(Page page) writtenBytes += (writer.getWrittenBytes() - currentWritten); memoryUsage += (writer.getMemoryUsage() - currentMemory); + // Mark this writer as active (i.e. not idle) + activeWriters.set(index, true); } } private void closeWriter(int writerIndex) { HiveWriter writer = writers.get(writerIndex); + if (writer == null) { + return; + } long currentWritten = writer.getWrittenBytes(); long currentMemory = writer.getMemoryUsage(); @@ -338,6 +353,26 @@ private void closeWriter(int writerIndex) partitionUpdates.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdate))); } + @Override + public void closeIdleWriters() + { + // For transactional tables we don't want to split output files because there is an explicit or implicit bucketing + // and file names have no random component (e.g. bucket_00000) + if (bucketFunction != null || isTransactional) { + return; + } + + for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) { + HiveWriter writer = writers.get(writerIndex); + if (activeWriters.get(writerIndex) || writer == null || writer.getWrittenBytes() <= idleWriterMinFileSize) { + activeWriters.set(writerIndex, false); + continue; + } + LOG.debug("Closing writer %s with %s bytes written", writerIndex, writer.getWrittenBytes()); + closeWriter(writerIndex); + } + } + private int[] getWriterIndexes(Page page) { Page partitionColumns = extractColumns(page, partitionColumnsInputIndex); @@ -350,6 +385,7 @@ private int[] getWriterIndexes(Page page) // expand writers list to new size while (writers.size() <= pagePartitioner.getMaxIndex()) { writers.add(null); + activeWriters.add(false); } // create missing writers @@ -378,7 +414,6 @@ private int[] getWriterIndexes(Page page) memoryUsage += writer.getMemoryUsage(); } verify(writers.size() == pagePartitioner.getMaxIndex() + 1); - verify(!writers.contains(null)); return writerIndexes; 
} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java index fbeb33a40a5c..c0a71c1fe995 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java @@ -63,6 +63,7 @@ public final class HiveSessionProperties private static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled"; private static final String VALIDATE_BUCKETING = "validate_bucketing"; private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size"; + private static final String IDLE_WRITER_MIN_FILE_SIZE = "idle_writer_min_file_size"; private static final String PARALLEL_PARTITIONED_BUCKETED_WRITES = "parallel_partitioned_bucketed_writes"; private static final String FORCE_LOCAL_SCHEDULING = "force_local_scheduling"; private static final String INSERT_EXISTING_PARTITIONS_BEHAVIOR = "insert_existing_partitions_behavior"; @@ -169,6 +170,11 @@ public HiveSessionProperties( "Target maximum size of written files; the actual size may be larger", hiveConfig.getTargetMaxFileSize(), false), + dataSizeProperty( + IDLE_WRITER_MIN_FILE_SIZE, + "Minimum data written by a single partition writer before it can be consider as 'idle' and could be closed by the engine", + hiveConfig.getIdleWriterMinFileSize(), + false), booleanProperty( PARALLEL_PARTITIONED_BUCKETED_WRITES, "Improve parallelism of partitioned and bucketed table writes", @@ -555,6 +561,11 @@ public static DataSize getTargetMaxFileSize(ConnectorSession session) return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class); } + public static DataSize getIdleWriterMinFileSize(ConnectorSession session) + { + return session.getProperty(IDLE_WRITER_MIN_FILE_SIZE, DataSize.class); + } + public static boolean isParallelPartitionedBucketedWrites(ConnectorSession session) { return 
session.getProperty(PARALLEL_PARTITIONED_BUCKETED_WRITES, Boolean.class); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCloseIdleWriters.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCloseIdleWriters.java new file mode 100644 index 000000000000..60c65e468e87 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCloseIdleWriters.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Test; + +import static io.trino.SystemSessionProperties.IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD; +import static io.trino.SystemSessionProperties.SCALE_WRITERS; +import static io.trino.SystemSessionProperties.TASK_MAX_WRITER_COUNT; +import static io.trino.SystemSessionProperties.TASK_MIN_WRITER_COUNT; +import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCloseIdleWriters + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.builder() + .setNodeCount(1) + // Set the target max file 
size to 100GB so that we don't close writers due to file size in append + // page. + .setHiveProperties(ImmutableMap.of( + "hive.target-max-file-size", "100GB", + "hive.idle-writer-min-file-size", "0.1MB")) + .build(); + } + + @Test + public void testCloseIdleWriters() + { + String tableName = "task_close_idle_writers_" + randomNameSuffix(); + try { + // Create a table with two partitions (0 and 1). Using the order by trick we will write the partitions in + // this order 0, 1, and then again 0. This way we are sure that during partition 1 write there will + // be an idle writer for partition 0. Additionally, during second partition 0 write, there will be an idle + // writer for partition 1. + @Language("SQL") String createTableSql = """ + CREATE TABLE %s WITH (format = 'ORC', partitioned_by = ARRAY['shipmodeVal']) + AS SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, + discount, tax, returnflag, linestatus, commitdate, receiptdate, shipinstruct, + comment, shipdate, + CASE + WHEN shipmode IN ('AIR', 'FOB', 'SHIP', 'TRUCK') THEN 0 + WHEN shipmode IN ('MAIL', 'RAIL', 'REG AIR') THEN 1 + ELSE 2 + END AS shipmodeVal + FROM tpch.tiny.lineitem + ORDER BY shipmode + LIMIT 60174 + """.formatted(tableName); + + // Disable all kinds of scaling and set the idle writer threshold to 0.1MB + assertUpdate( + Session.builder(getSession()) + .setSystemProperty(SCALE_WRITERS, "false") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") + .setSystemProperty(TASK_MAX_WRITER_COUNT, "1") + .setSystemProperty(TASK_MIN_WRITER_COUNT, "1") + .setSystemProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD, "0.1MB") + .build(), + createTableSql, + 60174); + long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM " + tableName); + // There should be more than 2 files since we triggered close idle writers. 
+ assertThat(files).isGreaterThan(2); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index 7abc47c55b61..63cb594b7bae 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -55,6 +55,7 @@ public void testDefaults() .setMaxSplitsPerSecond(null) .setDomainCompactionThreshold(1000) .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) + .setIdleWriterMinFileSize(DataSize.of(16, MEGABYTE)) .setForceLocalScheduling(false) .setMaxConcurrentFileSystemOperations(20) .setMaxConcurrentMetastoreDrops(20) @@ -138,6 +139,7 @@ public void testExplicitPropertyMappings() .put("hive.max-splits-per-second", "1") .put("hive.domain-compaction-threshold", "42") .put("hive.target-max-file-size", "72MB") + .put("hive.idle-writer-min-file-size", "1MB") .put("hive.recursive-directories", "true") .put("hive.ignore-absent-partitions", "true") .put("hive.storage-format", "SEQUENCEFILE") @@ -218,6 +220,7 @@ public void testExplicitPropertyMappings() .setMaxSplitsPerSecond(1) .setDomainCompactionThreshold(42) .setTargetMaxFileSize(DataSize.of(72, Unit.MEGABYTE)) + .setIdleWriterMinFileSize(DataSize.of(1, MEGABYTE)) .setForceLocalScheduling(true) .setMaxConcurrentFileSystemOperations(100) .setMaxConcurrentMetastoreDrops(100) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index d4e5bce1c78c..11106b4d437f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.json.JsonCodec; import 
io.airlift.slice.Slices; +import io.airlift.units.DataSize; import io.trino.filesystem.FileEntry; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; @@ -56,16 +57,21 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.function.Function; import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.units.DataSize.Unit.BYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.hive.thrift.metastore.hive_metastoreConstants.FILE_INPUT_FORMAT; +import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HiveCompressionOption.LZ4; import static io.trino.plugin.hive.HiveCompressionOption.NONE; +import static io.trino.plugin.hive.HiveStorageFormat.PARQUET; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; @@ -85,6 +91,7 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static io.trino.testing.TestingPageSinkId.TESTING_PAGE_SINK_ID; +import static io.trino.tpch.LineItemColumn.SHIP_MODE; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static java.lang.Math.round; import static java.lang.String.format; @@ -125,7 +132,7 @@ void testAllFormats() if (codec == NONE) { continue; } - if ((format == HiveStorageFormat.PARQUET) && (codec == LZ4)) { + if ((format == PARQUET) && (codec == LZ4)) { // TODO 
(https://github.com/trinodb/trino/issues/9142) LZ4 is not supported with native Parquet writer continue; } @@ -145,6 +152,72 @@ void testAllFormats() } } + @Test + public void testCloseIdleWritersWhenIdleWriterMinFileSizeLimitIsReached() + throws IOException + { + testCloseIdleWriters(DataSize.of(1, BYTE), 2, 1); + } + + @Test + public void testCloseIdleWritersWhenIdleWriterMinFileSizeLimitIsNotReached() + throws IOException + { + testCloseIdleWriters(DataSize.of(100, MEGABYTE), 1, 1); + } + + private void testCloseIdleWriters(DataSize idleWritersMinFileSize, int expectedTruckFiles, int expectedShipFiles) + throws IOException + { + HiveConfig config = new HiveConfig() + .setIdleWriterMinFileSize(idleWritersMinFileSize) + .setHiveStorageFormat(PARQUET) + .setHiveCompressionCodec(NONE); + SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig(); + + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + HiveMetastore metastore = createTestingFileHiveMetastore(fileSystemFactory, Location.of("memory:///metastore")); + + HiveTransactionHandle transaction = new HiveTransactionHandle(false); + HiveWriterStats stats = new HiveWriterStats(); + List columnHandles = getPartitionedColumnHandles(SHIP_MODE.getColumnName()); + Location location = makeFileName(config); + ConnectorPageSink pageSink = createPageSink(fileSystemFactory, transaction, config, sortingFileWriterConfig, metastore, location, stats, columnHandles); + Page truckPage = createPage(lineItem -> lineItem.getShipMode().equals("TRUCK")); + Page shipPage = createPage(lineItem -> lineItem.getShipMode().equals("SHIP")); + + pageSink.appendPage(truckPage); + pageSink.appendPage(shipPage); + // This call will mark the truck and ship partition as idle. + pageSink.closeIdleWriters(); + + // This call will mark the ship partition as non-idle. 
+ pageSink.appendPage(shipPage); + // This call will close the truck partition if idleWritersMinFileSize limit is reached since + // it is still idle. + pageSink.closeIdleWriters(); + + pageSink.appendPage(truckPage); + pageSink.appendPage(shipPage); + + getFutureValue(pageSink.finish()); + FileIterator fileIterator = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).listFiles(location); + + int truckFileCount = 0; + int shipFileCount = 0; + while (fileIterator.hasNext()) { + FileEntry file = fileIterator.next(); + if (file.location().toString().contains("TRUCK")) { + truckFileCount++; + } + else if (file.location().toString().contains("SHIP")) { + shipFileCount++; + } + } + assertThat(truckFileCount).isEqualTo(expectedTruckFiles); + assertThat(shipFileCount).isEqualTo(expectedShipFiles); + } + private static boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionOption compressionOption) { if (storageFormat == HiveStorageFormat.AVRO && compressionOption == LZ4) { @@ -163,17 +236,52 @@ private static long writeTestFile(TrinoFileSystemFactory fileSystemFactory, Hive { HiveTransactionHandle transaction = new HiveTransactionHandle(false); HiveWriterStats stats = new HiveWriterStats(); - ConnectorPageSink pageSink = createPageSink(fileSystemFactory, transaction, config, sortingFileWriterConfig, metastore, location, stats); + ConnectorPageSink pageSink = createPageSink(fileSystemFactory, transaction, config, sortingFileWriterConfig, metastore, location, stats, getColumnHandles()); List columns = getTestColumns(); List columnTypes = columns.stream() .map(LineItemColumn::getType) .map(TestHivePageSink::getType) .map(hiveType -> TESTING_TYPE_MANAGER.getType(hiveType.getTypeSignature())) .collect(toList()); + Page page = createPage(lineItem -> true); + pageSink.appendPage(page); + getFutureValue(pageSink.finish()); + + FileIterator fileIterator = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).listFiles(location); + FileEntry fileEntry 
= fileIterator.next(); + assertThat(fileIterator.hasNext()).isFalse(); + List pages = new ArrayList<>(); + try (ConnectorPageSource pageSource = createPageSource(fileSystemFactory, transaction, config, fileEntry.location())) { + while (!pageSource.isFinished()) { + Page nextPage = pageSource.getNextPage(); + if (nextPage != null) { + pages.add(nextPage.getLoadedPage()); + } + } + } + + MaterializedResult expectedResults = toMaterializedResult(getHiveSession(config), columnTypes, ImmutableList.of(page)); + MaterializedResult results = toMaterializedResult(getHiveSession(config), columnTypes, pages); + assertThat(results).containsExactlyElementsOf(expectedResults); + assertThat(round(stats.getInputPageSizeInBytes().getAllTime().getMax())).isEqualTo(page.getRetainedSizeInBytes()); + return fileEntry.length(); + } + + private static Page createPage(Function filter) + { + List columns = getTestColumns(); + List columnTypes = columns.stream() + .map(LineItemColumn::getType) + .map(TestHivePageSink::getType) + .map(hiveType -> TESTING_TYPE_MANAGER.getType(hiveType.getTypeSignature())) + .collect(toList()); PageBuilder pageBuilder = new PageBuilder(columnTypes); int rows = 0; for (LineItem lineItem : new LineItemGenerator(0.01, 1, 1)) { + if (!filter.apply(lineItem)) { + continue; + } rows++; if (rows >= NUM_ROWS) { break; @@ -203,29 +311,7 @@ private static long writeTestFile(TrinoFileSystemFactory fileSystemFactory, Hive } } } - Page page = pageBuilder.build(); - pageSink.appendPage(page); - getFutureValue(pageSink.finish()); - - FileIterator fileIterator = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).listFiles(location); - FileEntry fileEntry = fileIterator.next(); - assertThat(fileIterator.hasNext()).isFalse(); - - List pages = new ArrayList<>(); - try (ConnectorPageSource pageSource = createPageSource(fileSystemFactory, transaction, config, fileEntry.location())) { - while (!pageSource.isFinished()) { - Page nextPage = pageSource.getNextPage(); - if 
(nextPage != null) { - pages.add(nextPage.getLoadedPage()); - } - } - } - - MaterializedResult expectedResults = toMaterializedResult(getHiveSession(config), columnTypes, ImmutableList.of(page)); - MaterializedResult results = toMaterializedResult(getHiveSession(config), columnTypes, pages); - assertThat(results).containsExactlyElementsOf(expectedResults); - assertThat(round(stats.getInputPageSizeInBytes().getAllTime().getMax())).isEqualTo(page.getRetainedSizeInBytes()); - return fileEntry.length(); + return pageBuilder.build(); } static MaterializedResult toMaterializedResult(ConnectorSession session, List types, List pages) @@ -274,13 +360,21 @@ private static ConnectorPageSource createPageSource(TrinoFileSystemFactory fileS return provider.createPageSource(transaction, getHiveSession(config), split, table, ImmutableList.copyOf(getColumnHandles()), DynamicFilter.EMPTY); } - private static ConnectorPageSink createPageSink(TrinoFileSystemFactory fileSystemFactory, HiveTransactionHandle transaction, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Location location, HiveWriterStats stats) + private static ConnectorPageSink createPageSink( + TrinoFileSystemFactory fileSystemFactory, + HiveTransactionHandle transaction, + HiveConfig config, + SortingFileWriterConfig sortingFileWriterConfig, + HiveMetastore metastore, + Location location, + HiveWriterStats stats, + List columnHandles) { LocationHandle locationHandle = new LocationHandle(location, location, DIRECT_TO_TARGET_NEW_DIRECTORY); HiveOutputTableHandle handle = new HiveOutputTableHandle( SCHEMA_NAME, TABLE_NAME, - getColumnHandles(), + columnHandles, new HivePageSinkMetadata(new SchemaTableName(SCHEMA_NAME, TABLE_NAME), metastore.getTable(SCHEMA_NAME, TABLE_NAME), ImmutableMap.of()), locationHandle, config.getHiveStorageFormat(), @@ -323,6 +417,23 @@ private static List getColumnHandles() return handles.build(); } + private static List 
getPartitionedColumnHandles(String partitionColumn) + { + ImmutableList.Builder handles = ImmutableList.builder(); + List columns = getTestColumns(); + for (int i = 0; i < columns.size(); i++) { + LineItemColumn column = columns.get(i); + Type type = getType(column.getType()); + if (column.getColumnName().equals(partitionColumn)) { + handles.add(createBaseColumn(column.getColumnName(), i, HiveType.toHiveType(type), type, PARTITION_KEY, Optional.empty())); + } + else { + handles.add(createBaseColumn(column.getColumnName(), i, HiveType.toHiveType(type), type, REGULAR, Optional.empty())); + } + } + return handles.build(); + } + private static List getTestColumns() { return Stream.of(LineItemColumn.values()) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 5b3e0d7f8df5..67e4b9675c2a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -30,6 +30,7 @@ import java.util.Optional; import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; @@ -67,6 +68,7 @@ public class IcebergConfig private Duration expireSnapshotsMinRetention = new Duration(7, DAYS); private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS); private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); + private DataSize idleWriterMinFileSize = DataSize.of(16, MEGABYTE); // This is meant to protect users who are misusing schema locations (by // putting schemas in locations with extraneous files), so default to false // to avoid deleting those files if Trino is unable to check. 
@@ -315,6 +317,20 @@ public IcebergConfig setTargetMaxFileSize(DataSize targetMaxFileSize) return this; } + @NotNull + public DataSize getIdleWriterMinFileSize() + { + return idleWriterMinFileSize; + } + + @Config("iceberg.idle-writer-min-file-size") + @ConfigDescription("Minimum data written by a single partition writer before it can be consider as 'idle' and could be closed by the engine") + public IcebergConfig setIdleWriterMinFileSize(DataSize idleWriterMinFileSize) + { + this.idleWriterMinFileSize = idleWriterMinFileSize; + return this; + } + public boolean isDeleteSchemaLocationsFallback() { return this.deleteSchemaLocationsFallback; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java index b54d8f25fa9b..284385d0e49b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.filesystem.Location; @@ -91,6 +92,8 @@ public class IcebergPageSink implements ConnectorPageSink { + private static final Logger LOG = Logger.get(IcebergPageSink.class); + private static final int MAX_PAGE_POSITIONS = 4096; private final int maxOpenWriters; @@ -105,6 +108,7 @@ public class IcebergPageSink private final MetricsConfig metricsConfig; private final PagePartitioner pagePartitioner; private final long targetMaxFileSize; + private final long idleWriterMinFileSize; private final Map storageProperties; private final List sortOrder; private final boolean sortedWritingEnabled; @@ -120,6 +124,7 @@ public class IcebergPageSink private final List writers = new ArrayList<>(); private final List 
closedWriterRollbackActions = new ArrayList<>(); private final Collection commitTasks = new ArrayList<>(); + private final List activeWriters = new ArrayList<>(); private long writtenBytes; private long memoryUsage; @@ -157,6 +162,7 @@ public IcebergPageSink( this.maxOpenWriters = maxOpenWriters; this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec)); this.targetMaxFileSize = IcebergSessionProperties.getTargetMaxFileSize(session); + this.idleWriterMinFileSize = IcebergSessionProperties.getIdleWriterMinFileSize(session); this.storageProperties = requireNonNull(storageProperties, "storageProperties is null"); this.sortOrder = requireNonNull(sortOrder, "sortOrder is null"); this.sortedWritingEnabled = isSortedWritingEnabled(session); @@ -300,7 +306,9 @@ private void writePage(Page page) pageForWriter = pageForWriter.getPositions(positions, 0, positions.length); } - IcebergFileWriter writer = writers.get(index).getWriter(); + WriteContext writeContext = writers.get(index); + verify(writeContext != null, "Expected writer at index %s", index); + IcebergFileWriter writer = writeContext.getWriter(); long currentWritten = writer.getWrittenBytes(); long currentMemory = writer.getMemoryUsage(); @@ -309,6 +317,8 @@ private void writePage(Page page) writtenBytes += (writer.getWrittenBytes() - currentWritten); memoryUsage += (writer.getMemoryUsage() - currentMemory); + // Mark this writer as active (i.e. 
not idle) + activeWriters.set(index, true); } } @@ -323,6 +333,7 @@ private int[] getWriterIndexes(Page page) // expand writers list to new size while (writers.size() <= pagePartitioner.getMaxIndex()) { writers.add(null); + activeWriters.add(false); } // create missing writers @@ -369,14 +380,30 @@ private int[] getWriterIndexes(Page page) memoryUsage += writer.getWriter().getMemoryUsage(); } verify(writers.size() == pagePartitioner.getMaxIndex() + 1); - verify(!writers.contains(null)); return writerIndexes; } + @Override + public void closeIdleWriters() + { + for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) { + WriteContext writeContext = writers.get(writerIndex); + if (activeWriters.get(writerIndex) || writeContext == null || writeContext.getWriter().getWrittenBytes() <= idleWriterMinFileSize) { + activeWriters.set(writerIndex, false); + continue; + } + LOG.debug("Closing writer %s with %s bytes written", writerIndex, writeContext.getWriter().getWrittenBytes()); + closeWriter(writerIndex); + } + } + private void closeWriter(int writerIndex) { WriteContext writeContext = writers.get(writerIndex); + if (writeContext == null) { + return; + } IcebergFileWriter writer = writeContext.getWriter(); long currentWritten = writer.getWrittenBytes(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 42868452dc52..613eea702275 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -84,6 +84,7 @@ public final class IcebergSessionProperties public static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled"; private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private static final String TARGET_MAX_FILE_SIZE = 
"target_max_file_size"; + private static final String IDLE_WRITER_MIN_FILE_SIZE = "idle_writer_min_file_size"; public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE = "collect_extended_statistics_on_write"; private static final String HIVE_CATALOG_NAME = "hive_catalog_name"; private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; @@ -279,6 +280,11 @@ public IcebergSessionProperties( "Target maximum size of written files; the actual size may be larger", icebergConfig.getTargetMaxFileSize(), false)) + .add(dataSizeProperty( + IDLE_WRITER_MIN_FILE_SIZE, + "Minimum data written by a single partition writer before it can be consider as 'idle' and could be closed by the engine", + icebergConfig.getIdleWriterMinFileSize(), + false)) .add(booleanProperty( COLLECT_EXTENDED_STATISTICS_ON_WRITE, COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION, @@ -492,6 +498,11 @@ public static long getTargetMaxFileSize(ConnectorSession session) return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class).toBytes(); } + public static long getIdleWriterMinFileSize(ConnectorSession session) + { + return session.getProperty(IDLE_WRITER_MIN_FILE_SIZE, DataSize.class).toBytes(); + } + public static Optional getHiveCatalogName(ConnectorSession session) { return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class)); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestCloseIdleWriters.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestCloseIdleWriters.java new file mode 100644 index 000000000000..869555a55509 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestCloseIdleWriters.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Test; + +import static io.trino.SystemSessionProperties.IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD; +import static io.trino.SystemSessionProperties.SCALE_WRITERS; +import static io.trino.SystemSessionProperties.TASK_MAX_WRITER_COUNT; +import static io.trino.SystemSessionProperties.TASK_MIN_WRITER_COUNT; +import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCloseIdleWriters + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setNodeCount(1) + // Set the target max file size to 100GB so that we don't close writers due to file size in append + // page. + .setIcebergProperties(ImmutableMap.of( + "iceberg.target-max-file-size", "100GB", + "iceberg.idle-writer-min-file-size", "0.1MB")) + .build(); + } + + @Test + public void testCloseIdleWriters() + { + String tableName = "task_close_idle_writers_" + randomNameSuffix(); + try { + // Create a table with two partitions (0 and 1). Using the order by trick we will write the partitions in + // this order 0, 1, and then again 0. 
This way we are sure that during partition 1 write there will + be an idle writer for partition 0. Additionally, during second partition 0 write, there will be an idle + writer for partition 1. + @Language("SQL") String createTableSql = """ + CREATE TABLE %s WITH (format = 'ORC', partitioning = ARRAY['shipmodeVal']) + AS SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, + discount, tax, returnflag, linestatus, commitdate, receiptdate, shipinstruct, + comment, shipdate, + CASE + WHEN shipmode IN ('AIR', 'FOB', 'SHIP', 'TRUCK') THEN 0 + WHEN shipmode IN ('MAIL', 'RAIL', 'REG AIR') THEN 1 + ELSE 2 + END AS shipmodeVal + FROM tpch.tiny.lineitem + ORDER BY shipmode + LIMIT 60174 + """.formatted(tableName); + + // Disable all kinds of scaling and set the idle writer threshold to 0.1MB + assertUpdate( + Session.builder(getSession()) + .setSystemProperty(SCALE_WRITERS, "false") + .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") + .setSystemProperty(TASK_MAX_WRITER_COUNT, "1") + .setSystemProperty(TASK_MIN_WRITER_COUNT, "1") + .setSystemProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD, "0.1MB") + .build(), + createTableSql, + 60174); + long files = (long) computeScalar("SELECT count(DISTINCT \"$path\") FROM " + tableName); + // There should be more than 2 files since we triggered close idle writers. 
+ assertThat(files).isGreaterThan(2); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 89a5d16e14f4..4ca2417beba4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -60,6 +60,7 @@ public void testDefaults() .setRemoveOrphanFilesMinRetention(new Duration(7, DAYS)) .setDeleteSchemaLocationsFallback(false) .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) + .setIdleWriterMinFileSize(DataSize.of(16, MEGABYTE)) .setMinimumAssignedSplitWeight(0.05) .setHideMaterializedViewStorageTable(true) .setMaterializedViewsStorageSchema(null) @@ -89,6 +90,7 @@ public void testExplicitPropertyMappings() .put("iceberg.remove_orphan_files.min-retention", "14h") .put("iceberg.delete-schema-locations-fallback", "true") .put("iceberg.target-max-file-size", "1MB") + .put("iceberg.idle-writer-min-file-size", "1MB") .put("iceberg.minimum-assigned-split-weight", "0.01") .put("iceberg.materialized-views.hide-storage-table", "false") .put("iceberg.materialized-views.storage-schema", "mv_storage_schema") @@ -115,6 +117,7 @@ public void testExplicitPropertyMappings() .setRemoveOrphanFilesMinRetention(new Duration(14, HOURS)) .setDeleteSchemaLocationsFallback(true) .setTargetMaxFileSize(DataSize.of(1, MEGABYTE)) + .setIdleWriterMinFileSize(DataSize.of(1, MEGABYTE)) .setMinimumAssignedSplitWeight(0.01) .setHideMaterializedViewStorageTable(false) .setMaterializedViewsStorageSchema("mv_storage_schema")