Merged
@@ -33,9 +33,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
@@ -66,68 +65,44 @@ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
@Override
public long averageBytesPerRecord(HoodieTimeline commitTimeline, CommitMetadataSerDe commitMetadataSerDe) {
int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
final AverageRecordSizeStats averageRecordSizeStats = new AverageRecordSizeStats(hoodieWriteConfig);
final long commitSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
Contributor

Isn't this the file slice threshold or single data file threshold?

Looks like it was a bug earlier, and we should fix it now.

Contributor Author

#10763: it seems to have always been the case that the threshold applies to the entire commit. Additionally, the config description is

  public static final ConfigProperty<String> RECORD_SIZE_ESTIMATION_THRESHOLD = ConfigProperty
      .key("hoodie.record.size.estimation.threshold")
      .defaultValue("1.0")
      .markAdvanced()
      .withDocumentation("We use the previous commits' metadata to calculate the estimated record size and use it "
          + " to bin pack records into partitions. If the previous commit is too small to make an accurate estimation, "
          + " Hudi will search commits in the reverse order, until we find a commit that has totalBytesWritten "
          + " larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * this_threshold)");

And the git blame is from 2021.

Contributor

I see.
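
To make the two readings concrete, here is a small standalone sketch (not Hudi code; the class and variable names are made up, and the 100 MB small-file limit with a 1.0 threshold are only illustrative defaults) contrasting the per-commit check the code performs against the per-file check raised above:

  public class ThresholdSemanticsSketch {
    public static void main(String[] args) {
      double estimationThreshold = 1.0;          // hoodie.record.size.estimation.threshold
      long parquetSmallFileLimit = 104_857_600L; // ~100 MB small file limit (illustrative)
      long threshold = (long) (estimationThreshold * parquetSmallFileLimit);

      long[] fileBytes = {30_000_000L, 40_000_000L, 50_000_000L};

      // Per-commit semantics (what the code does): sum the bytes written by the whole
      // commit, then compare the total against the threshold.
      long commitBytes = 0L;
      for (long bytes : fileBytes) {
        commitBytes += bytes;
      }
      boolean commitEligible = commitBytes > threshold;  // true: 120 MB > ~100 MB

      // Per-file semantics (the reading questioned above): each data file would have to
      // clear the threshold on its own before contributing to the estimate.
      boolean anyFileEligible = false;
      for (long bytes : fileBytes) {
        anyFileEligible = anyFileEligible || bytes > threshold;  // false: no single file > ~100 MB
      }
      System.out.println(commitEligible + " vs " + anyFileEligible);  // prints "true vs false"
    }
  }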

final long metadataSizeEstimate = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Stream<HoodieInstant> filteredInstants = commitTimeline.filterCompletedInstants()
Iterator<HoodieInstant> instants = commitTimeline.filterCompletedInstants()
.getReverseOrderedInstants()
.filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
.limit(maxCommits);
filteredInstants
.forEach(instant -> {
HoodieCommitMetadata commitMetadata;
try {
commitMetadata = commitTimeline.readCommitMetadata(instant);
if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
// let's consider only base files in case of delta commits
commitMetadata.getWriteStats().stream().parallel()
.filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
.forEach(hoodieWriteStat -> averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(), hoodieWriteStat.getNumWrites()));
} else {
averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(), commitMetadata.fetchTotalRecordsWritten());
}
} catch (IOException ignore) {
LOG.info("Failed to parse commit metadata", ignore);
}
});
.limit(maxCommits).iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
try {
HoodieCommitMetadata commitMetadata = commitTimeline.readCommitMetadata(instant);
final HoodieAtomicLongAccumulator totalBytesWritten = HoodieAtomicLongAccumulator.create();
Contributor

Not sure why we need an accumulator here. We are processing all of these in the driver, from what I can gauge.
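
A minimal sketch of that suggestion (not Hudi code; Stat is a made-up stand-in for HoodieWriteStat and all numbers are illustrative): because the loop runs single-threaded on the driver, plain local longs can do the same bookkeeping as HoodieAtomicLongAccumulator.

  import java.util.Arrays;
  import java.util.List;

  public class DriverSideAverageSketch {

    // Stand-in for HoodieWriteStat carrying only the two fields the estimator reads.
    static final class Stat {
      final long totalWriteBytes;
      final long numWrites;
      Stat(long totalWriteBytes, long numWrites) {
        this.totalWriteBytes = totalWriteBytes;
        this.numWrites = numWrites;
      }
    }

    // Same arithmetic as the accumulator version, just with plain local variables.
    static long averageBytesPerRecord(List<Stat> baseFileStats, long metadataSizeEstimate,
                                      long commitSizeThreshold, long fallback) {
      long totalBytes = 0L;
      long totalRecords = 0L;
      for (Stat stat : baseFileStats) {
        totalBytes += stat.totalWriteBytes - metadataSizeEstimate;
        totalRecords += stat.numWrites;
      }
      if (totalBytes > commitSizeThreshold && totalRecords > 0) {
        return (long) Math.ceil((1.0 * totalBytes) / totalRecords);
      }
      return fallback;
    }

    public static void main(String[] args) {
      List<Stat> stats = Arrays.asList(new Stat(120_000_000L, 1_000_000L),
                                       new Stat(60_000_000L, 500_000L));
      // ~180 MB over 1.5M records, minus the per-file metadata estimate: prints 120.
      System.out.println(averageBytesPerRecord(stats, 1_000L, 100_000_000L, 1_024L));
    }
  }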

final HoodieAtomicLongAccumulator totalRecordsWritten = HoodieAtomicLongAccumulator.create();
if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
// Only use base files for estimate
commitMetadata.getWriteStats().stream()
.filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
.forEach(hoodieWriteStat -> {
totalBytesWritten.add(hoodieWriteStat.getTotalWriteBytes() - metadataSizeEstimate);
totalRecordsWritten.add(hoodieWriteStat.getNumWrites());
});
} else {
totalBytesWritten.add(commitMetadata.fetchTotalBytesWritten() - (commitMetadata.fetchTotalFiles() * metadataSizeEstimate));
Contributor

If we go with a per-file size threshold, then here too we need to loop over every writeStat.
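
For reference, a standalone sketch of that alternative (again not Hudi code; Stat and the numbers are illustrative), where every write stat is walked and the threshold is applied per file:

  import java.util.Arrays;
  import java.util.List;

  public class PerFileThresholdSketch {

    // Stand-in for HoodieWriteStat carrying only the two fields the estimator reads.
    static final class Stat {
      final long totalWriteBytes;
      final long numWrites;
      Stat(long totalWriteBytes, long numWrites) {
        this.totalWriteBytes = totalWriteBytes;
        this.numWrites = numWrites;
      }
    }

    static long averageBytesPerRecord(List<Stat> writeStats, long metadataSizeEstimate,
                                      long perFileSizeThreshold, long fallback) {
      long totalBytes = 0L;
      long totalRecords = 0L;
      for (Stat stat : writeStats) {
        // Each file must individually clear the threshold to contribute to the estimate.
        if (stat.totalWriteBytes > perFileSizeThreshold && stat.numWrites > 0) {
          totalBytes += stat.totalWriteBytes - metadataSizeEstimate;
          totalRecords += stat.numWrites;
        }
      }
      return totalRecords > 0 ? totalBytes / totalRecords : fallback;
    }

    public static void main(String[] args) {
      // The 5 MB file is skipped; only the 150 MB file drives the estimate (prints 149).
      List<Stat> stats = Arrays.asList(new Stat(150_000_000L, 1_000_000L),
                                       new Stat(5_000_000L, 500_000L));
      System.out.println(averageBytesPerRecord(stats, 1_000L, 100_000_000L, 1_024L));
    }
  }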

totalRecordsWritten.add(commitMetadata.fetchTotalRecordsWritten());
}
if (totalBytesWritten.value() > commitSizeThreshold && totalRecordsWritten.value() > 0) {
return (long) Math.ceil((1.0 * totalBytesWritten.value()) / totalRecordsWritten.value());
}
} catch (IOException ignore) {
LOG.info("Failed to parse commit metadata", ignore);
}
}
}
} catch (Throwable t) {
LOG.warn("Got error while trying to compute average bytes/record but will proceed to use the computed value "
+ "or fallback to default config value ", t);
}
return averageRecordSizeStats.computeAverageRecordSize();
}

private static class AverageRecordSizeStats implements Serializable {
private final HoodieAtomicLongAccumulator totalBytesWritten;
private final HoodieAtomicLongAccumulator totalRecordsWritten;
private final long fileSizeThreshold;
private final long avgMetadataSize;
private final int defaultRecordSize;

public AverageRecordSizeStats(HoodieWriteConfig hoodieWriteConfig) {
totalBytesWritten = HoodieAtomicLongAccumulator.create();
totalRecordsWritten = HoodieAtomicLongAccumulator.create();
fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
avgMetadataSize = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
defaultRecordSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
}

private void updateStats(long fileSizeInBytes, long recordWritten) {
if (fileSizeInBytes > fileSizeThreshold && fileSizeInBytes > avgMetadataSize && recordWritten > 0) {
totalBytesWritten.add(fileSizeInBytes - avgMetadataSize);
totalRecordsWritten.add(recordWritten);
}
}

private long computeAverageRecordSize() {
if (totalBytesWritten.value() > 0 && totalRecordsWritten.value() > 0) {
return totalBytesWritten.value() / totalRecordsWritten.value();
}
// Fallback to default implementation in the cases were we either got an exception before we could
// compute the average record size or there are no eligible commits yet.
return defaultRecordSize;
}
return hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
}
}
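
Taken together, the new estimator boils down to a commit-level correction for parquet metadata overhead. A tiny standalone sketch of that arithmetic (not Hudi code; the comments name the real accessors, while the numbers are purely illustrative):

  public class MetadataCorrectionSketch {
    public static void main(String[] args) {
      long totalBytesWritten = 200_000_000L;   // commitMetadata.fetchTotalBytesWritten()
      long totalFiles = 4L;                    // commitMetadata.fetchTotalFiles()
      long totalRecordsWritten = 1_000_000L;   // commitMetadata.fetchTotalRecordsWritten()
      long avgMetadataSize = 250_000L;         // writeConfig.getRecordSizeEstimatorAverageMetadataSize()
      long commitSizeThreshold = 100_000_000L; // recordSizeEstimationThreshold * parquetSmallFileLimit

      // Subtract the estimated metadata overhead once per written file before dividing
      // by the record count, mirroring the non-delta branch of the estimator above.
      long netBytes = totalBytesWritten - totalFiles * avgMetadataSize;
      if (netBytes > commitSizeThreshold && totalRecordsWritten > 0) {
        long avgRecordSize = (long) Math.ceil((1.0 * netBytes) / totalRecordsWritten);
        System.out.println(avgRecordSize);     // prints 199
      }
    }
  }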
@@ -63,7 +63,8 @@ public class TestAverageRecordSizeEstimator {
private static final String PARTITION1 = "partition1";
private static final String TEST_WRITE_TOKEN = "1-0-1";
private static final Integer DEFAULT_MAX_COMMITS = 2;
private static final Integer DEFAULT_MAX_PARQUET_METADATA_SIZE = 1000;
// needs to be big enough to skew the estimate
private static final Integer DEFAULT_AVERAGE_PARQUET_METADATA_SIZE = 10000000;
private static final Double DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD = 0.1;

@Test
@@ -102,7 +103,7 @@ public void testAverageRecordSizeWithNonEmptyCommitTimeline(List<Pair<HoodieInst
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withRecordSizeEstimator(AverageRecordSizeEstimator.class.getName())
.withRecordSizeEstimatorMaxCommits(DEFAULT_MAX_COMMITS)
.withRecordSizeEstimatorAverageMetadataSize(DEFAULT_MAX_PARQUET_METADATA_SIZE)
.withRecordSizeEstimatorAverageMetadataSize(DEFAULT_AVERAGE_PARQUET_METADATA_SIZE)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionRecordSizeEstimateThreshold(DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD)
.build())
@@ -152,85 +153,85 @@ private static String getLogFileName(String instantTime) {

private static Stream<Arguments> testCases() {
Long baseInstant = 20231204194919610L;
Long standardCount = 10000000L;
List<Arguments> arguments = new ArrayList<>();
// Note the avg record estimate is based on a parquet metadata size of 500Bytes per file.
// 1. straight forward. just 1 instant.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L)))), 99L));
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L)))), 99L));

// 2. two instants. avg of both the instants
// 2. two instants. latest instant should be honored
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1000000L, 200L)))), 109L));
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L)))), 199L));

// 3. two instants, latest commit has a small file thats just above threshold, while earliest commit is fully ignored,
// since it below the threshold size limit
// 3. two instants, while 2nd one is smaller in size so as to not meet the threshold. So, 1st one should be honored
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 9000L, 1000L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 110000, 100L)))), 99L));
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1000L, 200L)))), 99L));

// 4. 2nd instance is replace commit, it shld be excluded and should be avg of both commits.
// 4. 2nd instance is replace commit, it should be excluded
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 200L))),
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L)))), 99L));
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 100L)))), 199L));

// 5. for delta commits, only parquet files should be accounted for.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L)))), 149L));
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L)))), 199L));

// 6. delta commit has a mix of parquet and log files. only parquet files should be accounted for.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 1000000L, 100L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Arrays.asList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L),
generateLogWriteStat(baseInstant + 100, 10000000L, 300L)))), 190L));
Arrays.asList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L),
generateLogWriteStat(baseInstant + 100, standardCount, 300L)))), 199L));

// 7. 2nd delta commit only has log files. and so we honor 1st delta commit size.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Arrays.asList(generateLogWriteStat(baseInstant + 100, 1000000L, 200L),
generateLogWriteStat(baseInstant + 100, 10000000L, 300L)))), 99L));
Arrays.asList(generateLogWriteStat(baseInstant + 100, standardCount, 200L),
generateLogWriteStat(baseInstant + 100, standardCount, 300L)))), 99L));

// 8. since default max commits is overriden to 2 commits, ignore the earliest commit here since there are total 3 commits
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 1000L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 200L))),
Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 50L))),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1L, 50L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 200)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 10000000L, 100L)))), 74L));
Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 1L, 100L)))), Long.valueOf(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.defaultValue())));

// 9. replace commits should be ignored despite being the latest commits.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 1000000L, 100L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Arrays.asList(generateLogWriteStat(baseInstant + 100, 1000000L, 200L),
Arrays.asList(generateLogWriteStat(baseInstant + 100, standardCount, 200L),
generateLogWriteStat(baseInstant + 100, 1000000L, 300L))),
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 1000000L, 2000L))),
Collections.singletonList(generateBaseWriteStat(baseInstant + 200, standardCount, 2000L))),
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 300, 1000000L, 3000L)))), 99L));
Collections.singletonList(generateBaseWriteStat(baseInstant + 300, standardCount, 3000L)))), 99L));

// 10. Ignore commit stat with 0 records
arguments.add(Arguments.of(
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 1000L))),
Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 1000L))),
Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 50L))),
Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 50L))),
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 200)),
Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 0L, 1000L)))), 49L));

@@ -299,6 +299,10 @@ public long fetchTotalFilesUpdated() {
return totalFilesUpdated;
}

public long fetchTotalFiles() {
return partitionToWriteStats.values().stream().mapToLong(List::size).sum();
}

public long fetchTotalUpdateRecordsWritten() {
long totalUpdateRecordsWritten = 0;
for (List<HoodieWriteStat> stats : partitionToWriteStats.values()) {