@@ -91,8 +91,8 @@ public void init() throws IOException {
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();

// archive
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
- archiveLog.archiveIfRequired(hadoopConf);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
+ archiveLog.archiveIfRequired();
}

@AfterEach
@@ -120,7 +120,7 @@ private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- postCommit(metadata, instantTime, extraMetadata);
+ postCommit(table, metadata, instantTime, extraMetadata);
emitCommitMetrics(instantTime, metadata, actionType);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
@@ -147,11 +147,13 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String

/**
* Post Commit Hook. Derived classes use this method to perform post-commit processing
*
+ * @param table table to commit on
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
- protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
+ protected abstract void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);

/**
* Finalize Write operation.
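For illustration, a minimal sketch of a concrete client implementing the widened hook. The class name and body are hypothetical (constructors, fields, and the LOG declaration are omitted); only the method signature comes from this change:

    public class MySparkWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
      @Override
      protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata,
                                String instantTime, Option<Map<String, String>> extraMetadata) {
        // The committed table is now passed in, so post-commit work (compaction,
        // archiving, cleaning) can reuse it instead of re-creating a meta client.
        LOG.info("Post-commit for " + instantTime + " on " + table.getMetaClient().getBasePath());
      }
    }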
@@ -316,7 +316,7 @@ private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instan
result.getWriteStats().get().size());
}

- postCommit(result.getCommitMetadata().get(), instantTime, Option.empty());
+ postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
hoodieTable.getMetaClient().getCommitActionType());
@@ -325,8 +325,7 @@ private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instan
}

@Override
- protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
- Option<Map<String, String>> extraMetadata) {
+ protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
@@ -336,8 +335,8 @@ protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
// We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
- archiveLog.archiveIfRequired(hadoopConf);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
+ archiveLog.archiveIfRequired();
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
LOG.info("Auto cleaning is enabled. Running cleaner now");
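The branches in postCommit above are driven entirely by configuration. A hedged sketch of a write config that exercises both the inline-compaction and auto-clean paths; the builder calls are assumed from the existing Hudi API, and the path and table name are placeholders:

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")
        .forTable("sample-table")
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)   // postCommit runs a compaction inline
            .withAutoClean(true)          // postCommit then invokes the cleaner
            .build())
        .build();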
@@ -54,6 +54,8 @@
public class HoodieWriteConfig extends DefaultHoodieConfig {

public static final String TABLE_NAME = "hoodie.table.name";
+ public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false";
+ public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers";
public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
public static final String BASE_PATH_PROP = "hoodie.base.path";
public static final String AVRO_SCHEMA = "hoodie.avro.schema";
@@ -197,6 +199,10 @@ public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}

+ public boolean shouldRollbackUsingMarkers() {
+ return Boolean.parseBoolean(props.getProperty(ROLLBACK_USING_MARKERS));
+ }

public int getWriteBufferLimitBytes() {
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
}
@@ -706,6 +712,11 @@ public Builder withRollbackParallelism(int rollbackParallelism) {
return this;
}

+ public Builder withRollbackUsingMarkers(boolean rollbackUsingMarkers) {
+ props.setProperty(ROLLBACK_USING_MARKERS, String.valueOf(rollbackUsingMarkers));
+ return this;
+ }

public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
return this;
@@ -803,6 +814,8 @@ public HoodieWriteConfig build() {
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
+ setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
+ DEFAULT_ROLLBACK_USING_MARKERS);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
DEFAULT_COMBINE_BEFORE_INSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
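A short usage sketch of the new marker-based-rollback flag (base path and table name are placeholders; newBuilder, withPath, and forTable are assumed from the existing builder):

    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")
        .forTable("sample-table")
        .withRollbackUsingMarkers(true)   // opt in; DEFAULT_ROLLBACK_USING_MARKERS is "false"
        .build();

    boolean useMarkers = cfg.shouldRollbackUsingMarkers();   // -> true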
@@ -48,6 +48,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
+ import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.util.SizeEstimator;
@@ -119,10 +120,11 @@ private void init(HoodieRecord record) {
SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
// Set the base commit time as the current instantTime for new inserts into log files
- String baseInstantTime = instantTime;
+ String baseInstantTime;
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
} else {
+ baseInstantTime = instantTime;
// This means there is no base data file, start appending to a new log file
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
LOG.info("New InsertHandle for partition :" + partitionPath);
@@ -138,6 +140,12 @@ private void init(HoodieRecord record) {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());

+ // Since the actual log file written to can be different based on when rollover happens, we use the
+ // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
+ // writers.
+ createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId));

this.writer = createLogWriter(fileSlice, baseInstantTime);
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
@@ -278,6 +286,11 @@ public WriteStatus getWriteStatus() {
return writeStatus;
}

+ @Override
+ public MarkerFiles.MarkerType getIOType() {
+ return MarkerFiles.MarkerType.APPEND;
+ }

private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException, InterruptedException {
Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
@@ -288,7 +301,8 @@ private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTim
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
- .withRolloverLogWriteToken(writeToken).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ .withRolloverLogWriteToken(writeToken)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

private void writeToBuffer(HoodieRecord<T> record) {
@@ -327,5 +341,4 @@ private void flushToDiskIfRequired(HoodieRecord record) {
numberOfRecords = 0;
}
}

}
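As the comment in init() above notes, log files can roll over mid-write, so the append marker is keyed on the slice's base file name rather than on the log file actually written. A sketch of that call in isolation, using only names that appear in this diff:

    // The marker merely asserts "this file slice received appends during this instant";
    // the writeToken still fences concurrent writers.
    String markerName = FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId);
    createMarkerFile(partitionPath, markerName);   // stored as MarkerType.APPEND via getIOType()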
@@ -37,6 +37,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
+ import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -67,7 +68,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
- createMarkerFile(partitionPath);
+ createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId));
this.storageWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
} catch (IOException e) {
@@ -147,6 +148,11 @@ public WriteStatus getWriteStatus() {
return writeStatus;
}

+ @Override
+ public MarkerFiles.MarkerType getIOType() {
+ return MarkerFiles.MarkerType.CREATE;
+ }

/**
* Performs actions to durably, persist the current changes and returns a WriteStatus object.
*/
@@ -18,7 +18,6 @@

package org.apache.hudi.io;

- import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
@@ -45,6 +44,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
+ import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -89,10 +89,6 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
init(fileId, this.partitionPath, dataFileToBeMerged);
}

- public static Schema createHoodieWriteSchema(Schema originalSchema) {
- return HoodieAvroUtils.addMetadataFields(originalSchema);
- }

@Override
public Schema getWriterSchema() {
return writerSchema;
@@ -114,8 +110,9 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
partitionMetadata.trySave(getPartitionId());

oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
+ String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
- + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
+ + newFileName).toString();
newFilePath = new Path(config.getBasePath(), relativePath);

LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
@@ -128,7 +125,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);

// Create Marker file
- createMarkerFile(partitionPath);
+ createMarkerFile(partitionPath, newFileName);

// Create the writer for writing the new version file
storageWriter =
@@ -312,4 +309,9 @@ public Path getOldFilePath() {
public WriteStatus getWriteStatus() {
return writeStatus;
}

+ @Override
+ public MarkerFiles.MarkerType getIOType() {
+ return MarkerFiles.MarkerType.MERGE;
+ }
}
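With all three handles advertising an I/O type, a marker-based rollback can choose an undo strategy per marker. The dispatch below is a hypothetical illustration of that idea, not code from this PR:

    static void undoWrite(MarkerFiles.MarkerType type) {
      switch (type) {
        case CREATE:   // HoodieCreateHandle wrote a brand-new base file
        case MERGE:    // HoodieMergeHandle wrote a new version of a base file
          // delete the data file the marker points at
          break;
        case APPEND:   // HoodieAppendHandle appended to a log file
          // neutralize the appended records (e.g., with a rollback block)
          break;
        default:
          throw new IllegalArgumentException("Unknown marker type: " + type);
      }
    }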
@@ -28,7 +28,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
- import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

@@ -37,6 +36,7 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+ import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -48,6 +48,7 @@
public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle {

private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);

protected final Schema originalSchema;
protected final Schema writerSchema;
protected HoodieTimer timer;
@@ -94,28 +95,9 @@ public Path makeNewPath(String partitionPath) {
*
* @param partitionPath Partition path
*/
- protected void createMarkerFile(String partitionPath) {
- Path markerPath = makeNewMarkerPath(partitionPath);
- try {
- LOG.info("Creating Marker Path=" + markerPath);
- fs.create(markerPath, false).close();
- } catch (IOException e) {
- throw new HoodieException("Failed to create marker file " + markerPath, e);
- }
- }
-
- /**
- * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
- */
- private Path makeNewMarkerPath(String partitionPath) {
- Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
- Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
- try {
- fs.mkdirs(path); // create a new partition as needed.
- } catch (IOException e) {
- throw new HoodieIOException("Failed to make dir " + path, e);
- }
- return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId));
+ protected void createMarkerFile(String partitionPath, String dataFileName) {
+ MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+ markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());
}

public Schema getWriterSchema() {
@@ -164,6 +146,8 @@ protected GenericRecord rewriteRecord(GenericRecord record) {

public abstract WriteStatus getWriteStatus();

+ public abstract MarkerFiles.MarkerType getIOType();

@Override
protected FileSystem getFileSystem() {
return hoodieTable.getMetaClient().getFs();
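Net effect of the refactor: write handles no longer assemble marker paths by hand; MarkerFiles owns the layout and records the operation's I/O type with each marker. The on-disk shape sketched below is an assumption pieced together from the removed path comment and the new MarkerType values; the exact naming is not shown in this diff:

    // Assumed marker layout (illustrative only):
    //   <base-path>/.hoodie/.temp/<instant_ts>/<partition>/<data-file-name>.marker.CREATE
    //   <base-path>/.hoodie/.temp/<instant_ts>/<partition>/<data-file-name>.marker.MERGE
    //   <base-path>/.hoodie/.temp/<instant_ts>/<partition>/<data-file-name>.marker.APPEND
    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
    markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());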