diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index 9d2fc7e467cf..28970bdd2d68 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.procedures; +import java.util.concurrent.ExecutorService; import org.apache.iceberg.Table; import org.apache.iceberg.actions.ExpireSnapshots; import org.apache.iceberg.io.SupportsBulkOperations; @@ -46,6 +47,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class); + private ExecutorService executorService = null; + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), @@ -133,7 +136,8 @@ public InternalRow[] call(InternalRow args) { table.io().getClass().getName()); } else { - action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots")); + executorService = executorService(maxConcurrentDeletes, "expire-snapshots"); + action.executeDeleteWith(executorService); } } @@ -150,6 +154,10 @@ public InternalRow[] call(InternalRow args) { ExpireSnapshots.Result result = action.execute(); + if (executorService != null) { + executorService.shutdown(); + } + return toOutputRows(result); }); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index 6e66ea2629b8..8c2677d803ab 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.procedures; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.iceberg.Table; import org.apache.iceberg.actions.DeleteOrphanFiles; @@ -52,6 +53,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class); + private ExecutorService executorService = null; + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), @@ -167,7 +170,8 @@ public InternalRow[] call(InternalRow args) { table.io().getClass().getName()); } else { - action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans")); + executorService = executorService(maxConcurrentDeletes, "remove-orphans"); + action.executeDeleteWith(executorService); } } @@ -184,6 +188,10 @@ public InternalRow[] call(InternalRow args) { DeleteOrphanFiles.Result result = action.execute(); + if (executorService != null) { + executorService.shutdown(); + } + return toOutputRows(result); }); }