diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
index 152c92cf3b8d2..a8569e24e2f8e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
@@ -33,9 +33,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.util.Iterator;
 import java.util.Set;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
@@ -66,68 +65,44 @@ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
   @Override
   public long averageBytesPerRecord(HoodieTimeline commitTimeline, CommitMetadataSerDe commitMetadataSerDe) {
     int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
-    final AverageRecordSizeStats averageRecordSizeStats = new AverageRecordSizeStats(hoodieWriteConfig);
+    final long commitSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+    final long metadataSizeEstimate = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
     try {
       if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Stream<HoodieInstant> filteredInstants = commitTimeline.filterCompletedInstants()
+        Iterator<HoodieInstant> instants = commitTimeline.filterCompletedInstants()
             .getReverseOrderedInstants()
             .filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
-            .limit(maxCommits);
-        filteredInstants
-            .forEach(instant -> {
-              HoodieCommitMetadata commitMetadata;
-              try {
-                commitMetadata = commitTimeline.readCommitMetadata(instant);
-                if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
-                  // let's consider only base files in case of delta commits
-                  commitMetadata.getWriteStats().stream().parallel()
-                      .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
-                      .forEach(hoodieWriteStat -> averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(), hoodieWriteStat.getNumWrites()));
-                } else {
-                  averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(), commitMetadata.fetchTotalRecordsWritten());
-                }
-              } catch (IOException ignore) {
-                LOG.info("Failed to parse commit metadata", ignore);
-              }
-            });
+            .limit(maxCommits).iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          try {
+            HoodieCommitMetadata commitMetadata = commitTimeline.readCommitMetadata(instant);
+            final HoodieAtomicLongAccumulator totalBytesWritten = HoodieAtomicLongAccumulator.create();
+            final HoodieAtomicLongAccumulator totalRecordsWritten = HoodieAtomicLongAccumulator.create();
+            if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+              // Only use base files for estimate
+              commitMetadata.getWriteStats().stream()
+                  .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
+                  .forEach(hoodieWriteStat -> {
+                    totalBytesWritten.add(hoodieWriteStat.getTotalWriteBytes() - metadataSizeEstimate);
+                    totalRecordsWritten.add(hoodieWriteStat.getNumWrites());
+                  });
+            } else {
+              totalBytesWritten.add(commitMetadata.fetchTotalBytesWritten() - (commitMetadata.fetchTotalFiles() * metadataSizeEstimate));
+              totalRecordsWritten.add(commitMetadata.fetchTotalRecordsWritten());
+            }
+            if (totalBytesWritten.value() > commitSizeThreshold && totalRecordsWritten.value() > 0) {
+              return (long) Math.ceil((1.0 * totalBytesWritten.value()) / totalRecordsWritten.value());
+            }
+          } catch (IOException ignore) {
+            LOG.info("Failed to parse commit metadata", ignore);
+          }
+        }
       }
     } catch (Throwable t) {
       LOG.warn("Got error while trying to compute average bytes/record but will proceed to use the computed value "
           + "or fallback to default config value ", t);
     }
-    return averageRecordSizeStats.computeAverageRecordSize();
-  }
-
-  private static class AverageRecordSizeStats implements Serializable {
-    private final HoodieAtomicLongAccumulator totalBytesWritten;
-    private final HoodieAtomicLongAccumulator totalRecordsWritten;
-    private final long fileSizeThreshold;
-    private final long avgMetadataSize;
-    private final int defaultRecordSize;
-
-    public AverageRecordSizeStats(HoodieWriteConfig hoodieWriteConfig) {
-      totalBytesWritten = HoodieAtomicLongAccumulator.create();
-      totalRecordsWritten = HoodieAtomicLongAccumulator.create();
-      fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
-      avgMetadataSize = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
-      defaultRecordSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
-    }
-
-    private void updateStats(long fileSizeInBytes, long recordWritten) {
-      if (fileSizeInBytes > fileSizeThreshold && fileSizeInBytes > avgMetadataSize && recordWritten > 0) {
-        totalBytesWritten.add(fileSizeInBytes - avgMetadataSize);
-        totalRecordsWritten.add(recordWritten);
-      }
-    }
-
-    private long computeAverageRecordSize() {
-      if (totalBytesWritten.value() > 0 && totalRecordsWritten.value() > 0) {
-        return totalBytesWritten.value() / totalRecordsWritten.value();
-      }
-      // Fallback to default implementation in the cases were we either got an exception before we could
-      // compute the average record size or there are no eligible commits yet.
-      return defaultRecordSize;
-    }
+    return hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
   }
 }
\ No newline at end of file
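The behavioral change above is easy to miss in the diff noise: instead of accumulating byte and record counts across up to maxCommits commits and averaging at the end (the deleted AverageRecordSizeStats helper), the estimator now computes a per-commit average, newest first, and returns the first one whose metadata-adjusted byte count clears the size threshold. Below is a minimal standalone sketch of that control flow, not the Hudi API itself: CommitStats is a hypothetical stand-in for HoodieCommitMetadata, and the constants are illustrative values for the HoodieWriteConfig lookups (the metadata estimate here mirrors the test's 10MB value).

  import java.util.Collections;
  import java.util.List;

  class AverageRecordSizeSketch {
    static final long COMMIT_SIZE_THRESHOLD = 10L * 1024 * 1024;  // illustrative: estimation threshold * small-file limit
    static final long METADATA_SIZE_ESTIMATE = 10_000_000L;       // illustrative per-file metadata bytes
    static final long DEFAULT_RECORD_SIZE = 1024L;                // illustrative fallback estimate

    static class CommitStats {
      final long totalBytes;
      final long totalRecords;
      final long totalFiles;

      CommitStats(long totalBytes, long totalRecords, long totalFiles) {
        this.totalBytes = totalBytes;
        this.totalRecords = totalRecords;
        this.totalFiles = totalFiles;
      }
    }

    // Commits are visited newest-first; the first commit whose adjusted byte
    // count clears the threshold decides the estimate on its own.
    static long averageBytesPerRecord(List<CommitStats> commitsNewestFirst) {
      for (CommitStats commit : commitsNewestFirst) {
        // Discount the fixed per-file metadata overhead before averaging.
        long adjustedBytes = commit.totalBytes - commit.totalFiles * METADATA_SIZE_ESTIMATE;
        if (adjustedBytes > COMMIT_SIZE_THRESHOLD && commit.totalRecords > 0) {
          return (long) Math.ceil((1.0 * adjustedBytes) / commit.totalRecords);
        }
        // Otherwise fall through to the next older commit.
      }
      return DEFAULT_RECORD_SIZE; // no commit qualified, or none could be read
    }

    public static void main(String[] args) {
      // One commit: 10M records totalling 2GB in a single file.
      System.out.println(averageBytesPerRecord(
          Collections.singletonList(new CommitStats(2_000_000_000L, 10_000_000L, 1L))));
      // Prints 199: ceil((2_000_000_000 - 10_000_000) / 10_000_000.0).
    }
  }

That 199 is exactly the "latest instant should be honored" value asserted in the updated tests below; under the old averaging strategy the same timeline blended older commits into the estimate.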
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
index f59b45283750d..1ca37bf06edb2 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
@@ -63,7 +63,8 @@ public class TestAverageRecordSizeEstimator {
   private static final String PARTITION1 = "partition1";
   private static final String TEST_WRITE_TOKEN = "1-0-1";
   private static final Integer DEFAULT_MAX_COMMITS = 2;
-  private static final Integer DEFAULT_MAX_PARQUET_METADATA_SIZE = 1000;
+  // needs to be big enough to skew the estimate
+  private static final Integer DEFAULT_AVERAGE_PARQUET_METADATA_SIZE = 10000000;
   private static final Double DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD = 0.1;
 
   @Test
@@ -102,7 +103,7 @@ public void testAverageRecordSizeWithNonEmptyCommitTimeline(List
   private static Stream<Arguments> testCases() {
     Long baseInstant = 20231204194919610L;
+    Long standardCount = 10000000L;
     List<Arguments> arguments = new ArrayList<>();
     // Note the avg record estimate is based on a parquet metadata size of 500Bytes per file.
     // 1. straight forward. just 1 instant.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L)))), 99L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L)))), 99L));

-    // 2. two instants. avg of both the instants
+    // 2. two instants. the latest instant should be honored
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1000000L, 200L)))), 109L));
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L)))), 199L));

-    // 3. two instants, latest commit has a small file thats just above threshold, while earliest commit is fully ignored,
-    // since it below the threshold size limit
+    // 3. two instants, where the 2nd one is too small to meet the threshold, so the 1st one should be honored
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 9000L, 1000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 110000, 100L)))), 99L));
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1000L, 200L)))), 99L));

-    // 4. 2nd instance is replace commit, it shld be excluded and should be avg of both commits.
+    // 4. 2nd instance is a replace commit, so it should be excluded
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 200L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L)))), 99L));
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 100L)))), 199L));

     // 5. for delta commits, only parquet files should be accounted for.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L)))), 149L));
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L)))), 199L));

     // 6. delta commit has a mix of parquet and log files. only parquet files should be accounted for.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 1000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Arrays.asList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L),
-                    generateLogWriteStat(baseInstant + 100, 10000000L, 300L)))), 190L));
+                Arrays.asList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L),
+                    generateLogWriteStat(baseInstant + 100, standardCount, 300L)))), 199L));

     // 7. 2nd delta commit only has log files. and so we honor 1st delta commit size.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Arrays.asList(generateLogWriteStat(baseInstant + 100, 1000000L, 200L),
-                    generateLogWriteStat(baseInstant + 100, 10000000L, 300L)))), 99L));
+                Arrays.asList(generateLogWriteStat(baseInstant + 100, standardCount, 200L),
+                    generateLogWriteStat(baseInstant + 100, standardCount, 300L)))), 99L));

     // 8. since default max commits is overriden to 2 commits, ignore the earliest commit here since there are total 3 commits
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 1000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 200L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 50L))),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1L, 50L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 200)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 10000000L, 100L)))), 74L));
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 1L, 100L)))), Long.valueOf(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.defaultValue())));

     // 9. replace commits should be ignored despite being the latest commits.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 1000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Arrays.asList(generateLogWriteStat(baseInstant + 100, 1000000L, 200L),
+                Arrays.asList(generateLogWriteStat(baseInstant + 100, standardCount, 200L),
                     generateLogWriteStat(baseInstant + 100, 1000000L, 300L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 1000000L, 2000L))),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 200, standardCount, 2000L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 300, 1000000L, 3000L)))), 99L));
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 300, standardCount, 3000L)))), 99L));

     // 10. Ignore commit stat with 0 records
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 1000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 1000L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 50L))),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 50L))),
             Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 200)),
                 Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 0L, 1000L)))), 49L));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 1fb84c17ba453..5d8f7e271a7df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -299,6 +299,10 @@ public long fetchTotalFilesUpdated() {
     return totalFilesUpdated;
   }
 
+  public long fetchTotalFiles() {
+    return partitionToWriteStats.values().stream().mapToLong(List::size).sum();
+  }
+
   public long fetchTotalUpdateRecordsWritten() {
     long totalUpdateRecordsWritten = 0;
     for (List<HoodieWriteStat> stats : partitionToWriteStats.values()) {
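The new fetchTotalFiles() counts write stats across all partitions so the estimator can discount the per-file metadata overhead once per file. As a sanity check of the updated expectations, the following re-derives test case 10's 49L, under the assumption (inferred from the test values above, not stated in the patch) that the helper generateBaseWriteStat(instant, numRecords, size) produces one base-file stat with numRecords writes totalling numRecords * size bytes:

  // Hypothetical re-derivation of test case 10's 49L; the helper semantics
  // are inferred from the test values, not from the patch itself.
  public class ExpectedSizeCheck {
    public static void main(String[] args) {
      long metadataSizeEstimate = 10_000_000L; // DEFAULT_AVERAGE_PARQUET_METADATA_SIZE

      // Latest instant: a delta commit whose stat has 0 records, so its
      // adjusted byte count goes negative, the threshold check fails, and
      // the estimator moves on to the next older commit.

      // Next newest instant: 10M records at 50 bytes each in one base file.
      long records = 10_000_000L;                             // standardCount
      long totalBytes = records * 50L;                        // 500_000_000
      long adjusted = totalBytes - 1L * metadataSizeEstimate; // 490_000_000
      System.out.println((long) Math.ceil((1.0 * adjusted) / records)); // 49
    }
  }

The same skip logic explains why case 8 now expects the COPY_ON_WRITE_RECORD_SIZE_ESTIMATE default: within the two-commit lookback, both candidate commits carry a single record each, so neither clears the threshold and the configured fallback wins.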