From 94eb3e9f367fd534eebfa2f61e884565ceae7d8f Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Thu, 30 Mar 2023 12:01:57 +0800 Subject: [PATCH 1/4] close threads in some actions --- .../iceberg/spark/actions/DeleteOrphanFilesSparkAction.java | 4 ++++ .../spark/actions/DeleteReachableFilesSparkAction.java | 4 ++++ .../iceberg/spark/actions/ExpireSnapshotsSparkAction.java | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index b00ed42008f1..246f75dcd1f3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -270,6 +270,10 @@ private DeleteOrphanFiles.Result doExecute() { LOG.info("Custom delete function provided. Using non-bulk deletes"); deleteTasks.run(deleteFunc::accept); } + + if (deleteExecutorService != null) { + deleteExecutorService.shutdown(); + } } return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index ea6ac9f3dbf5..7c72d9a67550 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -142,6 +142,10 @@ private DeleteReachableFiles.Result deleteFiles(Iterator files) { } } + if (deleteExecutorService != null) { + deleteExecutorService.shutdown(); + } + LOG.info("Deleted {} total files", summary.totalFilesCount()); return ImmutableDeleteReachableFiles.Result.builder() diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index 2468497e42d0..711a0415cecd 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -256,6 +256,10 @@ private ExpireSnapshots.Result deleteFiles(Iterator files) { } } + if (deleteExecutorService != null) { + deleteExecutorService.shutdown(); + } + LOG.info("Deleted {} total files", summary.totalFilesCount()); return ImmutableExpireSnapshots.Result.builder() From 4036c48175c7785a908d7f15133bdec130ebb023 Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Fri, 31 Mar 2023 09:33:25 +0800 Subject: [PATCH 2/4] close threads pool in procedure and file io --- .../main/java/org/apache/iceberg/aws/s3/S3FileIO.java | 4 +++- .../java/org/apache/iceberg/hadoop/HadoopFileIO.java | 4 +++- .../spark/actions/DeleteOrphanFilesSparkAction.java | 4 ---- .../actions/DeleteReachableFilesSparkAction.java | 5 ----- .../spark/actions/ExpireSnapshotsSparkAction.java | 4 ---- .../spark/procedures/ExpireSnapshotsProcedure.java | 11 +++++++++-- .../spark/procedures/RemoveOrphanFilesProcedure.java | 11 +++++++++-- 7 files changed, 24 insertions(+), 19 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 7aeb75b01417..2aa30e96ec86 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -228,7 +228,9 @@ public void deleteFiles(Iterable paths) throws BulkDeletionFailureExcept throw new RuntimeException("Interrupted when waiting for deletions to complete", e); } } - + if (executorService() != null) { + executorService().shutdown(); + } if (totalFailedDeletions > 0) { throw new BulkDeletionFailureException(totalFailedDeletions); } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 04ead0bd6791..06b29ff4e79f 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -180,7 +180,9 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu failureCount.incrementAndGet(); }) .run(this::deleteFile); - + if (executorService() != null) { + executorService().shutdown(); + } if (failureCount.get() != 0) { throw new BulkDeletionFailureException(failureCount.get()); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 246f75dcd1f3..b00ed42008f1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -270,10 +270,6 @@ private DeleteOrphanFiles.Result doExecute() { LOG.info("Custom delete function provided. Using non-bulk deletes"); deleteTasks.run(deleteFunc::accept); } - - if (deleteExecutorService != null) { - deleteExecutorService.shutdown(); - } } return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index 7c72d9a67550..ba1b9afc79ed 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -141,11 +141,6 @@ private DeleteReachableFiles.Result deleteFiles(Iterator files) { summary = deleteFiles(deleteExecutorService, deleteFunc, files); } } - - if (deleteExecutorService != null) { - deleteExecutorService.shutdown(); - } - LOG.info("Deleted {} total files", summary.totalFilesCount()); return ImmutableDeleteReachableFiles.Result.builder() diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index 711a0415cecd..2468497e42d0 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -256,10 +256,6 @@ private ExpireSnapshots.Result deleteFiles(Iterator files) { } } - if (deleteExecutorService != null) { - deleteExecutorService.shutdown(); - } - LOG.info("Deleted {} total files", summary.totalFilesCount()); return ImmutableExpireSnapshots.Result.builder() diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index 9d2fc7e467cf..9a973766ad63 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.procedures; +import java.util.concurrent.ExecutorService; import org.apache.iceberg.Table; import org.apache.iceberg.actions.ExpireSnapshots; import org.apache.iceberg.io.SupportsBulkOperations; @@ -46,6 +47,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class); + private ExecutorService executorService = null; + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), @@ -132,8 +135,8 @@ public InternalRow[] call(InternalRow args) { + "IO's bulk delete.", table.io().getClass().getName()); } else { - - action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots")); + executorService = executorService(maxConcurrentDeletes, "expire-snapshots"); + action.executeDeleteWith(executorService); } } @@ -150,6 +153,10 @@ public InternalRow[] call(InternalRow args) { ExpireSnapshots.Result result = action.execute(); + if (executorService != null) { + executorService.shutdown(); + } + return toOutputRows(result); }); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index 6e66ea2629b8..65c55423e4f4 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.procedures; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.iceberg.Table; import org.apache.iceberg.actions.DeleteOrphanFiles; @@ -52,6 +53,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class); + private ExecutorService executorService = null; + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), @@ -166,8 +169,8 @@ public InternalRow[] call(InternalRow args) { + "IO's bulk delete.", table.io().getClass().getName()); } else { - - action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans")); + executorService = executorService(maxConcurrentDeletes, "remove-orphans"); + action.executeDeleteWith(executorService); } } @@ -184,6 +187,10 @@ public InternalRow[] call(InternalRow args) { DeleteOrphanFiles.Result result = action.execute(); + if (executorService != null) { + executorService.shutdown(); + } + return toOutputRows(result); }); } From 54615dc184f5282caa5898d9f17b8b904088dd17 Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Fri, 31 Mar 2023 09:34:40 +0800 Subject: [PATCH 3/4] fix code style --- .../iceberg/spark/actions/DeleteReachableFilesSparkAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index ba1b9afc79ed..ea6ac9f3dbf5 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -141,6 +141,7 @@ private DeleteReachableFiles.Result deleteFiles(Iterator files) { summary = deleteFiles(deleteExecutorService, deleteFunc, files); } } + LOG.info("Deleted {} total files", summary.totalFilesCount()); return ImmutableDeleteReachableFiles.Result.builder() From d9f7cd06d8812279be9c2c7a986199ca04d1ce26 Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Fri, 31 Mar 2023 11:31:50 +0800 Subject: [PATCH 4/4] fix suggestion --- aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java | 4 +--- .../src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java | 4 +--- .../iceberg/spark/procedures/ExpireSnapshotsProcedure.java | 1 + .../iceberg/spark/procedures/RemoveOrphanFilesProcedure.java | 1 + 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 2aa30e96ec86..7aeb75b01417 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -228,9 +228,7 @@ public void deleteFiles(Iterable paths) throws BulkDeletionFailureExcept throw new RuntimeException("Interrupted when waiting for deletions to complete", e); } } - if (executorService() != null) { - executorService().shutdown(); - } + if (totalFailedDeletions > 0) { throw new BulkDeletionFailureException(totalFailedDeletions); } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 06b29ff4e79f..04ead0bd6791 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -180,9 +180,7 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu failureCount.incrementAndGet(); }) .run(this::deleteFile); - if (executorService() != null) { - executorService().shutdown(); - } + if (failureCount.get() != 0) { throw new BulkDeletionFailureException(failureCount.get()); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index 9a973766ad63..28970bdd2d68 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -135,6 +135,7 @@ public InternalRow[] call(InternalRow args) { + "IO's bulk delete.", table.io().getClass().getName()); } else { + executorService = executorService(maxConcurrentDeletes, "expire-snapshots"); action.executeDeleteWith(executorService); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index 65c55423e4f4..8c2677d803ab 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -169,6 +169,7 @@ public InternalRow[] call(InternalRow args) { + "IO's bulk delete.", table.io().getClass().getName()); } else { + executorService = executorService(maxConcurrentDeletes, "remove-orphans"); action.executeDeleteWith(executorService); }