diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java new file mode 100644 index 000000000000..24a9b3f232e0 --- /dev/null +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.SparkException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runners.Parameterized; + +public class TestWriteAborts extends SparkExtensionsTestBase { + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + "testhive", + SparkCatalog.class.getName(), + ImmutableMap.of( + "type", + "hive", + CatalogProperties.FILE_IO_IMPL, + CustomFileIO.class.getName(), + "default-namespace", + "default") + }, + { + "testhivebulk", + SparkCatalog.class.getName(), + ImmutableMap.of( + "type", + "hive", + CatalogProperties.FILE_IO_IMPL, + CustomBulkFileIO.class.getName(), + "default-namespace", + "default") + } + }; + } + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + public TestWriteAborts(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testBatchAppend() throws Exception { + String dataLocation = temp.newFolder().toString(); + + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data)" + + "TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.WRITE_DATA_LOCATION, dataLocation); + + List records = + ImmutableList.of( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b"), + new SimpleRecord(3, "a"), + new SimpleRecord(4, "b")); + Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); + + AssertHelpers.assertThrows( + "Write must fail", + SparkException.class, + "Writing job aborted", + () -> { + try { + // incoming records are not ordered by partitions so the job must fail + inputDF + .coalesce(1) + .sortWithinPartitions("id") + .writeTo(tableName) + .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") + .append(); + } catch (NoSuchTableException e) { + throw new RuntimeException(e); + } + }); + + assertEquals("Should be no records", sql("SELECT * FROM %s", tableName), ImmutableList.of()); + + assertEquals( + "Should be no orphan data files", + ImmutableList.of(), + sql( + "CALL %s.system.remove_orphan_files(table => '%s', older_than => %dL, location => '%s')", + catalogName, tableName, System.currentTimeMillis() + 5000, dataLocation)); + } + + public static class CustomFileIO implements FileIO { + + private final FileIO delegate = new HadoopFileIO(new Configuration()); + + public CustomFileIO() {} + + protected FileIO delegate() { + return delegate; + } + + @Override + public InputFile newInputFile(String path) { + return delegate.newInputFile(path); + } + + @Override + public OutputFile newOutputFile(String path) { + return delegate.newOutputFile(path); + } + + @Override + public void deleteFile(String path) { + delegate.deleteFile(path); + } + + @Override + public Map properties() { + return delegate.properties(); + } + + @Override + public void initialize(Map properties) { + delegate.initialize(properties); + } + + @Override + public void close() { + delegate.close(); + } + } + + public static class CustomBulkFileIO extends CustomFileIO implements SupportsBulkOperations { + + public CustomBulkFileIO() {} + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException("Only bulk deletes are supported"); + } + + @Override + public void deleteFiles(Iterable paths) throws BulkDeletionFailureException { + for (String path : paths) { + delegate().deleteFile(path); + } + } + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java new file mode 100644 index 000000000000..a103a5003222 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.spark.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A utility for cleaning up written but not committed files. */ +class SparkCleanupUtil { + + private static final Logger LOG = LoggerFactory.getLogger(SparkCleanupUtil.class); + + private static final int DELETE_NUM_RETRIES = 3; + private static final int DELETE_MIN_RETRY_WAIT_MS = 100; // 100 ms + private static final int DELETE_MAX_RETRY_WAIT_MS = 30 * 1000; // 30 seconds + private static final int DELETE_TOTAL_RETRY_TIME_MS = 2 * 60 * 1000; // 2 minutes + + private SparkCleanupUtil() {} + + /** + * Attempts to delete as many files produced by a task as possible. + * + *

Note this method will log Spark task info and is supposed to be called only on executors. + * Use {@link #deleteFiles(String, FileIO, List)} to delete files on the driver. + * + * @param io a {@link FileIO} instance used for deleting files + * @param files a list of files to delete + */ + public static void deleteTaskFiles(FileIO io, List> files) { + deleteFiles(taskInfo(), io, files); + } + + // the format matches what Spark uses for internal logging + private static String taskInfo() { + TaskContext taskContext = TaskContext.get(); + if (taskContext == null) { + return "unknown task"; + } else { + return String.format( + "partition %d (task %d, attempt %d, stage %d.%d)", + taskContext.partitionId(), + taskContext.taskAttemptId(), + taskContext.attemptNumber(), + taskContext.stageId(), + taskContext.stageAttemptNumber()); + } + } + + /** + * Attempts to delete as many given files as possible. + * + * @param context a helpful description of the operation invoking this method + * @param io a {@link FileIO} instance used for deleting files + * @param files a list of files to delete + */ + public static void deleteFiles(String context, FileIO io, List> files) { + List paths = Lists.transform(files, file -> file.path().toString()); + deletePaths(context, io, paths); + } + + private static void deletePaths(String context, FileIO io, List paths) { + if (io instanceof SupportsBulkOperations) { + SupportsBulkOperations bulkIO = (SupportsBulkOperations) io; + bulkDelete(context, bulkIO, paths); + } else { + delete(context, io, paths); + } + } + + private static void bulkDelete(String context, SupportsBulkOperations io, List paths) { + try { + io.deleteFiles(paths); + LOG.info("Deleted {} file(s) using bulk deletes ({})", paths.size(), context); + + } catch (BulkDeletionFailureException e) { + int deletedFilesCount = paths.size() - e.numberFailedObjects(); + LOG.warn( + "Deleted only {} of {} file(s) using bulk deletes ({})", + deletedFilesCount, + paths.size(), + context); + } + } + + private static void delete(String context, FileIO io, List paths) { + AtomicInteger deletedFilesCount = new AtomicInteger(0); + + Tasks.foreach(paths) + .executeWith(ThreadPools.getWorkerPool()) + .stopRetryOn(NotFoundException.class) + .suppressFailureWhenFinished() + .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc)) + .retry(DELETE_NUM_RETRIES) + .exponentialBackoff( + DELETE_MIN_RETRY_WAIT_MS, + DELETE_MAX_RETRY_WAIT_MS, + DELETE_TOTAL_RETRY_TIME_MS, + 2 /* exponential */) + .run( + path -> { + io.deleteFile(path); + deletedFilesCount.incrementAndGet(); + }); + + if (deletedFilesCount.get() < paths.size()) { + LOG.warn("Deleted only {} of {} file(s) ({})", deletedFilesCount, paths.size(), context); + } else { + LOG.info("Deleted {} file(s) ({})", paths.size(), context); + } + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 1f65184925b9..00048c470bec 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; @@ -56,6 +57,7 @@ import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.PositionDeltaWriter; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkSchemaUtil; @@ -63,8 +65,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.StructProjection; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; @@ -143,14 +143,6 @@ public DeltaBatchWrite toBatch() { return new PositionDeltaBatchWrite(); } - private static > void cleanFiles(FileIO io, Iterable files) { - Tasks.foreach(files) - .executeWith(ThreadPools.getWorkerPool()) - .throwFailureWhenFinished() - .noRetry() - .run(file -> io.deleteFile(file.path().toString())); - } - private class PositionDeltaBatchWrite implements DeltaBatchWrite { @Override @@ -243,17 +235,25 @@ private Expression conflictDetectionFilter(SparkBatchQueryScan queryScan) { @Override public void abort(WriterCommitMessage[] messages) { - if (!cleanupOnAbort) { - return; + if (cleanupOnAbort) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } else { + LOG.warn("Skipping cleanup of written files"); } + } + + private List> files(WriterCommitMessage[] messages) { + List> files = Lists.newArrayList(); for (WriterCommitMessage message : messages) { if (message != null) { DeltaTaskCommit taskCommit = (DeltaTaskCommit) message; - cleanFiles(table.io(), Arrays.asList(taskCommit.dataFiles())); - cleanFiles(table.io(), Arrays.asList(taskCommit.deleteFiles())); + files.addAll(Arrays.asList(taskCommit.dataFiles())); + files.addAll(Arrays.asList(taskCommit.deleteFiles())); } } + + return files; } private void commitOperation(SnapshotUpdate operation, String description) { @@ -464,7 +464,7 @@ public void abort() throws IOException { close(); DeleteWriteResult result = delegate.result(); - cleanFiles(io, result.deleteFiles()); + SparkCleanupUtil.deleteTaskFiles(io, result.deleteFiles()); } @Override @@ -543,8 +543,14 @@ public void abort() throws IOException { close(); WriteResult result = delegate.result(); - cleanFiles(io, Arrays.asList(result.dataFiles())); - cleanFiles(io, Arrays.asList(result.deleteFiles())); + SparkCleanupUtil.deleteTaskFiles(io, files(result)); + } + + private List> files(WriteResult result) { + List> files = Lists.newArrayList(); + files.addAll(Arrays.asList(result.dataFiles())); + files.addAll(Arrays.asList(result.deleteFiles())); + return files; } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index e84442a00afd..f63db416cc2a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -20,24 +20,13 @@ import static org.apache.iceberg.IsolationLevel.SERIALIZABLE; import static org.apache.iceberg.IsolationLevel.SNAPSHOT; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; @@ -62,15 +51,11 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.RollingDataWriter; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; import org.apache.spark.api.java.JavaSparkContext; @@ -229,40 +214,23 @@ private void commitOperation(SnapshotUpdate operation, String description) { private void abort(WriterCommitMessage[] messages) { if (cleanupOnAbort) { - Map props = table.properties(); - Tasks.foreach(files(messages)) - .executeWith(ThreadPools.getWorkerPool()) - .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - PropertyUtil.propertyAsInt( - props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .throwFailureWhenFinished() - .run( - file -> { - table.io().deleteFile(file.path().toString()); - }); + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); } else { - LOG.warn( - "Skipping cleaning up of data files because Iceberg was unable to determine the final commit state"); + LOG.warn("Skipping cleanup of written files"); } } - private Iterable files(WriterCommitMessage[] messages) { - if (messages.length > 0) { - return Iterables.concat( - Iterables.transform( - Arrays.asList(messages), - message -> - message != null - ? ImmutableList.copyOf(((TaskCommit) message).files()) - : ImmutableList.of())); - } - return ImmutableList.of(); + private List files(WriterCommitMessage[] messages) { + List files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + TaskCommit taskCommit = (TaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; } @Override @@ -305,9 +273,9 @@ public void commit(WriterCommitMessage[] messages) { private class DynamicOverwrite extends BaseBatchWrite { @Override public void commit(WriterCommitMessage[] messages) { - Iterable files = files(messages); + List files = files(messages); - if (!files.iterator().hasNext()) { + if (files.isEmpty()) { LOG.info("Dynamic overwrite is empty, skipping commit"); return; } @@ -478,13 +446,7 @@ private RewriteFiles(String fileSetID) { @Override public void commit(WriterCommitMessage[] messages) { FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); - - Set newDataFiles = Sets.newHashSetWithExpectedSize(messages.length); - for (DataFile file : files(messages)) { - newDataFiles.add(file); - } - - coordinator.stageRewrite(table, fileSetID, Collections.unmodifiableSet(newDataFiles)); + coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages))); } } @@ -685,14 +647,6 @@ public DataWriter createWriter(int partitionId, long taskId, long e } } - private static > void deleteFiles(FileIO io, List files) { - Tasks.foreach(files) - .executeWith(ThreadPools.getWorkerPool()) - .throwFailureWhenFinished() - .noRetry() - .run(file -> io.deleteFile(file.path().toString())); - } - private static class UnpartitionedDataWriter implements DataWriter { private final FileWriter delegate; private final FileIO io; @@ -728,7 +682,7 @@ public void abort() throws IOException { close(); DataWriteResult result = delegate.result(); - deleteFiles(io, result.dataFiles()); + SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles()); } @Override @@ -785,7 +739,7 @@ public void abort() throws IOException { close(); DataWriteResult result = delegate.result(); - deleteFiles(io, result.dataFiles()); + SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles()); } @Override