diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
index 606be57547..ba701bc77c 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
@@ -21,10 +21,11 @@
import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
import org.apache.amoro.io.writer.GenericIcebergPartitionedFanoutWriter;
import org.apache.amoro.io.writer.IcebergFanoutPosDeleteWriter;
+import org.apache.amoro.shade.guava32.com.google.common.math.LongMath;
import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
@@ -34,7 +35,9 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.util.PropertyUtil;

+import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
@@ -54,7 +57,7 @@ protected OptimizingDataReader dataReader() {
table.schema(),
table.spec(),
table.asUnkeyedTable().encryption(),
- table.properties().get(TableProperties.DEFAULT_NAME_MAPPING),
+ table.properties().get(org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING),
false,
IdentityPartitionConverters::convertConstant,
false,
@@ -103,10 +106,92 @@ protected long targetSize() {
long targetSize = super.targetSize();
long inputSize =
Arrays.stream(input.rewrittenDataFiles()).mapToLong(DataFile::fileSizeInBytes).sum();
- // When the input files’ total size is below targetSize, remove the output file size limit to
- // avoid outputting multiple files.
- // For more details, please refer to: https://github.com/apache/amoro/issues/3645
- return inputSize < targetSize ? Long.MAX_VALUE : targetSize;
+ int inputFileCount = input.rewrittenDataFiles().length;
+
+ // If input size is less than target size, merge all files into one output file
+ if (inputSize < targetSize) {
+ // With multiple input files, return the total input size so they are merged
+ // into a single output file (small file consolidation)
+ if (inputFileCount > 1) {
+ return inputSize;
+ }
+ // For the single file case, use targetSize to avoid creating an unnecessarily large file
+ return targetSize;
+ }
+
+ // Calculate expected number of output files based on input size
+ // This logic is inspired by Spark/Iceberg's SizeBasedFileRewritePlanner
+ int expectedOutputFiles = expectedOutputFiles(inputSize, targetSize);
+
+ // Calculate the split size: inputSize / expectedOutputFiles
+ // Add a small overhead to account for compression and serialization variations
+ long splitOverhead = 5L * 1024; // 5KB overhead
+ long estimatedSplitSize = (inputSize / expectedOutputFiles) + splitOverhead;
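+ // For example, with the default 128MB target, a 260MB group plans two output files,
+ // giving an estimated split size of about 130MB plus the 5KB overhead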
+
+ // Ensure the split size is at least targetSize to avoid creating too many small files
+ if (estimatedSplitSize < targetSize) {
+ return targetSize;
+ }
+
+ // Cap the split size at a reasonable maximum (targetSize * 1.5) to avoid creating
+ // excessively large files due to compression variations
+ long maxFileSize = (long) (targetSize * 1.5);
+ return Math.min(estimatedSplitSize, maxFileSize);
+ }
+
+ /**
+ * Determines the preferable number of output files when rewriting a particular file group.
+ *
+ * <p>This method decides whether to round up or round down based on what the estimated average
+ * file size will be if the remainder is distributed amongst other files. If the new average file
+ * size is no more than 10% greater than the target file size, then this method will round down
+ * when determining the number of output files. Otherwise, the remainder will be written into a
+ * separate file.
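+ *
+ * <p>For example, with a 128MB target and a 96MB minimum file size: a 225MB group leaves a
+ * 97MB remainder, which is kept as a separate file (two outputs); a 260MB group leaves only a
+ * 4MB remainder, which is distributed into two outputs of roughly 130MB each.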
+ *
+ * <p>This logic is inspired by Spark/Iceberg's SizeBasedFileRewritePlanner.expectedOutputFiles
+ *
+ * @param inputSize total input size for a file group
+ * @param targetSize target file size
+ * @return the number of files this rewriter should create
+ */
+ private int expectedOutputFiles(long inputSize, long targetSize) {
+ if (inputSize < targetSize) {
+ return 1;
+ }
+
+ // Get the min file size ratio (default 0.75) to decide whether the remainder is large enough
+ double minFileSizeRatio =
+ PropertyUtil.propertyAsDouble(
+ table.properties(),
+ TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO,
+ TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT);
+ long minFileSize = (long) (targetSize * minFileSizeRatio);
+
+ long numFilesWithRemainder = LongMath.divide(inputSize, targetSize, RoundingMode.CEILING);
+ long numFilesWithoutRemainder = LongMath.divide(inputSize, targetSize, RoundingMode.FLOOR);
+ long remainder = LongMath.mod(inputSize, targetSize);
+ long avgFileSizeWithoutRemainder = inputSize / numFilesWithoutRemainder;
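+ // e.g. inputSize = 260MB, targetSize = 128MB: ceiling = 3, floor = 2, remainder = 4MB,
+ // and the average file size without a separate remainder file is 130MB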
+
+ if (remainder > minFileSize) {
+ // The remainder file is of a valid size for this rewrite so keep it
+ return (int) numFilesWithRemainder;
+ } else if (avgFileSizeWithoutRemainder < (long) (1.1 * targetSize)) {
+ // If the remainder is distributed amongst other files,
+ // the average file size will be no more than 10% bigger than the target file size
+ // so round down and distribute remainder amongst other files
+ return (int) numFilesWithoutRemainder;
+ } else {
+ // Keep the remainder file as it is not OK to distribute it amongst other files
+ return (int) numFilesWithRemainder;
+ }
}

private PartitionSpec fileSpec() {
diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
index 37b79036cb..71f90679e0 100644
--- a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
+++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
@@ -27,13 +27,15 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.TableProperties;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.FileHelpers;
@@ -57,6 +59,7 @@
import org.junit.runners.Parameterized;

import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -93,9 +96,10 @@ public static Object[][] parameters() {
}

private static Map<String, String> buildTableProperties(FileFormat fileFormat) {
Map<String, String> tableProperties = Maps.newHashMapWithExpectedSize(3);
- tableProperties.put(TableProperties.FORMAT_VERSION, "2");
- tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
- tableProperties.put(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat.name());
+ tableProperties.put(org.apache.iceberg.TableProperties.FORMAT_VERSION, "2");
+ tableProperties.put(org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+ tableProperties.put(
+ org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat.name());
return tableProperties;
}
@@ -273,4 +277,259 @@ private CloseableIterable<Record> openFile(
String.format("Cannot read %s file: %s", fileFormat.name(), path));
}
}
+
+ /**
+ * Test targetSize() method with various input scenarios to ensure proper file merging behavior.
+ * This test uses reflection to access the protected targetSize() method.
+ */
+ @Test
+ public void testTargetSizeWithSmallInputFiles() throws Exception {
+ // Test case 1: Multiple small files (total < targetSize) should merge into one output file
+ // The default targetSize is 128MB; 5 files * 10MB = 50MB stays below it
+ long smallFileSize = 10L * 1024 * 1024; // 10MB each
+ int fileCount = 5;
+
+ DataFile[] dataFiles = createDataFilesWithSize(fileCount, smallFileSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // Should return inputSize (50MB) to merge all files into one
+ Assert.assertEquals(
+ "Multiple small files should merge into one file",
+ smallFileSize * fileCount,
+ resultTargetSize);
+ }
+
+ @Test
+ public void testTargetSizeWithSingleSmallFile() throws Exception {
+ // Test case 2: Single small file (< targetSize) should use targetSize
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ long smallFileSize = 10 * 1024 * 1024; // 10MB
+
+ DataFile[] dataFiles = createDataFilesWithSize(1, smallFileSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // Should return targetSize for single file
+ Assert.assertEquals("Single small file should use targetSize", targetSize, resultTargetSize);
+ }
+
+ @Test
+ public void testTargetSizeWithExactTargetSize() throws Exception {
+ // Test case 3: Input size exactly equals targetSize
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+
+ DataFile[] dataFiles = createDataFilesWithSize(1, targetSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // Should calculate based on expectedOutputFiles (should be 1)
+ Assert.assertTrue(
+ "Input size equals targetSize should return >= targetSize", resultTargetSize >= targetSize);
+ }
+
+ @Test
+ public void testTargetSizeWithLargeRemainder() throws Exception {
+ // Test case 4: Input size > targetSize with large remainder (> minFileSize)
+ // targetSize = 128MB, minFileSize = 128MB * 0.75 = 96MB
+ // inputSize = 200MB, remainder = 72MB < 96MB, but let's test with remainder > 96MB
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ long inputSize = 224 * 1024 * 1024; // 224MB = 128MB * 1.75, remainder = 96MB
+ // Actually, remainder = 224 - 128 = 96MB, which equals minFileSize (96MB)
+ // So we need remainder > 96MB, let's use 225MB, remainder = 97MB > 96MB
+
+ inputSize = 225 * 1024 * 1024; // 225MB, remainder = 97MB > 96MB
+
+ DataFile[] dataFiles = createDataFilesWithSize(1, inputSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // Two output files are expected; inputSize / 2 plus overhead is below targetSize,
+ // so the estimated split size is clamped up to targetSize
+ long expectedSplitSize = (inputSize / 2) + (5L * 1024); // with overhead
+ long maxFileSize = (long) (targetSize * 1.5); // 192MB
+ long expectedResult =
+ expectedSplitSize < targetSize ? targetSize : Math.min(expectedSplitSize, maxFileSize);
+
+ Assert.assertEquals(
+ "Large remainder should produce the expected split size", expectedResult, resultTargetSize);
+ }
+
+ @Test
+ public void testTargetSizeWithSmallRemainderDistributed() throws Exception {
+ // Test case 5: Input size > targetSize with a remainder small enough to distribute
+ // targetSize = 128MB, minFileSize = 96MB
+ // inputSize = 256MB = 2 * 128MB: remainder = 0 < 96MB and the average file size (128MB)
+ // stays within 1.1 * targetSize, so the planner rounds down to 2 files
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ long inputSize = 256L * 1024 * 1024; // 256MB = 2 * 128MB, no remainder
+
+ DataFile[] dataFiles = createDataFilesWithSize(1, inputSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // Should round down to 2 files, so split size = 256MB / 2 = 128MB + overhead
+ long expectedSplitSize = (inputSize / 2) + (5L * 1024);
+ if (expectedSplitSize < targetSize) {
+ expectedSplitSize = targetSize;
+ }
+ long maxFileSize = (long) (targetSize * 1.5);
+ long expectedResult = Math.min(expectedSplitSize, maxFileSize);
+
+ Assert.assertEquals(
+ "Exact multiple of targetSize should split evenly", expectedResult, resultTargetSize);
+ }
+
+ @Test
+ public void testTargetSizeWithMultipleSmallFiles() throws Exception {
+ // Test case 6: Multiple small files that together exceed targetSize
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ long smallFileSize = 20 * 1024 * 1024; // 20MB each
+ int fileCount = 10; // 10 files, total 200MB > 128MB
+
+ DataFile[] dataFiles = createDataFilesWithSize(fileCount, smallFileSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // The split size is derived from expectedOutputFiles and clamped into
+ // [targetSize, targetSize * 1.5]
+ Assert.assertTrue(
+ "Multiple files exceeding targetSize should calculate appropriate split size",
+ resultTargetSize >= targetSize);
+ Assert.assertTrue(
+ "Split size should not exceed maxFileSize", resultTargetSize <= (long) (targetSize * 1.5));
+ }
+
+ @Test
+ public void testTargetSizeWithVeryLargeInput() throws Exception {
+ // Test case 7: Very large input size
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ long inputSize = 500 * 1024 * 1024; // 500MB
+
+ DataFile[] dataFiles = createDataFilesWithSize(1, inputSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ // The result must stay within [targetSize, targetSize * 1.5] regardless of input size
+ long maxFileSize = (long) (targetSize * 1.5);
+ Assert.assertTrue(
+ "Very large input should be capped at maxFileSize", resultTargetSize <= maxFileSize);
+ Assert.assertTrue("Result should be at least targetSize", resultTargetSize >= targetSize);
+ }
+
+ @Test
+ public void testTargetSizeWithCustomMinTargetSizeRatio() throws Exception {
+ // Test case 8: Custom min-target-size-ratio
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ double customRatio = 0.8; // 80% instead of the default 75%, so minFileSize = 102.4MB
+
+ // Snapshot the original properties so the ratio can be restored in the finally block
+ Map<String, String> tableProps = Maps.newHashMap(getMixedTable().properties());
+ getMixedTable()
+ .updateProperties()
+ .set(TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO, String.valueOf(customRatio))
+ .commit();
+
+ try {
+ // inputSize = 250MB, remainder = 122MB > 102.4MB, should round up
+ long inputSize = 250 * 1024 * 1024; // 250MB
+ DataFile[] dataFiles = createDataFilesWithSize(1, inputSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ Assert.assertTrue(
+ "Custom min-target-size-ratio should affect calculation", resultTargetSize >= targetSize);
+ } finally {
+ // Restore original ratio
+ String originalRatio =
+ tableProps.getOrDefault(
+ TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO,
+ String.valueOf(TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT));
+ getMixedTable()
+ .updateProperties()
+ .set(TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO, originalRatio)
+ .commit();
+ }
+ }
+
+ @Test
+ public void testTargetSizeBoundaryConditions() throws Exception {
+ long targetSize = TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT; // 128MB
+ long minFileSize = (long) (targetSize * 0.75); // 96MB
+
+ // Test case: inputSize = targetSize + minFileSize - 1
+ // remainder = minFileSize - 1 < minFileSize, should check avgFileSize
+ long inputSize = targetSize + minFileSize - 1; // 128MB + 96MB - 1 = 223MB
+ DataFile[] dataFiles = createDataFilesWithSize(1, inputSize);
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ dataFiles, dataFiles, new DeleteFile[] {}, new DeleteFile[] {}, getMixedTable());
+
+ IcebergRewriteExecutor executor =
+ new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+ long resultTargetSize = invokeTargetSize(executor);
+
+ Assert.assertTrue(
+ "Boundary condition should be handled correctly", resultTargetSize >= targetSize);
+ }
+
+ /** Helper method to create DataFile array with specified size */
+ private DataFile[] createDataFilesWithSize(int count, long fileSize) {
+ DataFile[] dataFiles = new DataFile[count];
+ PartitionSpec spec = getMixedTable().spec();
+ for (int i = 0; i < count; i++) {
+ DataFiles.Builder builder = DataFiles.builder(spec);
+ builder
+ .withPath(String.format("/data/file-%d.parquet", i))
+ .withFileSizeInBytes(fileSize)
+ .withRecordCount(100);
+ if (spec.isPartitioned()) {
+ builder.withPartition(getPartitionData());
+ }
+ dataFiles[i] = builder.build();
+ }
+ return dataFiles;
+ }
+
+ /** Helper method to invoke protected targetSize() method using reflection */
+ private long invokeTargetSize(IcebergRewriteExecutor executor) throws Exception {
+ Method method = IcebergRewriteExecutor.class.getDeclaredMethod("targetSize");
+ method.setAccessible(true);
+ return (Long) method.invoke(executor);
+ }
}