diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java index 6e3ce9ae54..c9984cf1cc 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java @@ -24,7 +24,7 @@ public enum AsyncTaskType { ENTITY_CLEANUP_SCHEDULER(1), MANIFEST_FILE_CLEANUP(2), - METADATA_FILE_BATCH_CLEANUP(3); + BATCH_FILE_CLEANUP(3); private final int typeCode; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java new file mode 100644 index 0000000000..8ecfc1b67b --- /dev/null +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.task; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatPredicate; + +import io.quarkus.test.junit.QuarkusTest; +import jakarta.inject.Inject; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.AsyncTaskType; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisTaskConstants; +import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.task.BatchFileCleanupTaskHandler; +import org.apache.polaris.service.task.TaskFileIOSupplier; +import org.apache.polaris.service.task.TaskUtils; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +@QuarkusTest +public class BatchFileCleanupTaskHandlerTest { + @Inject MetaStoreManagerFactory metaStoreManagerFactory; + private final RealmContext realmContext = () -> "realmName"; + + private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { + return new TaskFileIOSupplier( + new FileIOFactory() { + @Override + public FileIO loadFileIO( + @NotNull CallContext callContext, + @NotNull String ioImplClassName, + @NotNull Map properties, + @NotNull TableIdentifier identifier, + @NotNull Set tableLocations, + @NotNull Set storageActions, + @NotNull PolarisResolvedPathWrapper resolvedEntityPath) { + return fileIO; + } + }); + } + + private void addTaskLocation(TaskEntity task) { + Map internalPropertiesAsMap = new HashMap<>(task.getInternalPropertiesAsMap()); + internalPropertiesAsMap.put(PolarisTaskConstants.STORAGE_LOCATION, "file:///tmp/"); + ((PolarisBaseEntity) task).setInternalPropertiesAsMap(internalPropertiesAsMap); + } + + @Test + public void testMetadataFileCleanup() throws IOException { + PolarisCallContext polarisCallContext = + new PolarisCallContext( + metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), + new PolarisDefaultDiagServiceImpl()); + try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { + CallContext.setCurrentContext(callCtx); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TableIdentifier tableIdentifier = + TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + + long snapshotId1 = 100L; + ManifestFile manifestFile1 = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); + ManifestFile manifestFile2 = + TaskTestUtils.manifestFile( + fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); + Snapshot snapshot = + TaskTestUtils.newSnapshot( + fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); + StatisticsFile statisticsFile1 = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + String firstMetadataFile = "v1-295495059.metadata.json"; + TableMetadata firstMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + + ManifestFile manifestFile3 = + TaskTestUtils.manifestFile( + fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); + Snapshot snapshot2 = + TaskTestUtils.newSnapshot( + fileIO, + "manifestList2.avro", + snapshot.sequenceNumber() + 1, + snapshot.snapshotId() + 1, + snapshot.snapshotId(), + manifestFile1, + manifestFile3); // exclude manifest2 from the new snapshot + StatisticsFile statisticsFile2 = + TaskTestUtils.writeStatsFile( + snapshot2.snapshotId(), + snapshot2.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + String secondMetadataFile = "v1-295495060.metadata.json"; + TableMetadata secondMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + secondMetadataFile, + firstMetadata, + firstMetadataFile, + List.of(statisticsFile2), + snapshot2); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); + + List cleanupFiles = + Stream.concat( + secondMetadata.previousFiles().stream() + .map(TableMetadata.MetadataLogEntry::file) + .filter(file -> TaskUtils.exists(file, fileIO)), + secondMetadata.statisticsFiles().stream() + .map(StatisticsFile::path) + .filter(file -> TaskUtils.exists(file, fileIO))) + .toList(); + + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, cleanupFiles)) + .setName(UUID.randomUUID().toString()) + .build(); + + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) + .rejects(firstMetadataFile); + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) + .rejects(statisticsFile1.path()); + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) + .rejects(statisticsFile2.path()); + } + } + + @Test + public void testMetadataFileCleanupIfFileNotExist() throws IOException { + PolarisCallContext polarisCallContext = + new PolarisCallContext( + metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), + new PolarisDefaultDiagServiceImpl()); + try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = + TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + + fileIO.deleteFile(statisticsFile.path()); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); + + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + } + } + + @Test + public void testCleanupWithRetries() throws IOException { + PolarisCallContext polarisCallContext = + new PolarisCallContext( + metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), + new PolarisDefaultDiagServiceImpl()); + try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { + CallContext.setCurrentContext(callCtx); + Map retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + + @Override + public void deleteFile(String location) { + int attempts = + retryCounter + .computeIfAbsent(location, k -> new AtomicInteger(0)) + .incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("Simulating failure to test retries"); + } else { + super.deleteFile(location); + } + } + }; + TableIdentifier tableIdentifier = + TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); + + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); + + CompletableFuture future = + CompletableFuture.runAsync( + () -> { + CallContext.setCurrentContext(callCtx); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + handler.handleTask(task, callCtx); // this will schedule the batch deletion + }); + + // Wait for all async tasks to finish + future.join(); + + // Check if the file was successfully deleted after retries + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + + // Ensure that retries happened as expected + assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); + assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); + } + } +} diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java index b6c03ef8ca..edc0090974 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java @@ -26,21 +26,14 @@ import jakarta.inject.Inject; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; import org.apache.commons.codec.binary.Base64; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.StatisticsFile; -import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.inmemory.InMemoryFileIO; @@ -273,237 +266,4 @@ public void deleteFile(String location) { assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } } - - @Test - public void testMetadataFileCleanup() throws IOException { - PolarisCallContext polarisCallContext = - new PolarisCallContext( - metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), - new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - - long snapshotId1 = 100L; - ManifestFile manifestFile1 = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); - ManifestFile manifestFile2 = - TaskTestUtils.manifestFile( - fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); - Snapshot snapshot = - TaskTestUtils.newSnapshot( - fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); - StatisticsFile statisticsFile1 = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - String firstMetadataFile = "v1-295495059.metadata.json"; - TableMetadata firstMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - - ManifestFile manifestFile3 = - TaskTestUtils.manifestFile( - fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); - Snapshot snapshot2 = - TaskTestUtils.newSnapshot( - fileIO, - "manifestList2.avro", - snapshot.sequenceNumber() + 1, - snapshot.snapshotId() + 1, - snapshot.snapshotId(), - manifestFile1, - manifestFile3); // exclude manifest2 from the new snapshot - StatisticsFile statisticsFile2 = - TaskTestUtils.writeStatsFile( - snapshot2.snapshotId(), - snapshot2.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - String secondMetadataFile = "v1-295495060.metadata.json"; - TableMetadata secondMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - secondMetadataFile, - firstMetadata, - firstMetadataFile, - List.of(statisticsFile2), - snapshot2); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); - - List cleanupFiles = - Stream.concat( - secondMetadata.previousFiles().stream() - .map(TableMetadata.MetadataLogEntry::file) - .filter(file -> TaskUtils.exists(file, fileIO)), - secondMetadata.statisticsFiles().stream() - .map(StatisticsFile::path) - .filter(file -> TaskUtils.exists(file, fileIO))) - .toList(); - - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, cleanupFiles)) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(firstMetadataFile); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(statisticsFile1.path()); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(statisticsFile2.path()); - } - } - - @Test - public void testMetadataFileCleanupIfFileNotExist() throws IOException { - PolarisCallContext polarisCallContext = - new PolarisCallContext( - metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), - new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - - fileIO.deleteFile(statisticsFile.path()); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } - } - - @Test - public void testCleanupWithRetries() throws IOException { - PolarisCallContext polarisCallContext = - new PolarisCallContext( - metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), - new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("Simulating failure to test retries"); - } else { - super.deleteFile(location); - } - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); - - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - - try (ExecutorService executor = Executors.newSingleThreadExecutor()) { - CompletableFuture future; - future = - CompletableFuture.runAsync( - () -> { - assertThatPredicate(handler::canHandleTask).accepts(task); - handler.handleTask(task, callCtx); // this will schedule the batch deletion - }, - executor); - // Wait for all async tasks to finish - future.join(); - } - - // Check if the file was successfully deleted after retries - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - - // Ensure that retries happened as expected - assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); - assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); - } - } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 06901b0f13..0a5f1e64c8 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -53,6 +53,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.task.BatchFileCleanupTaskHandler; import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler; import org.apache.polaris.service.task.TableCleanupTaskHandler; import org.apache.polaris.service.task.TaskFileIOSupplier; @@ -148,9 +149,7 @@ public void testTableCleanup() throws IOException { assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) - .returns( - AsyncTaskType.MANIFEST_FILE_CLEANUP, - taskEntity1 -> taskEntity1.getTaskType()) + .returns(AsyncTaskType.MANIFEST_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, @@ -163,15 +162,14 @@ public void testTableCleanup() throws IOException { .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) .returns( - AsyncTaskType.METADATA_FILE_BATCH_CLEANUP, - taskEntity2 -> taskEntity2.getTaskType()) + AsyncTaskType.BATCH_FILE_CLEANUP, taskEntity2 -> taskEntity2.getTaskType()) .returns( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, List.of(snapshot.manifestListLocation(), statisticsFile.path())), entity -> entity.readData( - ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); + BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); } @Test @@ -292,35 +290,29 @@ public void close() { assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) + .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( - AsyncTaskType.METADATA_FILE_BATCH_CLEANUP, - taskEntity1 -> taskEntity1.getTaskType()) - .returns( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, List.of(snapshot.manifestListLocation())), entity -> entity.readData( - ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) + .returns(AsyncTaskType.BATCH_FILE_CLEANUP, TaskEntity::getTaskType) .returns( - AsyncTaskType.METADATA_FILE_BATCH_CLEANUP, - taskEntity2 -> taskEntity2.getTaskType()) - .returns( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, List.of(snapshot.manifestListLocation())), entity -> entity.readData( - ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)), + BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)), taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) - .returns( - AsyncTaskType.MANIFEST_FILE_CLEANUP, - taskEntity3 -> taskEntity3.getTaskType()) + .returns(AsyncTaskType.MANIFEST_FILE_CLEANUP, TaskEntity::getTaskType) .returns( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableIdentifier, @@ -426,7 +418,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { .filter( entity -> { AsyncTaskType taskType = TaskEntity.of(entity).getTaskType(); - return taskType == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP; + return taskType == AsyncTaskType.BATCH_FILE_CLEANUP; }) .toList(); @@ -438,7 +430,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) .returns( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, List.of( snapshot.manifestListLocation(), @@ -447,7 +439,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { statisticsFile2.path())), entity -> entity.readData( - ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); + BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); assertThat(manifestCleanupTasks) // all three manifests should be present, even though one is excluded from the latest @@ -585,7 +577,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { .filter( entity -> { AsyncTaskType taskType = TaskEntity.of(entity).getTaskType(); - return taskType == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP; + return taskType == AsyncTaskType.BATCH_FILE_CLEANUP; }) .toList(); @@ -597,7 +589,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) .extracting(TaskEntity::of) .returns( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableIdentifier, List.of( firstMetadataFile, @@ -607,7 +599,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { statisticsFile2.path())), entity -> entity.readData( - ManifestFileCleanupTaskHandler.ManifestCleanupTask.class))); + BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); assertThat(manifestCleanupTasks) .hasSize(3) diff --git a/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java new file mode 100644 index 0000000000..d47725351f --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.AsyncTaskType; +import org.apache.polaris.core.entity.TaskEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link BatchFileCleanupTaskHandler} responsible for batch file cleanup by processing multiple + * file deletions in a single task handler. Valid files are deleted asynchronously with retries for + * transient errors, while missing files are logged and skipped. + */ +public class BatchFileCleanupTaskHandler extends FileCleanupTaskHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(BatchFileCleanupTaskHandler.class); + + public BatchFileCleanupTaskHandler( + TaskFileIOSupplier fileIOSupplier, ExecutorService executorService) { + super(fileIOSupplier, executorService); + } + + @Override + public boolean canHandleTask(TaskEntity task) { + return task.getTaskType() == AsyncTaskType.BATCH_FILE_CLEANUP; + } + + @Override + public boolean handleTask(TaskEntity task, CallContext callContext) { + BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class); + TableIdentifier tableId = cleanupTask.tableId(); + List batchFiles = cleanupTask.batchFiles(); + try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) { + List validFiles = + batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList(); + if (validFiles.isEmpty()) { + LOGGER + .atWarn() + .addKeyValue("batchFiles", batchFiles.toString()) + .addKeyValue("tableId", tableId) + .log("File batch cleanup task scheduled, but none of the files in batch exists"); + return true; + } + if (validFiles.size() < batchFiles.size()) { + List missingFiles = + batchFiles.stream().filter(file -> !TaskUtils.exists(file, authorizedFileIO)).toList(); + LOGGER + .atWarn() + .addKeyValue("batchFiles", batchFiles.toString()) + .addKeyValue("missingFiles", missingFiles.toString()) + .addKeyValue("tableId", tableId) + .log( + "File batch cleanup task scheduled, but {} files in the batch are missing", + missingFiles.size()); + } + + // Schedule the deletion for each file asynchronously + List> deleteFutures = + validFiles.stream() + .map(file -> super.tryDelete(tableId, authorizedFileIO, null, file, null, 1)) + .toList(); + + try { + // Wait for all delete operations to finish + CompletableFuture allDeletes = + CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); + allDeletes.join(); + } catch (Exception e) { + LOGGER.error("Exception detected during batch files deletion", e); + return false; + } + + return true; + } + } + + public record BatchFileCleanupTask(TableIdentifier tableId, List batchFiles) {} +} diff --git a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java new file mode 100644 index 0000000000..aa0b0d9f1e --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.TaskEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link FileCleanupTaskHandler} responsible for cleaning up files in table tasks. Handles retries + * for file deletions and skips files that are already missing. Subclasses must implement + * task-specific logic. + */ +public abstract class FileCleanupTaskHandler implements TaskHandler { + + public static final int MAX_ATTEMPTS = 3; + public static final int FILE_DELETION_RETRY_MILLIS = 100; + public final TaskFileIOSupplier fileIOSupplier; + public final ExecutorService executorService; + private static final Logger LOGGER = LoggerFactory.getLogger(FileCleanupTaskHandler.class); + + public FileCleanupTaskHandler( + TaskFileIOSupplier fileIOSupplier, ExecutorService executorService) { + this.fileIOSupplier = fileIOSupplier; + this.executorService = executorService; + } + + @Override + public abstract boolean canHandleTask(TaskEntity task); + + @Override + public abstract boolean handleTask(TaskEntity task, CallContext callContext); + + public CompletableFuture tryDelete( + TableIdentifier tableId, + FileIO fileIO, + String baseFile, + String file, + Throwable e, + int attempt) { + if (e != null && attempt <= MAX_ATTEMPTS) { + LOGGER + .atWarn() + .addKeyValue("file", file) + .addKeyValue("attempt", attempt) + .addKeyValue("error", e.getMessage()) + .log("Error encountered attempting to delete file"); + } + if (attempt > MAX_ATTEMPTS && e != null) { + return CompletableFuture.failedFuture(e); + } + return CompletableFuture.runAsync( + () -> { + // totally normal for a file to already be missing, e.g. a data file + // may be in multiple manifests. There's a possibility we check the + // file's existence, but then it is deleted before we have a chance to + // send the delete request. In such a case, we should retry + // and find + if (TaskUtils.exists(file, fileIO)) { + fileIO.deleteFile(file); + } else { + LOGGER + .atInfo() + .addKeyValue("file", file) + .addKeyValue("baseFile", baseFile != null ? baseFile : "") + .addKeyValue("tableId", tableId) + .log("table file cleanup task scheduled, but data file doesn't exist"); + } + }, + executorService) + .exceptionallyComposeAsync( + newEx -> { + LOGGER + .atWarn() + .addKeyValue("file", file) + .addKeyValue("tableIdentifier", tableId) + .addKeyValue("baseFile", baseFile != null ? baseFile : "") + .log("Exception caught deleting data file", newEx); + return tryDelete(tableId, fileIO, baseFile, file, newEx, attempt + 1); + }, + CompletableFuture.delayedExecutor( + FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java index 9179e0f336..84e9683a27 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java @@ -20,13 +20,11 @@ import java.io.IOException; import java.util.List; -import java.util.Objects; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; import org.apache.commons.codec.binary.Base64; import org.apache.iceberg.DataFile; @@ -42,50 +40,32 @@ import org.slf4j.LoggerFactory; /** - * {@link TaskHandler} responsible for deleting table files: 1. Manifest files: It contains all the - * files in a manifest and the manifest itself. Since data files may be present in multiple - * manifests across different snapshots, we assume a data file that doesn't exist is missing because - * it was already deleted by another task. 2. Table metadata files: It contains previous metadata - * and statistics files, which are grouped and deleted in batch + * {@link ManifestFileCleanupTaskHandler} responsible for deleting all the files in a manifest and + * the manifest itself. Since data files may be present in multiple manifests across different + * snapshots, we assume a data file that doesn't exist is missing because it was already deleted by + * another task. */ -// TODO: Rename this class since we introducing metadata cleanup here -public class ManifestFileCleanupTaskHandler implements TaskHandler { - public static final int MAX_ATTEMPTS = 3; - public static final int FILE_DELETION_RETRY_MILLIS = 100; +public class ManifestFileCleanupTaskHandler extends FileCleanupTaskHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ManifestFileCleanupTaskHandler.class); - private final TaskFileIOSupplier fileIOSupplier; - private final ExecutorService executorService; public ManifestFileCleanupTaskHandler( TaskFileIOSupplier fileIOSupplier, ExecutorService executorService) { - this.fileIOSupplier = fileIOSupplier; - this.executorService = executorService; + super(fileIOSupplier, executorService); } @Override public boolean canHandleTask(TaskEntity task) { - return task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP - || task.getTaskType() == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP; + return task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP; } @Override public boolean handleTask(TaskEntity task, CallContext callContext) { ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class); - TableIdentifier tableId = cleanupTask.getTableId(); + TableIdentifier tableId = cleanupTask.tableId(); try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) { - if (task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP) { - ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData()); - return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId); - } else if (task.getTaskType() == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) { - return cleanUpMetadataFiles(cleanupTask.getMetadataFiles(), authorizedFileIO, tableId); - } else { - LOGGER - .atWarn() - .addKeyValue("tableId", tableId) - .log("Unknown task type {}", task.getTaskType()); - return false; - } + ManifestFile manifestFile = decodeManifestData(cleanupTask.manifestFileData()); + return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId); } } @@ -107,7 +87,10 @@ private boolean cleanUpManifestFile( StreamSupport.stream( Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE), false) - .map(file -> tryDelete(tableId, fileIO, manifestFile, file.location(), null, 1)) + .map( + file -> + tryDelete( + tableId, fileIO, manifestFile.path(), file.path().toString(), null, 1)) .toList(); LOGGER.debug( "Scheduled {} data files to be deleted from manifest {}", @@ -122,7 +105,8 @@ private boolean cleanUpManifestFile( .atInfo() .addKeyValue("manifestFile", manifestFile.path()) .log("All data files in manifest deleted - deleting manifest"); - return tryDelete(tableId, fileIO, manifestFile, manifestFile.path(), null, 1); + return tryDelete( + tableId, fileIO, manifestFile.path(), manifestFile.path(), null, 1); }) .get(); return true; @@ -136,48 +120,6 @@ private boolean cleanUpManifestFile( } } - private boolean cleanUpMetadataFiles( - List metadataFiles, FileIO fileIO, TableIdentifier tableId) { - List validFiles = - metadataFiles.stream().filter(file -> TaskUtils.exists(file, fileIO)).toList(); - if (validFiles.isEmpty()) { - LOGGER - .atWarn() - .addKeyValue("metadataFiles", metadataFiles.toString()) - .addKeyValue("tableId", tableId) - .log("Table metadata cleanup task scheduled, but the none of the file in batch exists"); - return true; - } - if (validFiles.size() < metadataFiles.size()) { - List missingFiles = - metadataFiles.stream().filter(file -> !TaskUtils.exists(file, fileIO)).toList(); - LOGGER - .atWarn() - .addKeyValue("metadataFiles", metadataFiles.toString()) - .addKeyValue("missingFiles", missingFiles) - .addKeyValue("tableId", tableId) - .log( - "Table metadata cleanup task scheduled, but {} files in the batch are missing", - missingFiles.size()); - } - - // Schedule the deletion for each file asynchronously - List> deleteFutures = - validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file, null, 1)).toList(); - - try { - // Wait for all delete operations to finish - CompletableFuture allDeletes = - CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); - allDeletes.join(); - } catch (Exception e) { - LOGGER.error("Exception detected during metadata file deletion", e); - return false; - } - - return true; - } - private static ManifestFile decodeManifestData(String manifestFileData) { try { return ManifestFiles.decode(Base64.decodeBase64(manifestFileData)); @@ -186,111 +128,7 @@ private static ManifestFile decodeManifestData(String manifestFileData) { } } - private CompletableFuture tryDelete( - TableIdentifier tableId, - FileIO fileIO, - ManifestFile manifestFile, - String file, - Throwable e, - int attempt) { - if (e != null && attempt <= MAX_ATTEMPTS) { - LOGGER - .atWarn() - .addKeyValue("file", file) - .addKeyValue("attempt", attempt) - .addKeyValue("error", e.getMessage()) - .log("Error encountered attempting to delete file"); - } - if (attempt > MAX_ATTEMPTS && e != null) { - return CompletableFuture.failedFuture(e); - } - return CompletableFuture.runAsync( - () -> { - // totally normal for a file to already be missing, as a data file - // may be in multiple manifests. There's a possibility we check the - // file's existence, but then it is deleted before we have a chance to - // send the delete request. In such a case, we should retry - // and find - if (TaskUtils.exists(file, fileIO)) { - fileIO.deleteFile(file); - } else { - LOGGER - .atInfo() - .addKeyValue("file", file) - .addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "") - .addKeyValue("tableId", tableId) - .log("table file cleanup task scheduled, but data file doesn't exist"); - } - }, - executorService) - .exceptionallyComposeAsync( - newEx -> { - LOGGER - .atWarn() - .addKeyValue("dataFile", file) - .addKeyValue("tableIdentifier", tableId) - .addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "") - .log("Exception caught deleting data file from manifest", newEx); - return tryDelete(tableId, fileIO, manifestFile, file, newEx, attempt + 1); - }, - CompletableFuture.delayedExecutor( - FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); - } - /** Serialized Task data sent from the {@link TableCleanupTaskHandler} */ - public static final class ManifestCleanupTask { - private TableIdentifier tableId; - private String manifestFileData; - private List metadataFiles; - - public ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) { - this.tableId = tableId; - this.manifestFileData = manifestFileData; - } - - public ManifestCleanupTask(TableIdentifier tableId, List metadataFiles) { - this.tableId = tableId; - this.metadataFiles = metadataFiles; - } - - public ManifestCleanupTask() {} - - public TableIdentifier getTableId() { - return tableId; - } - - public void setTableId(TableIdentifier tableId) { - this.tableId = tableId; - } - - public String getManifestFileData() { - return manifestFileData; - } - - public void setManifestFileData(String manifestFileData) { - this.manifestFileData = manifestFileData; - } - - public List getMetadataFiles() { - return metadataFiles; - } - - public void setMetadataFiles(List metadataFiles) { - this.metadataFiles = metadataFiles; - } - - @Override - public boolean equals(Object object) { - if (this == object) return true; - if (!(object instanceof ManifestCleanupTask that)) return false; - return Objects.equals(tableId, that.tableId) - && Objects.equals(manifestFileData, that.manifestFileData) - && Objects.equals(metadataFiles, that.metadataFiles); - } - - @Override - public int hashCode() { - return Objects.hash(tableId, manifestFileData, metadataFiles); - } - } + public record ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) {} + ; } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index 3a9cab8f15..b656ca5300 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -231,9 +231,9 @@ private Stream getMetadataTaskStream( .setName(taskName) .setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId()) .setCreateTimestamp(polarisCallContext.getClock().millis()) - .withTaskType(AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableEntity.getTableIdentifier(), metadataBatch)) .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) .build(); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index a404718cfd..cf5f2d9770 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -64,6 +64,9 @@ public void init() { addTaskHandler( new ManifestFileCleanupTaskHandler( fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor())); + addTaskHandler( + new BatchFileCleanupTaskHandler( + fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor())); } /**