diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java index 2db49a25a5165..015743d2f299f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java @@ -179,7 +179,7 @@ private List convertBootstrapSourceFileMapping(List rows = new ArrayList<>(); for (BootstrapFileMapping mapping : mappingList) { rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(), - mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()}); + mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBootstrapFileStatus().getPath().getUri()}); } return rows; } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index c1b5296b5243d..a4c7b5b7f4cb4 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -559,13 +559,13 @@ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throw : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); if (enableBootstrapSourceClean) { HoodieFileStatus fstatus = - bootstrapMapping.get(p0).get(0).getBoostrapFileStatus(); + bootstrapMapping.get(p0).get(0).getBootstrapFileStatus(); // This ensures full path is recorded in metadata. 
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + " but did not contain " + fstatus.getPath().getUri()); assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p0).get(0).getBoostrapFileStatus().getPath().getUri()))); + p0).get(0).getBootstrapFileStatus().getPath().getUri()))); } cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); String file2P0C1 = partitionAndFileId002.get(p0); @@ -579,13 +579,13 @@ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throw : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); if (enableBootstrapSourceClean) { HoodieFileStatus fstatus = - bootstrapMapping.get(p1).get(0).getBoostrapFileStatus(); + bootstrapMapping.get(p1).get(0).getBootstrapFileStatus(); // This ensures full path is recorded in metadata. assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + " but did not contain " + fstatus.getPath().getUri()); assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p1).get(0).getBoostrapFileStatus().getPath().getUri()))); + p1).get(0).getBootstrapFileStatus().getPath().getUri()))); } // make next commit, with 2 updates to existing files, and 1 insert @@ -928,7 +928,7 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); if (enableBootstrapSourceClean) { assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p0).get(0).getBoostrapFileStatus().getPath().getUri()))); + p0).get(0).getBootstrapFileStatus().getPath().getUri()))); } // No cleaning on partially written file, with no commit. 
@@ -968,7 +968,7 @@ private Map> generateBootstrapIndexAndSourceD for (Map.Entry> entry : bootstrapMapping.entrySet()) { new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs(); - assertTrue(new File(entry.getValue().get(0).getBoostrapFileStatus().getPath().getUri()).createNewFile()); + assertTrue(new File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile()); } return bootstrapMapping; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java index 7dc0f69d28840..d4a77b0822847 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java @@ -391,7 +391,7 @@ public void close() { } /** - * Boostrap Index Writer to build bootstrap index. + * Bootstrap Index Writer to build bootstrap index. */ public static class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter { @@ -443,7 +443,7 @@ private void writeNextPartition(String partitionPath, String bootstrapPartitionP bootstrapPartitionMetadata.setPartitionPath(partitionPath); bootstrapPartitionMetadata.setFileIdToBootstrapFile( bootstrapFileMappings.stream().map(m -> Pair.of(m.getFileId(), - m.getBoostrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue))); + m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue))); Option bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class); if (bytes.isPresent()) { indexByPartitionWriter @@ -459,14 +459,14 @@ private void writeNextPartition(String partitionPath, String bootstrapPartitionP /** * Write next source file to hudi file-id. Entries are expected to be appended in hudi file-group id * order. - * @param mapping boostrap source file mapping. 
+ * @param mapping bootstrap source file mapping. */ private void writeNextSourceFileMapping(BootstrapFileMapping mapping) { try { HoodieBootstrapFilePartitionInfo srcFilePartitionInfo = new HoodieBootstrapFilePartitionInfo(); srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath()); srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath()); - srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBoostrapFileStatus()); + srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus()); KeyValue kv = new KeyValue(getFileGroupKey(mapping.getFileGroupId()).getBytes(), new byte[0], new byte[0], HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapFileMapping.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapFileMapping.java index a9642c71b3ac2..428d41104e3a3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapFileMapping.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapFileMapping.java @@ -29,17 +29,17 @@ public class BootstrapFileMapping implements Serializable, Comparable addFilesToView(FileStatus[] statuses) { if (!isPartitionAvailableInStore(partition)) { if (bootstrapIndex.useIndex()) { try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) { - LOG.info("Boostrap Index available for partition " + partition); + LOG.info("Bootstrap Index available for partition " + partition); List sourceFileMappings = reader.getSourceFileMappingForPartition(partition); addBootstrapBaseFileMapping(sourceFileMappings.stream() .map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(), - s.getFileId()), s.getBoostrapFileStatus()))); + s.getFileId()), s.getBootstrapFileStatus()))); } } storePartitionView(partition, value); diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index 2c2f919cdf58a..756a48a3924bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -51,43 +51,63 @@ */ public class BoundedInMemoryQueue implements Iterable { - // interval used for polling records in the queue. + /** Interval used for polling records in the queue. **/ public static final int RECORD_POLL_INTERVAL_SEC = 1; - // rate used for sampling records to determine avg record size in bytes. + + /** Rate used for sampling records to determine avg record size in bytes. **/ public static final int RECORD_SAMPLING_RATE = 64; - // maximum records that will be cached + + /** Maximum records that will be cached **/ private static final int RECORD_CACHING_LIMIT = 128 * 1024; + private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class); - // It indicates number of records to cache. We will be using sampled record's average size to - // determine how many - // records we should cache and will change (increase/decrease) permits accordingly. + + /** + * It indicates number of records to cache. We will be using sampled record's average size to + * determine how many records we should cache and will change (increase/decrease) permits accordingly. + */ public final Semaphore rateLimiter = new Semaphore(1); - // used for sampling records with "RECORD_SAMPLING_RATE" frequency. + + /** Used for sampling records with "RECORD_SAMPLING_RATE" frequency. **/ public final AtomicLong samplingRecordCounter = new AtomicLong(-1); - // internal queue for records. + + /** Internal queue for records. **/ private final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(); - // maximum amount of memory to be used for queueing records. 
+ + /** Maximum amount of memory to be used for queueing records. **/ private final long memoryLimit; - // it holds the root cause of the exception in case either queueing records (consuming from - // inputIterator) fails or - // thread reading records from queue fails. + + /** + * It holds the root cause of the exception in case either queueing records + * (consuming from inputIterator) fails or thread reading records from queue fails. + */ private final AtomicReference hasFailed = new AtomicReference<>(null); - // used for indicating that all the records from queue are read successfully. + + /** Used for indicating that all the records from queue are read successfully. **/ private final AtomicBoolean isReadDone = new AtomicBoolean(false); - // used for indicating that all records have been enqueued + + /** Used for indicating that all records have been enqueued. **/ private final AtomicBoolean isWriteDone = new AtomicBoolean(false); - // Function to transform the input payload to the expected output payload + + /** Function to transform the input payload to the expected output payload. **/ private final Function transformFunction; - // Payload Size Estimator + + /** Payload Size Estimator. **/ private final SizeEstimator payloadSizeEstimator; - // Singleton (w.r.t this instance) Iterator for this queue + + /** Singleton (w.r.t this instance) Iterator for this queue. **/ private final QueueIterator iterator; - // indicates rate limit (number of records to cache). it is updated whenever there is a change - // in avg record size. + + /** + * Indicates rate limit (number of records to cache). It is updated + * whenever there is a change in avg record size. + */ public int currentRateLimit = 1; - // indicates avg record size in bytes. It is updated whenever a new record is sampled. + + /** Indicates avg record size in bytes. It is updated whenever a new record is sampled. **/ public long avgRecordSizeInBytes = 0; - // indicates number of samples collected so far. 
+ + /** Indicates number of samples collected so far. **/ private long numSamples = 0; /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java index 31c3a3cc1dc1b..ecea9f2193c76 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java @@ -19,7 +19,7 @@ package org.apache.hudi.common.util.queue; /** - * Producer for BoundedInMemoryQueue. Memory Bounded Buffer supports multiple producers single consumer pattern. + * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports multiple producers single consumer pattern. * * @param Input type for buffer items produced */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java index ecfb59da8593b..bbe75cf893770 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java @@ -156,7 +156,7 @@ private void validateBootstrapIndex(Map> boot assertEquals(x.getFileId(), res.getFileId()); assertEquals(x.getPartitionPath(), res.getPartitionPath()); assertEquals(BOOTSTRAP_BASE_PATH, res.getBootstrapBasePath()); - assertEquals(x.getBoostrapFileStatus(), res.getBoostrapFileStatus()); + assertEquals(x.getBootstrapFileStatus(), res.getBootstrapFileStatus()); assertEquals(x.getBootstrapPartitionPath(), res.getBootstrapPartitionPath()); }); }); diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java index ad4244362570d..14f36d469d855 100644 --- 
a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java +++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java @@ -309,7 +309,7 @@ public void testMetadataBootstrapWithUpdatesMOR() throws Exception { } @Test - public void testFullBoostrapOnlyCOW() throws Exception { + public void testFullBootstrapOnlyCOW() throws Exception { testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE); } @@ -319,7 +319,7 @@ public void testFullBootstrapWithUpdatesMOR() throws Exception { } @Test - public void testMetaAndFullBoostrapCOW() throws Exception { + public void testMetaAndFullBootstrapCOW() throws Exception { testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE); }