From a6c03311cab24d39908937ab5e0372dfd6c5403d Mon Sep 17 00:00:00 2001 From: garyli1019 Date: Sat, 5 Dec 2020 20:47:04 +0800 Subject: [PATCH 1/3] [HUDI-1434] fix incorrect log file path in HoodieWriteStat --- .../apache/hudi/io/HoodieAppendHandle.java | 40 +++++++++++---- .../common/model/HoodieDeltaWriteStat.java | 25 ++++++++++ .../hudi/common/model/HoodieLogFile.java | 6 +++ .../common/table/log/HoodieLogFormat.java | 10 +++- .../model/TestHoodieDeltaWriteStat.java | 50 +++++++++++++++++++ 5 files changed, 120 insertions(+), 11 deletions(-) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 0c590fe8818c4..ca4da508e96ee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieKey; @@ -30,7 +31,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -61,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; /** * IO Operation to append data onto an existing file. 
@@ -87,6 +88,9 @@ public class HoodieAppendHandle extends private long averageRecordSize = 0; private HoodieLogFile currentLogFile; private Writer writer; + private String filePath = "null"; + private int logVersion = 0; + private long logOffset = 0; // Flag used to initialize some metadata private boolean doInit = true; // Total number of bytes written during this append phase (an estimation) @@ -125,19 +129,26 @@ private void init(HoodieRecord record) { Option fileSlice = rtView.getLatestFileSlice(partitionPath, fileId); // Set the base commit time as the current instantTime for new inserts into log files String baseInstantTime; + String baseFile = ""; + List logFiles = new ArrayList<>(); if (fileSlice.isPresent()) { baseInstantTime = fileSlice.get().getBaseInstantTime(); + baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse(""); + logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()); } else { baseInstantTime = instantTime; // This means there is no base data file, start appending to a new log file fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId)); LOG.info("New InsertHandle for partition :" + partitionPath); } - writeStatus.getStat().setPrevCommit(baseInstantTime); + HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat(); + deltaWriteStat.setPrevCommit(baseInstantTime); writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); - writeStatus.getStat().setPartitionPath(partitionPath); - writeStatus.getStat().setFileId(fileId); + deltaWriteStat.setPartitionPath(partitionPath); + deltaWriteStat.setFileId(fileId); + deltaWriteStat.setBaseFile(baseFile); + deltaWriteStat.setLogFiles(logFiles); averageRecordSize = sizeEstimator.sizeEstimate(record); try { //save hoodie partition meta in the partition path @@ -152,17 +163,13 @@ private void init(HoodieRecord record) { this.writer = createLogWriter(fileSlice, baseInstantTime); this.currentLogFile = writer.getLogFile(); - ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); - ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize()); + this.logOffset = this.currentLogFile.getFileSize(); } catch (Exception e) { LOG.error("Error in update task at commit " + instantTime, e); writeStatus.setGlobalError(e); throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit " + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e); } - Path path = partitionPath.length() == 0 ? new Path(writer.getLogFile().getFileName()) - : new Path(partitionPath, writer.getLogFile().getFileName()); - writeStatus.getStat().setPath(path.toString()); doInit = false; } } @@ -258,13 +265,21 @@ public WriteStatus close() { // flush any remaining records to disk doAppend(header); + String latestLogFile = ""; if (writer != null) { sizeInBytes = writer.getCurrentSize(); + latestLogFile = writer.getLogFile().getFileName(); + filePath = partitionPath.length() == 0 ? 
new Path(latestLogFile).toString() + : new Path(partitionPath, latestLogFile).toString(); + logVersion = writer.getLogFile().getLogVersion(); writer.close(); } - HoodieWriteStat stat = writeStatus.getStat(); + HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat(); stat.setFileId(this.fileId); + stat.setPath(this.filePath); + stat.setLogVersion(logVersion); + stat.setLogOffset(logOffset); stat.setNumWrites(recordsWritten); stat.setNumUpdateWrites(updatedRecordsWritten); stat.setNumInserts(insertRecordsWritten); @@ -272,6 +287,10 @@ public WriteStatus close() { stat.setTotalWriteBytes(estimatedNumberOfBytesWritten); stat.setFileSizeInBytes(sizeInBytes); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + // update total log file list if the latest log file was new + if (!stat.getLogFiles().contains(latestLogFile)) { + stat.appendLogFiles(latestLogFile); + } RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalUpsertTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); @@ -303,6 +322,7 @@ private Writer createLogWriter(Option fileSlice, String baseCommitTim .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath)) .withFileId(fileId).overBaseCommit(baseCommitTime) .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) + .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L)) .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs) .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) .withRolloverLogWriteToken(writeToken) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java index 1b7dcb78d80ba..c8b30e18b13e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java @@ -20,6 +20,9 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.util.ArrayList; +import java.util.List; + /** * Statistics about a single Hoodie delta log operation. 
*/ @@ -28,6 +31,8 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat { private int logVersion; private long logOffset; + private String baseFile; + private List logFiles = new ArrayList(); public void setLogVersion(int logVersion) { this.logVersion = logVersion; @@ -44,4 +49,24 @@ public void setLogOffset(long logOffset) { public long getLogOffset() { return logOffset; } + + public void setBaseFile(String baseFile) { + this.baseFile = baseFile; + } + + public String getBaseFile() { + return baseFile; + } + + public void setLogFiles(List logFiles) { + this.logFiles = logFiles; + } + + public void appendLogFiles(String logFile) { + logFiles.add(logFile); + } + + public List getLogFiles() { + return logFiles; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index fa7f9b1bbe5d7..2515659c7b5fd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -63,6 +63,12 @@ public HoodieLogFile(Path logPath) { this.fileLen = 0; } + public HoodieLogFile(Path logPath, Long fileLen) { + this.fileStatus = null; + this.pathStr = logPath.toString(); + this.fileLen = fileLen; + } + public HoodieLogFile(String logPathStr) { this.fileStatus = null; this.pathStr = logPathStr; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index b3700fbedf0cc..86f14956fe1c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -125,6 +125,8 @@ class WriterBuilder { // version number for this log file. If not specified, then the current version will be // computed by inspecting the file system private Integer logVersion; + // file len of this log file + private Long fileLen = 0L; // Location of the directory containing the log private Path parentPath; // Log File Write Token @@ -182,6 +184,11 @@ public WriterBuilder withLogVersion(int version) { return this; } + public WriterBuilder withFileSize(long fileLen) { + this.fileLen = fileLen; + return this; + } + public WriterBuilder onParentPath(Path parentPath) { this.parentPath = parentPath; return this; @@ -229,13 +236,14 @@ public Writer build() throws IOException, InterruptedException { if (logWriteToken == null) { // This is the case where we have existing log-file with old format. 
rollover to avoid any conflicts logVersion += 1; + fileLen = 0L; logWriteToken = rolloverLogWriteToken; } Path logPath = new Path(parentPath, FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken)); LOG.info("HoodieLogFile on path " + logPath); - HoodieLogFile logFile = new HoodieLogFile(logPath); + HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen); if (bufferSize == null) { bufferSize = FSUtils.getDefaultBufferSize(fs); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java new file mode 100644 index 0000000000000..c9586fe7560c3 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests hoodie delta write stat {@link HoodieDeltaWriteStat}. 
+ */ +public class TestHoodieDeltaWriteStat { + + @Test + public void testBaseFileAndLogFiles() { + HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat(); + String baseFile = "file1.parquet"; + String logFile1 = ".log1.log"; + String logFile2 = ".log2.log"; + + writeStat.setBaseFile(baseFile); + writeStat.appendLogFiles(logFile1); + writeStat.appendLogFiles(logFile2); + assertTrue(writeStat.getLogFiles().contains(logFile1)); + assertTrue(writeStat.getLogFiles().contains(logFile2)); + assertEquals(baseFile, writeStat.getBaseFile()); + + writeStat.setLogFiles(new ArrayList<>()); + assertTrue(writeStat.getLogFiles().isEmpty()); + } +} From 9c25633033caed35cb45d0b9e188fa86b0ef1ca0 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 23 Dec 2020 14:22:46 -0800 Subject: [PATCH 2/3] HoodieWriteHandle#close() returns a list of WriteStatus objs --- .../hudi/execution/CopyOnWriteInsertHandler.java | 8 ++++---- .../org/apache/hudi/io/HoodieAppendHandle.java | 10 +++------- .../org/apache/hudi/io/HoodieCreateHandle.java | 14 +++++--------- .../java/org/apache/hudi/io/HoodieMergeHandle.java | 11 ++++------- .../apache/hudi/io/HoodieSortedMergeHandle.java | 3 ++- .../java/org/apache/hudi/io/HoodieWriteHandle.java | 12 ++++++++++-- .../commit/BaseFlinkCommitActionExecutor.java | 9 +++++---- .../hudi/table/HoodieSparkCopyOnWriteTable.java | 13 +++++++------ .../SparkBootstrapCommitActionExecutor.java | 5 +++-- .../commit/BaseSparkCommitActionExecutor.java | 9 +++++---- .../AbstractSparkDeltaCommitActionExecutor.java | 5 ++--- .../hudi/client/TestUpdateSchemaEvolution.java | 4 ++-- 12 files changed, 52 insertions(+), 51 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java index 8af72f351fff0..e588995d61efe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java @@ -71,7 +71,7 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime, public void consumeOneRecord(HoodieInsertValueGenResult payload) { final HoodieRecord insertPayload = payload.record; String partitionPath = insertPayload.getPartitionPath(); - HoodieWriteHandle handle = handles.get(partitionPath); + HoodieWriteHandle handle = handles.get(partitionPath); if (handle == null) { // If the records are sorted, this means that we encounter a new partition path // and the records for the previous partition path are all written, @@ -87,7 +87,7 @@ public void consumeOneRecord(HoodieInsertValueGenResult payload) { if (!handle.canWrite(payload.record)) { // Handle is full. 
Close the handle and add the WriteStatus - statuses.add(handle.close()); + statuses.addAll(handle.close()); // Open new handle handle = writeHandleFactory.create(config, instantTime, hoodieTable, insertPayload.getPartitionPath(), idPrefix, taskContextSupplier); @@ -108,8 +108,8 @@ public List getResult() { } private void closeOpenHandles() { - for (HoodieWriteHandle handle : handles.values()) { - statuses.add(handle.close()); + for (HoodieWriteHandle handle : handles.values()) { + statuses.addAll(handle.close()); } handles.clear(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index ca4da508e96ee..d01d1a62d4fd6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -260,7 +261,7 @@ public void write(HoodieRecord record, Option insertValue) { } @Override - public WriteStatus close() { + public List close() { try { // flush any remaining records to disk doAppend(header); @@ -298,17 +299,12 @@ public WriteStatus close() { LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); - return writeStatus; + return Collections.singletonList(writeStatus); } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); } } - @Override - public WriteStatus getWriteStatus() { - return writeStatus; - } - @Override public IOType getIOType() { return IOType.APPEND; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 6a8e7735093d5..0abd76091bac0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -44,7 +44,9 @@ import org.apache.log4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; public class HoodieCreateHandle extends HoodieWriteHandle { @@ -162,11 +164,6 @@ public void write() { } } - @Override - public WriteStatus getWriteStatus() { - return writeStatus; - } - @Override public IOType getIOType() { return IOType.CREATE; @@ -176,9 +173,8 @@ public IOType getIOType() { * Performs actions to durably, persist the current changes and returns a WriteStatus object. 
*/ @Override - public WriteStatus close() { - LOG - .info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); + public List close() { + LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { fileWriter.close(); @@ -203,7 +199,7 @@ public WriteStatus close() { LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalCreateTime())); - return writeStatus; + return Collections.singletonList(writeStatus); } catch (IOException e) { throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 1b98de4398a8e..0acf37e0b165a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -50,8 +50,10 @@ import org.apache.log4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -258,7 +260,7 @@ public void write(GenericRecord oldRecord) { } @Override - public WriteStatus close() { + public List close() { try { // write out any pending records (this can happen when inserts are turned into updates) Iterator> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap) @@ -301,7 +303,7 @@ public WriteStatus close() { LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); - return writeStatus; + return Collections.singletonList(writeStatus); } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); } @@ -333,11 +335,6 @@ public Path getOldFilePath() { return oldFilePath; } - @Override - public WriteStatus getWriteStatus() { - return writeStatus; - } - @Override public IOType getIOType() { return IOType.MERGE; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 71610b1aa9c2f..467196cb84bda 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; @@ -101,7 +102,7 @@ public void write(GenericRecord oldRecord) { } @Override - public WriteStatus close() { + public List close() { // write out any pending records (this can happen when inserts are turned into updates) newRecordKeysSorted.stream().forEach(key -> { try { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 85898bccc7052..47eef8ad80d74 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -45,6 +45,8 @@ import org.apache.log4j.Logger; import java.io.IOException; +import java.util.Collections; +import java.util.List; /** * Base class for all write operations logically performed at the file group level. @@ -167,9 +169,15 @@ protected GenericRecord rewriteRecord(GenericRecord record) { return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields); } - public abstract WriteStatus close(); + public abstract List close(); - public abstract WriteStatus getWriteStatus(); + public List writeStatuses() { + return Collections.singletonList(writeStatus); + } + + public String getPartitionPath() { + return partitionPath; + } public abstract IOType getIOType(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 1d40b8e95a539..06ba660119a3d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -267,7 +267,7 @@ public Iterator> handleUpdate(String partitionPath, String fil return handleUpdateInternal(upsertHandle, fileId); } - protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( @@ -277,11 +277,12 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups } // TODO(vc): This needs to be revisited - if (upsertHandle.getWriteStatus().getPartitionPath() == null) { + if (upsertHandle.getPartitionPath() == null) { LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " - + upsertHandle.getWriteStatus()); + + upsertHandle.writeStatuses()); } - return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); + + return Collections.singletonList(upsertHandle.writeStatuses()).iterator(); } protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator> recordItr) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 71085a232122f..4c450f484115f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -177,7 +177,7 @@ public Iterator> handleUpdate(String instantTime, String parti return handleUpdateInternal(upsertHandle, instantTime, fileId); } - protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime, + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime, String fileId) throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( @@ -187,11 +187,12 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups } // TODO(vc): This needs to be revisited - if (upsertHandle.getWriteStatus().getPartitionPath() == null) { + if 
(upsertHandle.getPartitionPath() == null) { LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " - + upsertHandle.getWriteStatus()); + + upsertHandle.writeStatuses()); } - return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); + + return Collections.singletonList(upsertHandle.writeStatuses()).iterator(); } protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, @@ -207,10 +208,10 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition public Iterator> handleInsert(String instantTime, String partitionPath, String fileId, Map> recordMap) { - HoodieCreateHandle createHandle = + HoodieCreateHandle createHandle = new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier); createHandle.write(); - return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator(); + return Collections.singletonList(createHandle.close()).iterator(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index 2dd9fd039dbea..1aca28b6d8f79 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -291,7 +291,7 @@ private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, St HoodieFileStatus srcFileStatus, KeyGeneratorInterface keyGenerator) { Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath()); - HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, + HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier()); Schema avroSchema = null; try { @@ -329,7 +329,8 @@ private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, St } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } - BootstrapWriteStatus writeStatus = (BootstrapWriteStatus)bootstrapHandle.getWriteStatus(); + + BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0); BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping( config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath, srcFileStatus, writeStatus.getFileId()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 73be8d4127413..8c8fe4bc04483 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -279,7 +279,7 @@ public Iterator> handleUpdate(String partitionPath, String fil return handleUpdateInternal(upsertHandle, fileId); } - protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) + 
protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( @@ -289,11 +289,12 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups } // TODO(vc): This needs to be revisited - if (upsertHandle.getWriteStatus().getPartitionPath() == null) { + if (upsertHandle.getPartitionPath() == null) { LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " - + upsertHandle.getWriteStatus()); + + upsertHandle.writeStatuses()); } - return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); + + return Collections.singletonList(upsertHandle.writeStatuses()).iterator(); } protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator> recordItr) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java index 64d4c9ce85779..c92cd928474cc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java @@ -79,11 +79,10 @@ public Iterator> handleUpdate(String partitionPath, String fil LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId); return super.handleUpdate(partitionPath, fileId, recordItr); } else { - HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, instantTime, table, + HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, instantTime, table, partitionPath, fileId, recordItr, taskContextSupplier); appendHandle.doAppend(); - appendHandle.close(); - return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator(); + return Collections.singletonList(appendHandle.close()).iterator(); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 9a8d7e0c88898..9b53de1d682ca 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -84,10 +84,10 @@ private WriteStatus prepareFirstRecordCommit(List recordsStrs) throws IO } Map insertRecordMap = insertRecords.stream() .collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity())); - HoodieCreateHandle createHandle = + HoodieCreateHandle createHandle = new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier); createHandle.write(); - return createHandle.close(); + return createHandle.close().get(0); }).collect(); final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100")); From 150d1832855060664a0ed76cff9a6f0c65890bc0 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Thu, 24 Dec 2020 17:42:34 -0800 Subject: [PATCH 3/3] Handle rolled-over log files and return a WriteStatus per log file written - Combined data and delete block logging into a single call - 
Lazily initialize and manage write status based on returned AppendResult - Use FSUtils.getFileSize() to set final file size, consistent with other handles - Added tests around returned values in AppendResult - Added validation of the file sizes returned in write stat --- .../commands/TestHoodieLogFileCommand.java | 4 +- .../apache/hudi/io/HoodieAppendHandle.java | 220 ++++++++++++------ .../org/apache/hudi/io/HoodieWriteHandle.java | 2 +- .../hudi/table/HoodieTimelineArchiveLog.java | 4 +- .../AbstractMarkerBasedRollbackStrategy.java | 2 +- .../rollback/ListingBasedRollbackHelper.java | 2 +- .../commit/BaseJavaCommitActionExecutor.java | 11 +- .../rollback/ListingBasedRollbackHelper.java | 2 +- .../table/TestHoodieMergeOnReadTable.java | 70 +++--- .../common/model/HoodieDeltaWriteStat.java | 4 +- .../hudi/common/model/HoodieWriteStat.java | 2 +- .../hudi/common/table/log/AppendResult.java | 50 ++++ .../common/table/log/HoodieLogFormat.java | 31 ++- .../table/log/HoodieLogFormatWriter.java | 137 ++++++----- .../functional/TestHoodieLogFormat.java | 158 +++++++------ .../TestHoodieLogFormatAppendFailure.java | 2 +- .../model/TestHoodieDeltaWriteStat.java | 4 +- .../hadoop/testutils/InputFormatTestUtil.java | 10 +- 18 files changed, 449 insertions(+), 266 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/log/AppendResult.java diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java index fbd2b92c2bb00..78460b1763c69 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java @@ -104,7 +104,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = new HoodieAvroDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); } finally { if (writer != null) { writer.close(); @@ -183,7 +183,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); } finally { if (writer != null) { writer.close(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index d01d1a62d4fd6..ab9ccdd811118 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieDataBlock; @@ -42,6 +43,7 @@ import 
org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieAppendException; @@ -56,7 +58,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -71,51 +72,49 @@ public class HoodieAppendHandle extends private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class); // This acts as the sequenceID for records written - private static AtomicLong recordIndex = new AtomicLong(1); + private static final AtomicLong RECORD_COUNTER = new AtomicLong(1); + private final String fileId; // Buffer for holding records in memory before they are flushed to disk - private List recordList = new ArrayList<>(); + private final List recordList = new ArrayList<>(); // Buffer for holding records (to be deleted) in memory before they are flushed to disk - private List keysToDelete = new ArrayList<>(); + private final List keysToDelete = new ArrayList<>(); + // Incoming records to be written to logs. + private final Iterator> recordItr; + // Writer to log into the file group's latest slice. + private Writer writer; - private Iterator> recordItr; + private final List statuses; // Total number of records written during an append private long recordsWritten = 0; // Total number of records deleted during an append private long recordsDeleted = 0; // Total number of records updated during an append private long updatedRecordsWritten = 0; + // Total number of new records inserted into the delta file + private long insertRecordsWritten = 0; + // Average record size for a HoodieRecord. 
This size is updated at the end of every log block flushed to disk private long averageRecordSize = 0; - private HoodieLogFile currentLogFile; - private Writer writer; - private String filePath = "null"; - private int logVersion = 0; - private long logOffset = 0; // Flag used to initialize some metadata private boolean doInit = true; // Total number of bytes written during this append phase (an estimation) private long estimatedNumberOfBytesWritten; - // Total number of bytes written to file - private long sizeInBytes = 0; // Number of records that must be written to meet the max block size for a log block private int numberOfRecords = 0; // Max block size to limit to for a log block - private int maxBlockSize = config.getLogFileDataBlockMaxSize(); + private final int maxBlockSize = config.getLogFileDataBlockMaxSize(); // Header metadata for a log block - private Map header = new HashMap<>(); - // Total number of new records inserted into the delta file - private long insertRecordsWritten = 0; - + private final Map header = new HashMap<>(); private SizeEstimator sizeEstimator; public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, Iterator> recordItr, TaskContextSupplier taskContextSupplier) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); - writeStatus.setStat(new HoodieDeltaWriteStat()); this.fileId = fileId; this.recordItr = recordItr; sizeEstimator = new DefaultSizeEstimator(); + this.statuses = new ArrayList<>(); } public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, @@ -140,17 +139,22 @@ private void init(HoodieRecord record) { baseInstantTime = instantTime; // This means there is no base data file, start appending to a new log file fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId)); - LOG.info("New InsertHandle for partition :" + partitionPath); + LOG.info("New AppendHandle for partition :" + partitionPath); } - HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat(); - deltaWriteStat.setPrevCommit(baseInstantTime); + + // Prepare the first write status + writeStatus.setStat(new HoodieDeltaWriteStat()); writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); + averageRecordSize = sizeEstimator.sizeEstimate(record); + + HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat(); + deltaWriteStat.setPrevCommit(baseInstantTime); deltaWriteStat.setPartitionPath(partitionPath); deltaWriteStat.setFileId(fileId); deltaWriteStat.setBaseFile(baseFile); deltaWriteStat.setLogFiles(logFiles); - averageRecordSize = sizeEstimator.sizeEstimate(record); + try { //save hoodie partition meta in the partition path HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime, @@ -163,8 +167,6 @@ private void init(HoodieRecord record) { createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())); this.writer = createLogWriter(fileSlice, baseInstantTime); - this.currentLogFile = writer.getLogFile(); - this.logOffset = this.currentLogFile.getFileSize(); } catch (Exception e) { LOG.error("Error in update task at commit " + instantTime, e); writeStatus.setGlobalError(e); @@ -176,14 +178,14 @@ private void init(HoodieRecord record) { } private Option getIndexedRecord(HoodieRecord hoodieRecord) { - Option recordMetadata = 
hoodieRecord.getData().getMetadata(); + Option> recordMetadata = hoodieRecord.getData().getMetadata(); try { Option avroRecord = hoodieRecord.getData().getInsertValue(writerSchema); if (avroRecord.isPresent()) { // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get())); String seqId = - HoodieRecord.generateSequenceId(instantTime, getPartitionId(), recordIndex.getAndIncrement()); + HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId); @@ -211,6 +213,105 @@ private Option getIndexedRecord(HoodieRecord hoodieRecord) { return Option.empty(); } + private void initNewStatus() { + HoodieDeltaWriteStat prevStat = (HoodieDeltaWriteStat) this.writeStatus.getStat(); + // Make a new write status and copy basic fields over. + HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat(); + stat.setFileId(fileId); + stat.setPartitionPath(partitionPath); + stat.setPrevCommit(prevStat.getPrevCommit()); + stat.setBaseFile(prevStat.getBaseFile()); + stat.setLogFiles(new ArrayList<>(prevStat.getLogFiles())); + + this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), + !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); + this.writeStatus.setFileId(fileId); + this.writeStatus.setPartitionPath(partitionPath); + this.writeStatus.setStat(stat); + } + + private String makeFilePath(HoodieLogFile logFile) { + return partitionPath.length() == 0 + ? 
new Path(logFile.getFileName()).toString() + : new Path(partitionPath, logFile.getFileName()).toString(); + } + + private void resetWriteCounts() { + recordsWritten = 0; + updatedRecordsWritten = 0; + insertRecordsWritten = 0; + recordsDeleted = 0; + } + + private void updateWriteCounts(HoodieDeltaWriteStat stat, AppendResult result) { + stat.setNumWrites(recordsWritten); + stat.setNumUpdateWrites(updatedRecordsWritten); + stat.setNumInserts(insertRecordsWritten); + stat.setNumDeletes(recordsDeleted); + stat.setTotalWriteBytes(result.size()); + } + + private void accumulateWriteCounts(HoodieDeltaWriteStat stat, AppendResult result) { + stat.setNumWrites(stat.getNumWrites() + recordsWritten); + stat.setNumUpdateWrites(stat.getNumUpdateWrites() + updatedRecordsWritten); + stat.setNumInserts(stat.getNumInserts() + insertRecordsWritten); + stat.setNumDeletes(stat.getNumDeletes() + recordsDeleted); + stat.setTotalWriteBytes(stat.getTotalWriteBytes() + result.size()); + } + + private void updateWriteStat(HoodieDeltaWriteStat stat, AppendResult result) { + stat.setPath(makeFilePath(result.logFile())); + stat.setLogOffset(result.offset()); + stat.setLogVersion(result.logFile().getLogVersion()); + if (!stat.getLogFiles().contains(result.logFile().getFileName())) { + stat.addLogFiles(result.logFile().getFileName()); + } + } + + private void updateRuntimeStats(HoodieDeltaWriteStat stat) { + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalUpsertTime(timer.endTimer()); + stat.setRuntimeStats(runtimeStats); + } + + private void accumulateRuntimeStats(HoodieDeltaWriteStat stat) { + RuntimeStats runtimeStats = stat.getRuntimeStats(); + assert runtimeStats != null; + runtimeStats.setTotalUpsertTime(runtimeStats.getTotalUpsertTime() + timer.endTimer()); + } + + private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) { + updateWriteStat(stat, result); + updateWriteCounts(stat, result); + updateRuntimeStats(stat); + statuses.add(this.writeStatus); + } + + private void processAppendResult(AppendResult result) { + HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) this.writeStatus.getStat(); + + if (stat.getPath() == null) { + // first time writing to this log block. + updateWriteStatus(stat, result); + } else if (stat.getPath().endsWith(result.logFile().getFileName())) { + // append/continued writing to the same log file + stat.setLogOffset(Math.min(stat.getLogOffset(), result.offset())); + accumulateWriteCounts(stat, result); + accumulateRuntimeStats(stat); + } else { + // written to a newer log file, due to rollover/otherwise. 
+ initNewStatus(); + stat = (HoodieDeltaWriteStat) this.writeStatus.getStat(); + updateWriteStatus(stat, result); + } + + resetWriteCounts(); + assert stat.getRuntimeStats() != null; + LOG.info(String.format("AppendHandle for partitionPath %s filePath %s, took %d ms.", partitionPath, + stat.getPath(), stat.getRuntimeStats().getTotalUpsertTime())); + timer.startTimer(); + } + public void doAppend() { while (recordItr.hasNext()) { HoodieRecord record = recordItr.next(); @@ -218,24 +319,30 @@ public void doAppend() { flushToDiskIfRequired(record); writeToBuffer(record); } - doAppend(header); + appendDataAndDeleteBlocks(header); estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; } - private void doAppend(Map header) { + private void appendDataAndDeleteBlocks(Map header) { try { header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchemaWithMetafields.toString()); + List blocks = new ArrayList<>(2); if (recordList.size() > 0) { - writer = writer.appendBlock(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header)); - recordList.clear(); + blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header)); } if (keysToDelete.size() > 0) { - writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); + blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); + } + + if (blocks.size() > 0) { + AppendResult appendResult = writer.appendBlocks(blocks); + processAppendResult(appendResult); + recordList.clear(); keysToDelete.clear(); } } catch (Exception e) { - throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e); + throw new HoodieAppendException("Failed while appending records to " + writer.getLogFile().getPath(), e); } } @@ -247,7 +354,7 @@ public boolean canWrite(HoodieRecord record) { @Override public void write(HoodieRecord record, Option insertValue) { - Option recordMetadata = record.getData().getMetadata(); + Option> recordMetadata = record.getData().getMetadata(); try { init(record); flushToDiskIfRequired(record); @@ -264,42 +371,17 @@ public void write(HoodieRecord record, Option insertValue) { public List close() { try { // flush any remaining records to disk - doAppend(header); - - String latestLogFile = ""; + appendDataAndDeleteBlocks(header); if (writer != null) { - sizeInBytes = writer.getCurrentSize(); - latestLogFile = writer.getLogFile().getFileName(); - filePath = partitionPath.length() == 0 ? 
new Path(latestLogFile).toString() - : new Path(partitionPath, latestLogFile).toString(); - logVersion = writer.getLogFile().getLogVersion(); writer.close(); - } - HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat(); - stat.setFileId(this.fileId); - stat.setPath(this.filePath); - stat.setLogVersion(logVersion); - stat.setLogOffset(logOffset); - stat.setNumWrites(recordsWritten); - stat.setNumUpdateWrites(updatedRecordsWritten); - stat.setNumInserts(insertRecordsWritten); - stat.setNumDeletes(recordsDeleted); - stat.setTotalWriteBytes(estimatedNumberOfBytesWritten); - stat.setFileSizeInBytes(sizeInBytes); - stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); - // update total log file list if the latest log file was new - if (!stat.getLogFiles().contains(latestLogFile)) { - stat.appendLogFiles(latestLogFile); + // update final size, once for all log files + for (WriteStatus status: statuses) { + long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); + status.getStat().setFileSizeInBytes(logFileSize); + } } - RuntimeStats runtimeStats = new RuntimeStats(); - runtimeStats.setTotalUpsertTime(timer.endTimer()); - stat.setRuntimeStats(runtimeStats); - - LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalUpsertTime())); - - return Collections.singletonList(writeStatus); + return statuses; } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); } @@ -310,6 +392,10 @@ public IOType getIOType() { return IOType.APPEND; } + public List writeStatuses() { + return statuses; + } + private Writer createLogWriter(Option fileSlice, String baseCommitTime) throws IOException, InterruptedException { Option latestLogFile = fileSlice.get().getLatestLogFile(); @@ -320,8 +406,8 @@ private Writer createLogWriter(Option fileSlice, String baseCommitTim .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L)) .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs) - .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) .withRolloverLogWriteToken(writeToken) + .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); } @@ -356,7 +442,7 @@ private void flushToDiskIfRequired(HoodieRecord record) { // avg of new and old LOG.info("AvgRecordSize => " + averageRecordSize); averageRecordSize = (averageRecordSize + sizeEstimator.sizeEstimate(record)) / 2; - doAppend(header); + appendDataAndDeleteBlocks(header); estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; numberOfRecords = 0; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 47eef8ad80d74..3b09a59e8672a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -58,7 +58,7 @@ public abstract class HoodieWriteHandle protected final Schema writerSchema; protected final Schema writerSchemaWithMetafields; protected HoodieTimer timer; - protected final WriteStatus writeStatus; + 
protected WriteStatus writeStatus; protected final String partitionPath; protected final String fileId; protected final String writeToken; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index a6b11af23dc36..e24ae73e38389 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -106,7 +106,7 @@ private Writer openWriter() { } else { return this.writer; } - } catch (InterruptedException | IOException e) { + } catch (IOException e) { throw new HoodieException("Unable to initialize HoodieLogFormat writer", e); } } @@ -335,7 +335,7 @@ private void writeToFile(Schema wrapperSchema, List records) thro Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString()); HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header); - this.writer = writer.appendBlock(block); + writer.appendBlock(block); records.clear(); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java index 40526b86f2cdb..657057f91e2e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java @@ -108,7 +108,7 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant // generate metadata Map header = RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime); // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); + writer.appendBlock(new HoodieCommandBlock(header)); } finally { try { if (writer != null) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index e88be4e6d393b..ca0a8ba38bc29 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -128,7 +128,7 @@ Map maybeDeleteAndCollectStats(HoodieEngineContext c if (doDelete) { Map header = generateHeader(instantToRollback.getTimestamp()); // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); + writer.appendBlock(new HoodieCommandBlock(header)); } } catch (IOException | InterruptedException io) { throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 17b02e8c15259..234e2849c2abf 100644 --- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -265,7 +265,7 @@ public Iterator> handleUpdate(String partitionPath, String fil return handleUpdateInternal(upsertHandle, fileId); } - protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( @@ -274,12 +274,11 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups JavaMergeHelper.newInstance().runMerge(table, upsertHandle); } - // TODO(vc): This needs to be revisited - if (upsertHandle.getWriteStatus().getPartitionPath() == null) { - LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " - + upsertHandle.getWriteStatus()); + List statuses = upsertHandle.writeStatuses(); + if (upsertHandle.getPartitionPath() == null) { + LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + statuses); } - return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); + return Collections.singletonList(statuses).iterator(); } protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator> recordItr) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index 9cf2434bc22a3..d2a78af960cfa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -128,7 +128,7 @@ JavaPairRDD maybeDeleteAndCollectStats(HoodieEngineC if (doDelete) { Map header = generateHeader(instantToRollback.getTimestamp()); // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); + writer.appendBlock(new HoodieCommandBlock(header)); } } catch (IOException | InterruptedException io) { throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 42584b12514b4..181b88c83d253 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -18,21 +18,18 @@ package org.apache.hudi.table; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import 
org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; @@ -73,6 +70,12 @@ import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.hudi.testutils.MetadataMergeWriteStatus; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -148,7 +151,7 @@ public void testSimpleInsertAndUpdate() throws Exception { client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 200); - insertAndGetFilePaths(records, client, cfg, newCommitTime); + insertRecords(records, client, cfg, newCommitTime); /** * Write 2 (updates) @@ -156,7 +159,7 @@ public void testSimpleInsertAndUpdate() throws Exception { newCommitTime = "004"; client.startCommitWithTime(newCommitTime); records = dataGen.generateUpdates(newCommitTime, 100); - updateAndGetFilePaths(records, client, cfg, newCommitTime); + updateRecords(records, client, cfg, newCommitTime); String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString(); client.compact(compactionCommitTime); @@ -164,7 +167,6 @@ public void testSimpleInsertAndUpdate() throws Exception { HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); @@ -196,7 +198,7 @@ public void testSimpleInsertAndUpdateHFile() throws Exception { client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 200); - insertAndGetFilePaths(records, client, cfg, newCommitTime); + insertRecords(records, client, cfg, newCommitTime); /** * Write 2 (updates) @@ -204,7 +206,7 @@ public void testSimpleInsertAndUpdateHFile() throws Exception { newCommitTime = "004"; client.startCommitWithTime(newCommitTime); records = dataGen.generateUpdates(newCommitTime, 100); - updateAndGetFilePaths(records, client, cfg, newCommitTime); + updateRecords(records, client, cfg, newCommitTime); String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString(); client.compact(compactionCommitTime); @@ -253,7 +255,7 @@ private void testClustering(boolean doUpdates) throws Exception { client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 400); - insertAndGetFilePaths(records.subList(0, 200), client, cfg, newCommitTime); + insertRecords(records.subList(0, 200), client, cfg, newCommitTime); /** * Write 2 (more inserts to create new files) @@ -261,7 +263,7 @@ private void testClustering(boolean 
doUpdates) throws Exception { // we already set small file size to small number to force inserts to go into new file. newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - insertAndGetFilePaths(records.subList(200, 400), client, cfg, newCommitTime); + insertRecords(records.subList(200, 400), client, cfg, newCommitTime); if (doUpdates) { /** @@ -270,7 +272,7 @@ private void testClustering(boolean doUpdates) throws Exception { newCommitTime = "003"; client.startCommitWithTime(newCommitTime); records = dataGen.generateUpdates(newCommitTime, 100); - updateAndGetFilePaths(records, client, cfg, newCommitTime); + updateRecords(records, client, cfg, newCommitTime); } HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); @@ -320,7 +322,7 @@ public void testIncrementalReadsWithCompaction() throws Exception { client.startCommitWithTime(commitTime1); List records001 = dataGen.generateInserts(commitTime1, 200); - insertAndGetFilePaths(records001, client, cfg, commitTime1); + insertRecords(records001, client, cfg, commitTime1); // verify only one base file shows up with commit time 001 FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath); @@ -341,7 +343,7 @@ public void testIncrementalReadsWithCompaction() throws Exception { String updateTime = "004"; client.startCommitWithTime(updateTime); List records004 = dataGen.generateUpdates(updateTime, 100); - updateAndGetFilePaths(records004, client, cfg, updateTime); + updateRecords(records004, client, cfg, updateTime); // verify RO incremental reads - only one parquet file shows up because updates to into log files incrementalROFiles = getROIncrementalFiles(partitionPath, false); @@ -368,7 +370,7 @@ public void testIncrementalReadsWithCompaction() throws Exception { String insertsTime = "006"; List records006 = dataGen.generateInserts(insertsTime, 200); client.startCommitWithTime(insertsTime); - insertAndGetFilePaths(records006, client, cfg, insertsTime); + insertRecords(records006, client, cfg, insertsTime); // verify new write shows up in snapshot mode even though there is pending compaction snapshotROFiles = getROSnapshotFiles(partitionPath); @@ -1064,11 +1066,16 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { long numLogFiles = 0; for (String partitionPath : dataGen.getPartitionPaths()) { - assertEquals(0, tableRTFileSystemView.getLatestFileSlices(partitionPath) - .filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count()); - assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath) - .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); + List allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); + assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count()); + assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); + long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); + if (logFileCount > 0) { + // check the log versions start from the base version + assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion()) + .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION))); + } + numLogFiles += logFileCount; } assertTrue(numLogFiles > 0); @@ -1569,12 +1576,13 @@ protected HoodieWriteConfig.Builder 
getConfigBuilder(Boolean autoCommit, Boolean .withRollbackUsingMarkers(rollbackUsingMarkers); } - private FileStatus[] insertAndGetFilePaths(List records, SparkRDDWriteClient client, - HoodieWriteConfig cfg, String commitTime) throws IOException { + private void insertRecords(List records, SparkRDDWriteClient client, + HoodieWriteConfig cfg, String commitTime) throws IOException { JavaRDD writeRecords = jsc.parallelize(records, 1); List statuses = client.insert(writeRecords, commitTime).collect(); assertNoWriteErrors(statuses); + assertFileSizes(statuses); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); @@ -1596,11 +1604,10 @@ private FileStatus[] insertAndGetFilePaths(List records, SparkRDDW dataFilesToRead = roView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), "should list the parquet files we wrote in the delta commit"); - return allFiles; } - private FileStatus[] updateAndGetFilePaths(List records, SparkRDDWriteClient client, - HoodieWriteConfig cfg, String commitTime) throws IOException { + private void updateRecords(List records, SparkRDDWriteClient client, + HoodieWriteConfig cfg, String commitTime) throws IOException { Map recordsMap = new HashMap<>(); for (HoodieRecord rec : records) { if (!recordsMap.containsKey(rec.getKey())) { @@ -1611,6 +1618,8 @@ private FileStatus[] updateAndGetFilePaths(List records, SparkRDDW List statuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); + assertFileSizes(statuses); + metaClient = HoodieTableMetaClient.reload(metaClient); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); assertTrue(deltaCommit.isPresent()); @@ -1619,8 +1628,13 @@ private FileStatus[] updateAndGetFilePaths(List records, SparkRDDW Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); assertFalse(commit.isPresent()); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - return listAllBaseFilesInPath(hoodieTable); + } + + private void assertFileSizes(List statuses) throws IOException { + for (WriteStatus status: statuses) { + assertEquals(FSUtils.getFileSize(metaClient.getFs(), new Path(metaClient.getBasePath(), status.getStat().getPath())), + status.getStat().getFileSizeInBytes()); + } } private FileStatus[] getROSnapshotFiles(String partitionPath) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java index c8b30e18b13e7..c97743f4d115e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java @@ -32,7 +32,7 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat { private int logVersion; private long logOffset; private String baseFile; - private List logFiles = new ArrayList(); + private List logFiles = new ArrayList<>(); public void setLogVersion(int logVersion) { this.logVersion = logVersion; @@ -62,7 +62,7 @@ public void setLogFiles(List logFiles) { this.logFiles = logFiles; } - public void appendLogFiles(String logFile) { + public void addLogFiles(String logFile) { logFiles.add(logFile); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 97288dfe00890..9a640bedee41d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -71,7 +71,7 @@ public class HoodieWriteStat implements Serializable { private long numInserts; /** - * Total size of file written. + * Total number of bytes written. */ private long totalWriteBytes; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AppendResult.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AppendResult.java new file mode 100644 index 0000000000000..8246edada9161 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AppendResult.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log; + +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; + +/** + * Pojo holding information on the result of a {@link org.apache.hudi.common.table.log.HoodieLogFormat.Writer#appendBlock(HoodieLogBlock)}. + */ +public class AppendResult { + + private final HoodieLogFile logFile; + private final long offset; + private final long size; + + public AppendResult(HoodieLogFile logFile, long offset, long size) { + this.logFile = logFile; + this.offset = offset; + this.size = size; + } + + public HoodieLogFile logFile() { + return logFile; + } + + public long offset() { + return offset; + } + + public long size() { + return size; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 86f14956fe1c8..9b643ec6e16c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -33,6 +33,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Iterator; +import java.util.List; /** * File Format for Hoodie Log Files. The File Format consists of blocks each separated with a MAGIC sync marker. A Block @@ -61,14 +62,21 @@ public interface HoodieLogFormat { interface Writer extends Closeable { /** - * @return the path to this {@link HoodieLogFormat} + * @return the path to the current {@link HoodieLogFile} being written to. */ HoodieLogFile getLogFile(); /** - * Append Block returns a new Writer if the log is rolled. + * Append Block to a log file. + * @return {@link AppendResult} containing result of the append. 
+ */ + AppendResult appendBlock(HoodieLogBlock block) throws IOException, InterruptedException; + + /** + * Appends the list of blocks to a logfile. + * @return {@link AppendResult} containing result of the append. */ - Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException; + AppendResult appendBlocks(List blocks) throws IOException, InterruptedException; long getCurrentSize() throws IOException; } @@ -88,7 +96,7 @@ interface Reader extends Closeable, Iterator { * * @return */ - public boolean hasPrev(); + boolean hasPrev(); /** * Read log file in reverse order and return prev block if present. @@ -96,7 +104,7 @@ interface Reader extends Closeable, Iterator { * @return * @throws IOException */ - public HoodieLogBlock prev() throws IOException; + HoodieLogBlock prev() throws IOException; } /** @@ -144,13 +152,13 @@ public WriterBuilder withReplication(short replication) { return this; } - public WriterBuilder withLogWriteToken(String writeToken) { - this.logWriteToken = writeToken; + public WriterBuilder withRolloverLogWriteToken(String rolloverLogWriteToken) { + this.rolloverLogWriteToken = rolloverLogWriteToken; return this; } - public WriterBuilder withRolloverLogWriteToken(String rolloverLogWriteToken) { - this.rolloverLogWriteToken = rolloverLogWriteToken; + public WriterBuilder withLogWriteToken(String logWriteToken) { + this.logWriteToken = logWriteToken; return this; } @@ -194,7 +202,7 @@ public WriterBuilder onParentPath(Path parentPath) { return this; } - public Writer build() throws IOException, InterruptedException { + public Writer build() throws IOException { LOG.info("Building HoodieLogFormat Writer"); if (fs == null) { throw new IllegalArgumentException("fs is not specified"); @@ -254,8 +262,7 @@ public Writer build() throws IOException, InterruptedException { if (sizeThreshold == null) { sizeThreshold = DEFAULT_SIZE_THRESHOLD; } - return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, logWriteToken, - rolloverLogWriteToken); + return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index 7fe21e9b2955d..d7e4f7ef108bc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.exception.HoodieException; @@ -38,6 +37,8 @@ import org.apache.log4j.Logger; import java.io.IOException; +import java.util.Collections; +import java.util.List; /** * HoodieLogFormatWriter can be used to append blocks to a log file Use HoodieLogFormat.WriterBuilder to construct. 
@@ -47,13 +48,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private static final Logger LOG = LogManager.getLogger(HoodieLogFormatWriter.class); private HoodieLogFile logFile; + private FSDataOutputStream output; + private final FileSystem fs; private final long sizeThreshold; private final Integer bufferSize; private final Short replication; - private final String logWriteToken; private final String rolloverLogWriteToken; - private FSDataOutputStream output; private boolean closed = false; private transient Thread shutdownThread = null; @@ -66,14 +67,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { * @param replication * @param sizeThreshold */ - HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, - String logWriteToken, String rolloverLogWriteToken) { + HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken) { this.fs = fs; this.logFile = logFile; this.sizeThreshold = sizeThreshold; this.bufferSize = bufferSize; this.replication = replication; - this.logWriteToken = logWriteToken; this.rolloverLogWriteToken = rolloverLogWriteToken; addShutDownHook(); } @@ -105,6 +104,7 @@ private FSDataOutputStream getOutputStream() throws IOException, InterruptedExce if (isAppendSupported) { LOG.info(logFile + " exists. Appending to existing file"); try { + // open the path for append and record the offset this.output = fs.append(path, bufferSize); } catch (RemoteException e) { LOG.warn("Remote Exception, attempting to handle or recover lease", e); @@ -124,9 +124,9 @@ private FSDataOutputStream getOutputStream() throws IOException, InterruptedExce } } if (!isAppendSupported) { - this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); - LOG.info("Append not supported.. Rolling over to " + logFile); + rollOver(); createNewFile(); + LOG.info("Append not supported.. Rolling over to " + logFile); } } else { LOG.info(logFile + " does not exist. Create a new file"); @@ -138,52 +138,66 @@ private FSDataOutputStream getOutputStream() throws IOException, InterruptedExce } @Override - public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException { + public AppendResult appendBlock(HoodieLogBlock block) throws IOException, InterruptedException { + return appendBlocks(Collections.singletonList(block)); + } + @Override + public AppendResult appendBlocks(List blocks) throws IOException, InterruptedException { // Find current version HoodieLogFormat.LogFormatVersion currentLogFormatVersion = new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION); FSDataOutputStream outputStream = getOutputStream(); - long currentSize = outputStream.size(); - - // 1. Write the magic header for the start of the block - outputStream.write(HoodieLogFormat.MAGIC); - - // bytes for header - byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader()); - // content bytes - byte[] content = block.getContentBytes(); - // bytes for footer - byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter()); - - // 2. Write the total size of the block (excluding Magic) - outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length)); - - // 3. Write the version of this log block - outputStream.writeInt(currentLogFormatVersion.getVersion()); - // 4. 
Write the block type - outputStream.writeInt(block.getBlockType().ordinal()); - - // 5. Write the headers for the log block - outputStream.write(headerBytes); - // 6. Write the size of the content block - outputStream.writeLong(content.length); - // 7. Write the contents of the data block - outputStream.write(content); - // 8. Write the footers for the log block - outputStream.write(footerBytes); - // 9. Write the total size of the log block (including magic) which is everything written - // until now (for reverse pointer) - // Update: this information is now used in determining if a block is corrupt by comparing to the - // block size in header. This change assumes that the block size will be the last data written - // to a block. Read will break if any data is written past this point for a block. - outputStream.writeLong(outputStream.size() - currentSize); - // Flush every block to disk + long startPos = outputStream.getPos(); + long sizeWritten = 0; + + for (HoodieLogBlock block: blocks) { + long startSize = outputStream.size(); + + // 1. Write the magic header for the start of the block + outputStream.write(HoodieLogFormat.MAGIC); + + // bytes for header + byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader()); + // content bytes + byte[] content = block.getContentBytes(); + // bytes for footer + byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter()); + + // 2. Write the total size of the block (excluding Magic) + outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length)); + + // 3. Write the version of this log block + outputStream.writeInt(currentLogFormatVersion.getVersion()); + // 4. Write the block type + outputStream.writeInt(block.getBlockType().ordinal()); + + // 5. Write the headers for the log block + outputStream.write(headerBytes); + // 6. Write the size of the content block + outputStream.writeLong(content.length); + // 7. Write the contents of the data block + outputStream.write(content); + // 8. Write the footers for the log block + outputStream.write(footerBytes); + // 9. Write the total size of the log block (including magic) which is everything written + // until now (for reverse pointer) + // Update: this information is now used in determining if a block is corrupt by comparing to the + // block size in header. This change assumes that the block size will be the last data written + // to a block. Read will break if any data is written past this point for a block. + outputStream.writeLong(outputStream.size() - startSize); + + // Fetch the size again, so it accounts also (9). + sizeWritten += outputStream.size() - startSize; + } + // Flush all blocks to disk flush(); + AppendResult result = new AppendResult(logFile, startPos, sizeWritten); // roll over if size is past the threshold - return rolloverIfNeeded(); + rolloverIfNeeded(); + return result; } /** @@ -201,20 +215,19 @@ private int getLogBlockLength(int contentLength, int headerLength, int footerLen Long.BYTES; // bytes to write totalLogBlockLength at end of block (for reverse ptr) } - private Writer rolloverIfNeeded() throws IOException, InterruptedException { + private void rolloverIfNeeded() throws IOException { // Roll over if the size is past the threshold if (getCurrentSize() > sizeThreshold) { - // TODO - make an end marker which seals the old log file (no more appends possible to that - // file). LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold + ". 
Rolling over to the next version"); - HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken); - // close this writer and return the new writer - close(); - return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken, - rolloverLogWriteToken); + rollOver(); } - return this; + } + + private void rollOver() throws IOException { + closeStream(); + this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); + this.closed = false; } private void createNewFile() throws IOException { @@ -292,13 +305,12 @@ private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325 LOG.warn("Failed to open an append stream to the log file. Opening a new log file..", e); - // Rollover the current log file (since cannot get a stream handle) and create new one - this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); + rollOver(); createNewFile(); } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) { LOG.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over"); // Rollover the current log file (since cannot get a stream handle) and create new one - this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); + rollOver(); createNewFile(); } else if (e.getClassName().contentEquals(RecoveryInProgressException.class.getName()) && (fs instanceof DistributedFileSystem)) { @@ -311,8 +323,9 @@ private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) // try again this.output = fs.append(path, bufferSize); } else { - LOG.warn("Failed to recover lease on path " + path); - throw new HoodieException(e); + final String msg = "Failed to recover lease on path " + path; + LOG.warn(msg); + throw new HoodieException(msg, e); } } else { // When fs.append() has failed and an exception is thrown, by closing the output stream @@ -320,16 +333,16 @@ private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) // new attemptId, say taskId.1) it will be able to acquire lease on the log file (as output stream was // closed properly by taskId.0). // - // If close() call were to fail throwing an exception, our best bet is to rollover to a new log file. + // If closeStream() call were to fail throwing an exception, our best bet is to rollover to a new log file. try { - close(); + closeStream(); // output stream has been successfully closed and lease on the log file has been released, // before throwing an exception for the append failure. throw new HoodieIOException("Failed to append to the output stream ", e); } catch (Exception ce) { LOG.warn("Failed to close the output stream for " + fs.getClass().getName() + " on path " + path + ". 
Rolling over to a new log file."); - this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); + rollOver(); createNewFile(); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 98ece7309ea7d..57e814c339090 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.table.log.HoodieLogFileReader; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; @@ -76,6 +77,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -119,7 +121,7 @@ public void tearDown() throws IOException { } @Test - public void testEmptyLog() throws IOException, InterruptedException { + public void testEmptyLog() throws IOException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); @@ -138,18 +140,21 @@ public void testBasicAppend(HoodieLogBlockType dataBlockType) throws IOException Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); + long pos = writer.getCurrentSize(); HoodieDataBlock dataBlock = getDataBlock(dataBlockType, records, header); - writer = writer.appendBlock(dataBlock); + AppendResult result = writer.appendBlock(dataBlock); + long size = writer.getCurrentSize(); assertTrue(size > 0, "We just wrote a block - size should be > 0"); assertEquals(size, fs.getFileStatus(writer.getLogFile().getPath()).getLen(), "Write should be auto-flushed. 
The size reported by FileStatus and the writer should match"); + assertEquals(size, result.size()); + assertEquals(writer.getLogFile(), result.logFile()); + assertEquals(0, result.offset()); writer.close(); - } - @ParameterizedTest - @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" }) + @Test public void testRollover() throws IOException, InterruptedException, URISyntaxException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -160,23 +165,36 @@ public void testRollover() throws IOException, InterruptedException, URISyntaxEx header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); // Write out a block - writer = writer.appendBlock(dataBlock); + AppendResult firstAppend = writer.appendBlock(dataBlock); // Get the size of the block long size = writer.getCurrentSize(); writer.close(); + assertEquals(0, firstAppend.offset()); + assertEquals(size, firstAppend.size()); + // Create a writer with the size threshold as the size we just wrote - so this has to roll writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build(); records = SchemaTestUtil.generateTestRecords(0, 100); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + AppendResult secondAppend = writer.appendBlock(dataBlock); + + assertEquals(firstAppend.logFile(), secondAppend.logFile()); + assertNotEquals(0, secondAppend.offset()); assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0"); assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2"); Path logFilePath = writer.getLogFile().getPath(); assertFalse(fs.exists(logFilePath), "Path (" + logFilePath + ") must not exist"); + + // Write one more block, which should not go to the new log file. 
+ records = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = getDataBlock(records, header); + AppendResult rolloverAppend = writer.appendBlock(dataBlock); + + assertNotEquals(secondAppend.logFile(), rolloverAppend.logFile()); + assertEquals(0, rolloverAppend.offset()); writer.close(); } @@ -203,17 +221,13 @@ private void testConcurrentAppend(boolean logFileExists, boolean newLogFileForma if (newLogFileFormat && logFileExists) { // Assume there is an existing log-file with write token - builder1 = builder1.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN) - .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); - builder2 = builder2.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN) - .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); + builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); + builder2 = builder2.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); } else if (newLogFileFormat) { // First log file of the file-slice builder1 = builder1.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) - .withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN) .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); builder2 = builder2.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) - .withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN) .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); } else { builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); @@ -224,9 +238,9 @@ private void testConcurrentAppend(boolean logFileExists, boolean newLogFileForma header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); Writer writer2 = builder2.build(); - writer2 = writer2.appendBlock(dataBlock); + writer2.appendBlock(dataBlock); HoodieLogFile logFile1 = writer.getLogFile(); HoodieLogFile logFile2 = writer2.getLogFile(); writer.close(); @@ -245,7 +259,7 @@ public void testMultipleAppend() throws IOException, URISyntaxException, Interru header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); long size1 = writer.getCurrentSize(); writer.close(); @@ -255,7 +269,7 @@ public void testMultipleAppend() throws IOException, URISyntaxException, Interru records = SchemaTestUtil.generateTestRecords(0, 100); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); long size2 = writer.getCurrentSize(); assertTrue(size2 > size1, "We just wrote a new block - size2 should be > size1"); assertEquals(size2, fs.getFileStatus(writer.getLogFile().getPath()).getLen(), @@ -269,7 +283,7 @@ public void testMultipleAppend() throws IOException, URISyntaxException, Interru records = SchemaTestUtil.generateTestRecords(0, 100); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + 
writer.appendBlock(dataBlock); long size3 = writer.getCurrentSize(); assertTrue(size3 > size2, "We just wrote a new block - size3 should be > size2"); assertEquals(size3, fs.getFileStatus(writer.getLogFile().getPath()).getLen(), @@ -325,9 +339,11 @@ public void testAppendNotSupported() throws IOException, URISyntaxException, Int HoodieDataBlock dataBlock = getDataBlock(records, header); for (int i = 0; i < 2; i++) { - HoodieLogFormat.newWriterBuilder().onParentPath(testPath) + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive").overBaseCommit("") - .withFs(localFs).build().appendBlock(dataBlock).close(); + .withFs(localFs).build(); + writer.appendBlock(dataBlock); + writer.close(); } // ensure there are two log file versions, with same data. @@ -335,8 +351,7 @@ public void testAppendNotSupported() throws IOException, URISyntaxException, Int assertEquals(2, statuses.length); } - @ParameterizedTest - @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" }) + @Test public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -349,7 +364,7 @@ public void testBasicWriteAndScan() throws IOException, URISyntaxException, Inte header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); @@ -364,8 +379,7 @@ public void testBasicWriteAndScan() throws IOException, URISyntaxException, Inte reader.close(); } - @ParameterizedTest - @EnumSource(names = { "AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK" }) + @Test public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -378,7 +392,7 @@ public void testBasicAppendAndRead() throws IOException, URISyntaxException, Int header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); writer = @@ -389,7 +403,7 @@ public void testBasicAppendAndRead() throws IOException, URISyntaxException, Int .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Close and Open again and append 100 more records @@ -401,7 +415,7 @@ public void testBasicAppendAndRead() throws IOException, URISyntaxException, Int .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records3, header); - writer = writer.appendBlock(dataBlock); + 
writer.appendBlock(dataBlock); writer.close(); Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); @@ -455,7 +469,7 @@ public void testBasicAppendAndScanMultipleFiles(boolean readBlocksLazily) allRecords.add(copyOfRecords1); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); } writer.close(); @@ -495,7 +509,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) @@ -521,7 +535,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep records = SchemaTestUtil.generateTestRecords(0, 10); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // First round of reads - we should be able to read the first block and then EOF @@ -559,7 +573,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep records = SchemaTestUtil.generateTestRecords(0, 100); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Second round of reads - we should be able to read the first and last block @@ -597,7 +611,7 @@ public void testAvroLogRecordReaderBasic(boolean readBlocksLazily) header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Write 2 List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); @@ -605,7 +619,7 @@ public void testAvroLogRecordReaderBasic(boolean readBlocksLazily) .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); List allLogFiles = @@ -654,21 +668,21 @@ public void testAvroLogRecordReaderWithRollbackTombstone(boolean readBlocksLazil header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Write 2 header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Rollback the last write header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101"); 
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); - writer = writer.appendBlock(commandBlock); + writer.appendBlock(commandBlock); // Write 3 header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); @@ -677,7 +691,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(boolean readBlocksLazil .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); dataBlock = getDataBlock(records3, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); List allLogFiles = @@ -724,7 +738,7 @@ public void testAvroLogRecordReaderWithRollbackPartialBlock() header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Write 2 @@ -756,7 +770,7 @@ public void testAvroLogRecordReaderWithRollbackPartialBlock() writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); - writer = writer.appendBlock(commandBlock); + writer.appendBlock(commandBlock); // Write 3 header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103"); @@ -766,7 +780,7 @@ public void testAvroLogRecordReaderWithRollbackPartialBlock() header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); dataBlock = getDataBlock(records3, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); List allLogFiles = @@ -814,7 +828,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazil header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Write 2 header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); @@ -822,7 +836,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazil List copyOfRecords2 = records2.stream() .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); copyOfRecords1.addAll(copyOfRecords2); List originalKeys = @@ -837,7 +851,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazil header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); - writer = writer.appendBlock(deleteBlock); + writer.appendBlock(deleteBlock); List allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") @@ -922,13 +936,13 @@ public void testAvroLogRecordReaderWithFailedRollbacks(boolean readBlocksLazily) header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = 
writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Write 2 List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Delete 50 keys // Delete 50 keys @@ -938,14 +952,14 @@ public void testAvroLogRecordReaderWithFailedRollbacks(boolean readBlocksLazily) .collect(Collectors.toList()).subList(0, 50); HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); - writer = writer.appendBlock(deleteBlock); + writer.appendBlock(deleteBlock); // Attempt 1 : Write rollback block for a failed write header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); try { - writer = writer.appendBlock(commandBlock); + writer.appendBlock(commandBlock); // Say job failed, retry writing 2 rollback in the next rollback(..) attempt throw new Exception("simulating failure"); } catch (Exception e) { @@ -999,7 +1013,7 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(boolean readBlock header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Delete 50 keys List deletedKeys = copyOfRecords1.stream() @@ -1007,13 +1021,13 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(boolean readBlock ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); - writer = writer.appendBlock(deleteBlock); + writer.appendBlock(deleteBlock); // Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); - writer = writer.appendBlock(commandBlock); + writer.appendBlock(commandBlock); writer.appendBlock(commandBlock); List allLogFiles = @@ -1051,7 +1065,7 @@ public void testAvroLogRecordReaderWithInvalidRollback(boolean readBlocksLazily) header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Write invalid rollback for a failed write (possible for in-flight commits) header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101"); @@ -1103,9 +1117,9 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(boolean readBloc header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); - writer = writer.appendBlock(dataBlock); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); + 
writer.appendBlock(dataBlock); // Delete 50 keys // Delete 50 keys @@ -1114,7 +1128,7 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(boolean readBloc ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); - writer = writer.appendBlock(deleteBlock); + writer.appendBlock(deleteBlock); // Write 1 rollback block for a failed write header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); @@ -1160,9 +1174,9 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean r header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); - writer = writer.appendBlock(dataBlock); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Append some arbit byte[] to the end of the log (mimics a partially written commit) @@ -1195,7 +1209,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean r HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Append some arbit byte[] to the end of the log (mimics a partially written commit) @@ -1220,7 +1234,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean r header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); - writer = writer.appendBlock(commandBlock); + writer.appendBlock(commandBlock); writer.close(); List allLogFiles = @@ -1272,7 +1286,7 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1 header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records.subList(0, numRecordsInLog1), header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // Get the size of the block long size = writer.getCurrentSize(); writer.close(); @@ -1286,7 +1300,7 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1 header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock2 = getDataBlock(records2.subList(0, numRecordsInLog2), header2); - writer2 = writer2.appendBlock(dataBlock2); + writer2.appendBlock(dataBlock2); // Get the size of the block writer2.close(); @@ -1360,7 +1374,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily) header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); writer = @@ -1370,7 +1384,7 @@ public void testBasicAppendAndReadInReverse(boolean 
readBlocksLazily) List copyOfRecords2 = records2.stream() .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Close and Open again and append 100 more records @@ -1381,7 +1395,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily) List copyOfRecords3 = records3.stream() .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); dataBlock = getDataBlock(records3, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), @@ -1429,7 +1443,7 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily) header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) @@ -1455,7 +1469,7 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = getDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // First round of reads - we should be able to read the first block and then EOF @@ -1488,7 +1502,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily) header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); HoodieDataBlock dataBlock = getDataBlock(records1, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); writer = @@ -1496,7 +1510,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); List records2 = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = getDataBlock(records2, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); // Close and Open again and append 100 more records @@ -1505,7 +1519,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily) .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); List records3 = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = getDataBlock(records3, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); writer.close(); HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java index 71616f6683eda..e313bb4a6ca0f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java @@ -110,7 +110,7 @@ public void 
testFailedToGetAppendStreamFromHDFSNameNode() .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive") .overBaseCommit("").withFs(fs).build(); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); // get the current log file version to compare later int logFileVersion = writer.getLogFile().getLogVersion(); Path logFilePath = writer.getLogFile().getPath(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java index c9586fe7560c3..bcd27f37591e3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java @@ -38,8 +38,8 @@ public void testBaseFileAndLogFiles() { String logFile2 = ".log2.log"; writeStat.setBaseFile(baseFile); - writeStat.appendLogFiles(logFile1); - writeStat.appendLogFiles(logFile2); + writeStat.addLogFiles(logFile1); + writeStat.addLogFiles(logFile2); assertTrue(writeStat.getLogFiles().contains(logFile1)); assertTrue(writeStat.getLogFiles().contains(logFile2)); assertEquals(baseFile, writeStat.getBaseFile()); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index fe0be469d28fe..b10f38763d6fa 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -216,7 +216,7 @@ public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem String newCommit, String rolledBackInstant, int logVersion) throws InterruptedException, IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())).withFileId(fileId) - .overBaseCommit(baseCommit).withFs(fs).withLogVersion(logVersion).withLogWriteToken("1-0-1") + .overBaseCommit(baseCommit).withFs(fs).withLogVersion(logVersion).withRolloverLogWriteToken("1-0-1") .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata Map header = new HashMap<>(); @@ -225,7 +225,7 @@ public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); + writer.appendBlock(new HoodieCommandBlock(header)); return writer; } @@ -235,7 +235,7 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, throws InterruptedException, IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion) - .withLogWriteToken("1-0-1").overBaseCommit(baseCommit).withFs(fs).build(); + .withRolloverLogWriteToken("1-0-1").overBaseCommit(baseCommit).withFs(fs).build(); List records = new ArrayList<>(); for (int i = offset; i < offset + numberOfRecords; i++) { records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); @@ -245,7 +245,7 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, 
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); - writer = writer.appendBlock(dataBlock); + writer.appendBlock(dataBlock); return writer; } @@ -264,7 +264,7 @@ public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionD header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header); - writer = writer.appendBlock(rollbackBlock); + writer.appendBlock(rollbackBlock); return writer; }
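
Note (illustrative, not part of the patch): a minimal sketch of how a caller might use the appendBlock(..) API changed above, which now returns an AppendResult, to fill in the log-file fields of HoodieDeltaWriteStat. The builder calls and the data-block constructor mirror the tests in this patch; the class name AppendResultUsageSketch, the method appendAndRecordStat, and the argument names are hypothetical, and the setter signatures are assumed to match the diffs shown here.

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AppendResultUsageSketch {

  static HoodieDeltaWriteStat appendAndRecordStat(FileSystem fs, Path partitionPath, String fileId,
      String baseInstantTime, Schema schema, List<IndexedRecord> recordsToAppend)
      throws IOException, InterruptedException {
    // Build a log writer the same way the tests in this patch do.
    Writer writer = HoodieLogFormat.newWriterBuilder()
        .onParentPath(partitionPath)
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
        .withFileId(fileId)
        .overBaseCommit(baseInstantTime)
        .withFs(fs)
        .build();
    try {
      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, baseInstantTime);
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

      // appendBlock(..) now reports which log file was actually written to,
      // which may differ from writer.getLogFile() captured before a rollover.
      AppendResult result = writer.appendBlock(new HoodieAvroDataBlock(recordsToAppend, header));

      // Record the real log file path, version and offset in the write stat,
      // instead of guessing them from the writer before the append happened.
      HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat();
      stat.setLogVersion(result.logFile().getLogVersion());
      stat.setLogOffset(result.offset());
      stat.addLogFiles(result.logFile().getFileName());
      stat.setPath(new Path(partitionPath, result.logFile().getFileName()).toString());
      return stat;
    } finally {
      writer.close();
    }
  }
}

The point of the sketch: returning AppendResult (log file, offset, size) instead of a possibly rolled-over Writer lets callers record exactly where the bytes landed, which is what the HoodieWriteStat path and file-size fixes in this patch rely on.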