Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private List<Comparable[]> convertBootstrapSourceFileMapping(List<BootstrapFileM
final List<Comparable[]> rows = new ArrayList<>();
for (BootstrapFileMapping mapping : mappingList) {
rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(),
mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()});
mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBootstrapFileStatus().getPath().getUri()});
}
return rows;
}
Expand Down
12 changes: 6 additions & 6 deletions hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,13 @@ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throw
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus =
bootstrapMapping.get(p0).get(0).getBoostrapFileStatus();
bootstrapMapping.get(p0).get(0).getBootstrapFileStatus();
// This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri());
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p0).get(0).getBoostrapFileStatus().getPath().getUri())));
p0).get(0).getBootstrapFileStatus().getPath().getUri())));
}
cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
String file2P0C1 = partitionAndFileId002.get(p0);
Expand All @@ -579,13 +579,13 @@ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throw
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus =
bootstrapMapping.get(p1).get(0).getBoostrapFileStatus();
bootstrapMapping.get(p1).get(0).getBootstrapFileStatus();
// This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri());
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p1).get(0).getBoostrapFileStatus().getPath().getUri())));
p1).get(0).getBootstrapFileStatus().getPath().getUri())));
}

// make next commit, with 2 updates to existing files, and 1 insert
Expand Down Expand Up @@ -928,7 +928,7 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
if (enableBootstrapSourceClean) {
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p0).get(0).getBoostrapFileStatus().getPath().getUri())));
p0).get(0).getBootstrapFileStatus().getPath().getUri())));
}

// No cleaning on partially written file, with no commit.
Expand Down Expand Up @@ -968,7 +968,7 @@ private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceD

for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) {
new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
assertTrue(new File(entry.getValue().get(0).getBoostrapFileStatus().getPath().getUri()).createNewFile());
assertTrue(new File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile());
}
return bootstrapMapping;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void close() {
}

/**
* Boostrap Index Writer to build bootstrap index.
* Bootstrap Index Writer to build bootstrap index.
*/
public static class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter {

Expand Down Expand Up @@ -443,7 +443,7 @@ private void writeNextPartition(String partitionPath, String bootstrapPartitionP
bootstrapPartitionMetadata.setPartitionPath(partitionPath);
bootstrapPartitionMetadata.setFileIdToBootstrapFile(
bootstrapFileMappings.stream().map(m -> Pair.of(m.getFileId(),
m.getBoostrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
Option<byte[]> bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class);
if (bytes.isPresent()) {
indexByPartitionWriter
Expand All @@ -459,14 +459,14 @@ private void writeNextPartition(String partitionPath, String bootstrapPartitionP
/**
* Write next source file to hudi file-id. Entries are expected to be appended in hudi file-group id
* order.
* @param mapping boostrap source file mapping.
* @param mapping bootstrap source file mapping.
*/
private void writeNextSourceFileMapping(BootstrapFileMapping mapping) {
try {
HoodieBootstrapFilePartitionInfo srcFilePartitionInfo = new HoodieBootstrapFilePartitionInfo();
srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath());
srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath());
srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBoostrapFileStatus());
srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus());
KeyValue kv = new KeyValue(getFileGroupKey(mapping.getFileGroupId()).getBytes(), new byte[0], new byte[0],
HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put,
TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ public class BootstrapFileMapping implements Serializable, Comparable<BootstrapF

private final String bootstrapBasePath;
private final String bootstrapPartitionPath;
private final HoodieFileStatus boostrapFileStatus;
private final HoodieFileStatus bootstrapFileStatus;

private final String partitionPath;
private final String fileId;

public BootstrapFileMapping(String bootstrapBasePath, String bootstrapPartitionPath, String partitionPath,
HoodieFileStatus boostrapFileStatus, String fileId) {
HoodieFileStatus bootstrapFileStatus, String fileId) {
this.bootstrapBasePath = bootstrapBasePath;
this.bootstrapPartitionPath = bootstrapPartitionPath;
this.partitionPath = partitionPath;
this.boostrapFileStatus = boostrapFileStatus;
this.bootstrapFileStatus = bootstrapFileStatus;
this.fileId = fileId;
}

Expand All @@ -48,7 +48,7 @@ public String toString() {
return "BootstrapFileMapping{"
+ "bootstrapBasePath='" + bootstrapBasePath + '\''
+ ", bootstrapPartitionPath='" + bootstrapPartitionPath + '\''
+ ", boostrapFileStatus=" + boostrapFileStatus
+ ", bootstrapFileStatus=" + bootstrapFileStatus
+ ", partitionPath='" + partitionPath + '\''
+ ", fileId='" + fileId + '\''
+ '}';
Expand All @@ -66,13 +66,13 @@ public boolean equals(Object o) {
return Objects.equals(bootstrapBasePath, mapping.bootstrapBasePath)
&& Objects.equals(bootstrapPartitionPath, mapping.bootstrapPartitionPath)
&& Objects.equals(partitionPath, mapping.partitionPath)
&& Objects.equals(boostrapFileStatus, mapping.boostrapFileStatus)
&& Objects.equals(bootstrapFileStatus, mapping.bootstrapFileStatus)
&& Objects.equals(fileId, mapping.fileId);
}

@Override
public int hashCode() {
return Objects.hash(bootstrapBasePath, bootstrapPartitionPath, partitionPath, boostrapFileStatus, fileId);
return Objects.hash(bootstrapBasePath, bootstrapPartitionPath, partitionPath, bootstrapFileStatus, fileId);
}

public String getBootstrapBasePath() {
Expand All @@ -87,8 +87,8 @@ public String getPartitionPath() {
return partitionPath;
}

public HoodieFileStatus getBoostrapFileStatus() {
return boostrapFileStatus;
public HoodieFileStatus getBootstrapFileStatus() {
return bootstrapFileStatus;
}

public String getFileId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
if (!isPartitionAvailableInStore(partition)) {
if (bootstrapIndex.useIndex()) {
try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
LOG.info("Boostrap Index available for partition " + partition);
LOG.info("Bootstrap Index available for partition " + partition);
List<BootstrapFileMapping> sourceFileMappings =
reader.getSourceFileMappingForPartition(partition);
addBootstrapBaseFileMapping(sourceFileMappings.stream()
.map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(),
s.getFileId()), s.getBoostrapFileStatus())));
s.getFileId()), s.getBootstrapFileStatus())));
}
}
storePartitionView(partition, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,43 +51,63 @@
*/
public class BoundedInMemoryQueue<I, O> implements Iterable<O> {

// interval used for polling records in the queue.
/** Interval used for polling records in the queue. **/
public static final int RECORD_POLL_INTERVAL_SEC = 1;
// rate used for sampling records to determine avg record size in bytes.

/** Rate used for sampling records to determine avg record size in bytes. **/
public static final int RECORD_SAMPLING_RATE = 64;
// maximum records that will be cached

/** Maximum records that will be cached **/
private static final int RECORD_CACHING_LIMIT = 128 * 1024;

private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class);
// It indicates number of records to cache. We will be using sampled record's average size to
// determine how many
// records we should cache and will change (increase/decrease) permits accordingly.

/**
* It indicates number of records to cache. We will be using sampled record's average size to
* determine how many records we should cache and will change (increase/decrease) permits accordingly.
*/
public final Semaphore rateLimiter = new Semaphore(1);
// used for sampling records with "RECORD_SAMPLING_RATE" frequency.

/** Used for sampling records with "RECORD_SAMPLING_RATE" frequency. **/
public final AtomicLong samplingRecordCounter = new AtomicLong(-1);
// internal queue for records.

/** Internal queue for records. **/
private final LinkedBlockingQueue<Option<O>> queue = new LinkedBlockingQueue<>();
// maximum amount of memory to be used for queueing records.

/** Maximum amount of memory to be used for queueing records. **/
private final long memoryLimit;
// it holds the root cause of the exception in case either queueing records (consuming from
// inputIterator) fails or
// thread reading records from queue fails.

/**
* it holds the root cause of the exception in case either queueing records
* (consuming from inputIterator) fails or thread reading records from queue fails.
*/
private final AtomicReference<Exception> hasFailed = new AtomicReference<>(null);
// used for indicating that all the records from queue are read successfully.

/** Used for indicating that all the records from queue are read successfully. **/
private final AtomicBoolean isReadDone = new AtomicBoolean(false);
// used for indicating that all records have been enqueued

/** used for indicating that all records have been enqueued. **/
private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
// Function to transform the input payload to the expected output payload

/** Function to transform the input payload to the expected output payload. **/
private final Function<I, O> transformFunction;
// Payload Size Estimator

/** Payload Size Estimator. **/
private final SizeEstimator<O> payloadSizeEstimator;
// Singleton (w.r.t this instance) Iterator for this queue

/** Singleton (w.r.t this instance) Iterator for this queue. **/
private final QueueIterator iterator;
// indicates rate limit (number of records to cache). it is updated whenever there is a change
// in avg record size.

/**
* indicates rate limit (number of records to cache). it is updated
* whenever there is a change in avg record size.
*/
public int currentRateLimit = 1;
// indicates avg record size in bytes. It is updated whenever a new record is sampled.

/** Indicates avg record size in bytes. It is updated whenever a new record is sampled. **/
public long avgRecordSizeInBytes = 0;
// indicates number of samples collected so far.

/** Indicates number of samples collected so far. **/
private long numSamples = 0;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.hudi.common.util.queue;

/**
* Producer for BoundedInMemoryQueue. Memory Bounded Buffer supports multiple producers single consumer pattern.
* Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports multiple producers single consumer pattern.
*
* @param <I> Input type for buffer items produced
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void validateBootstrapIndex(Map<String, List<BootstrapFileMapping>> boot
assertEquals(x.getFileId(), res.getFileId());
assertEquals(x.getPartitionPath(), res.getPartitionPath());
assertEquals(BOOTSTRAP_BASE_PATH, res.getBootstrapBasePath());
assertEquals(x.getBoostrapFileStatus(), res.getBoostrapFileStatus());
assertEquals(x.getBootstrapFileStatus(), res.getBootstrapFileStatus());
assertEquals(x.getBootstrapPartitionPath(), res.getBootstrapPartitionPath());
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
}

@Test
public void testFullBoostrapOnlyCOW() throws Exception {
public void testFullBootstrapOnlyCOW() throws Exception {
testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
}

Expand All @@ -319,7 +319,7 @@ public void testFullBootstrapWithUpdatesMOR() throws Exception {
}

@Test
public void testMetaAndFullBoostrapCOW() throws Exception {
public void testMetaAndFullBootstrapCOW() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
}

Expand Down