@@ -19,5 +19,5 @@
package org.apache.hudi.table.action.commit;

public enum BucketType {
UPDATE, INSERT
UPDATE, INSERT, APPEND_ONLY
}
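The new constant does double duty: later hunks reuse its name() as the tag stored in the HoodieRecordLocation instant-time slot, so downstream operators can recover the bucket type without a real instant. Below is a minimal sketch of that round trip using only classes that appear in this diff; the helper itself is illustrative and not part of the change.

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.table.action.commit.BucketType;

// Illustrative helper: tag a record with its bucket type and read the tag back downstream.
final class BucketTagSketch {
  static void tag(HoodieRecord<?> record, BucketType bucketType, String fileIdPrefix) {
    record.unseal();
    // The "instant time" field carries the bucket type name instead of a real instant time.
    record.setCurrentLocation(new HoodieRecordLocation(bucketType.name(), fileIdPrefix));
    record.seal();
  }

  static BucketType tagOf(HoodieRecord<?> record) {
    // e.g. APPEND_ONLY always resolves to a create handle in the Flink write client.
    return BucketType.valueOf(record.getCurrentLocation().getInstantTime());
  }
}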
@@ -56,6 +56,7 @@
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
@@ -408,6 +409,12 @@ public void cleanHandlesGracefully() {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
// append-only mode always uses a FlinkCreateHandle
if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
}

if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
@@ -424,7 +431,8 @@ public void cleanHandlesGracefully() {
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
} else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
// use the same create handle for INSERT and APPEND_ONLY buckets
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {
@@ -25,6 +25,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.BucketType;

import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -80,7 +81,7 @@ protected boolean needsUpdateLocation() {
@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
&& hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name());
}

@Override
@@ -30,6 +30,7 @@

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -70,6 +71,17 @@ public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTab
}
}

@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
// In some rare cases the task is pulled up again with the same write file name,
// e.g. when reusing the small log files from the last commit instant.

// Just skip the marker file creation if it already exists; the new data will be appended to
// the file directly.
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
}

/**
* The flink checkpoints start in sequence and asynchronously, when one write task finish the checkpoint(A)
* (thus the fs view got the written data files some of which may be invalid),
@@ -102,9 +102,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inpu
final HoodieRecord<?> record = inputRecords.get(0);
final String partitionPath = record.getPartitionPath();
final String fileId = record.getCurrentLocation().getFileId();
final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
? BucketType.INSERT
: BucketType.UPDATE;
final BucketType bucketType = BucketType.valueOf(record.getCurrentLocation().getInstantTime());
handleUpsertPartition(
instantTime,
partitionPath,
@@ -185,6 +183,7 @@ protected Iterator<List<WriteStatus>> handleUpsertPartition(
} else {
switch (bucketType) {
case INSERT:
case APPEND_ONLY:
return handleInsert(fileIdHint, recordItr);
case UPDATE:
return handleUpdate(partitionPath, fileIdHint, recordItr);
@@ -186,6 +186,12 @@ private FlinkOptions() {
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");

public static final ConfigOption<Boolean> APPEND_ONLY_ENABLE = ConfigOptions
.key("append_only.enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether to write data to new baseFile without index, only support in COW, default false");

public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()
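For reference, a hedged sketch of enabling the new option when assembling the writer configuration programmatically; FlinkOptions.PATH and FlinkOptions.TABLE_NAME are assumed to exist unchanged in this options class, and the path and table name are placeholders.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;

// Sketch only: a configuration for append-only writes into a COPY_ON_WRITE table.
final class AppendOnlyConfSketch {
  static Configuration appendOnlyConf() {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_append_only"); // placeholder path
    conf.setString(FlinkOptions.TABLE_NAME, "t_append_only");          // placeholder table name
    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
    conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
    // Append-only implies plain inserts; the streamer and table factory below force this too.
    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
    return conf;
  }
}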
@@ -28,11 +28,13 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
@@ -55,9 +57,9 @@
* it then assigns the bucket with ID using the {@link BucketAssigner}.
*
* <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
* INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
* INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant time. There is no need to keep
* the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
* where the record should write to. The "I" and "U" tags are only used for downstream to decide whether
* where the record should write to. The "INSERT" and "UPDATE" tags are only used for downstream to decide whether
* the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer
* supports specifying the bucket type explicitly.
*
@@ -106,11 +108,18 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
*/
private final boolean globalIndex;

private final boolean appendOnly;

public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE);
if (appendOnly) {
ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()),
"APPEND_ONLY mode only support in COPY_ON_WRITE table");
}
}

@Override
@@ -170,25 +179,33 @@ private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exce
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;

if (appendOnly) {
location = getNewRecordLocation(partitionPath);
this.context.setCurrentKey(recordKey);
record.setCurrentLocation(location);
out.collect((O) record);
return;
}

// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
// Set up the instant time as "UPDATE" to mark the bucket as an update bucket.
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name()));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
updateIndexState(partitionPath, location);
} else {
location = oldLoc.toLocal("U");
location = oldLoc.toLocal(BucketType.UPDATE.name());
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
@@ -203,17 +220,26 @@ private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exce
}

private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
BucketInfo bucketInfo;
if (appendOnly) {
bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath);
} else {
bucketInfo = this.bucketAssigner.addInsert(partitionPath);
}

final HoodieRecordLocation location;
switch (bucketInfo.getBucketType()) {
case INSERT:
Contributor review comment: Is there any code path that we may get APPEND_ONLY value from the bucketInfo.getBucketType()?

// This is an insert bucket, use HoodieRecordLocation instant time as "I".
// This is an insert bucket, use HoodieRecordLocation instant time as "INSERT".
// Downstream operators can then check the instant time to know whether
// a record belongs to an insert bucket.
location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
location = new HoodieRecordLocation(BucketType.INSERT.name(), bucketInfo.getFileIdPrefix());
break;
case UPDATE:
location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
location = new HoodieRecordLocation(BucketType.UPDATE.name(), bucketInfo.getFileIdPrefix());
break;
case APPEND_ONLY:
location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), bucketInfo.getFileIdPrefix());
break;
default:
throw new AssertionError();
@@ -140,6 +140,14 @@ public BucketInfo addInsert(String partitionPath) {
}

// if we have anything more, create new insert buckets, like normal
return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT);
}

public BucketInfo addAppendOnly(String partitionPath) {
return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY);
}

private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType bucketType) {
if (newFileAssignStates.containsKey(partitionPath)) {
NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
if (newFileAssignState.canAssign()) {
@@ -148,7 +156,7 @@ public BucketInfo addInsert(String partitionPath) {
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
return bucketInfoMap.get(key);
}
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
BucketInfo bucketInfo = new BucketInfo(bucketType, FSUtils.createNewFileIdPfx(), partitionPath);
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
bucketInfoMap.put(key, bucketInfo);
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));
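A brief, hedged usage sketch of the refactored assigner API: addAppendOnly always takes the new-file path, while addInsert may still pack records into existing small files before falling back to the same helper. The BucketAssigner package is assumed from the surrounding sink code and the partition path is illustrative.

import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.table.action.commit.BucketInfo;

// Sketch only: pick a bucket for a record that is not an update of an existing file group.
final class BucketChoiceSketch {
  static BucketInfo pickBucket(BucketAssigner assigner, String partitionPath, boolean appendOnly) {
    // APPEND_ONLY bypasses the small-file assignment and always lands in a new file group.
    return appendOnly ? assigner.addAppendOnly(partitionPath) : assigner.addInsert(partitionPath);
  }
}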
@@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType;

@Parameter(names = {"--append-only"}, description = "Write data to new parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required = true)
public Boolean appendOnly = false;

@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -290,7 +293,13 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly);
if (config.appendOnly) {
// append-only mode always uses the INSERT operation
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
} else {
conf.setString(FlinkOptions.OPERATION, config.operation.value());
}
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -144,6 +145,11 @@ private static void setupConfOptions(
TableSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
// append only
if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) {
// append-only mode always uses the INSERT operation
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
}
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// cleaning options
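Because the table factory now forces the INSERT operation, a SQL job only needs the new connector option. A hedged Table API sketch follows; the 'connector', 'path', and 'table.type' option keys are assumed from the existing FlinkOptions, and the schema, path, and table name are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: declare an append-only COPY_ON_WRITE Hudi table through SQL.
final class AppendOnlyDdlSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    tEnv.executeSql(
        "CREATE TABLE hudi_append_only (\n"
            + "  uuid STRING,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` STRING\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_append_only',\n" // placeholder path
            + "  'table.type' = 'COPY_ON_WRITE',\n"          // append-only requires COW
            + "  'append_only.enable' = 'true'\n"            // write.operation is forced to INSERT
            + ")");
  }
}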
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -44,6 +45,7 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -509,6 +511,83 @@ public void testInsertWithDeduplication() throws Exception {
checkWrittenData(tempFile, expected, 1);
}

@Test
public void testAppendOnly() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false);
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);

// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes, so 4 records are expected to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}

// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));

final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));

funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");

String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());

funcWrapper.checkpointComplete(1);

Map<String, List<String>> expected = new HashMap<>();

expected.put("par1", Arrays.asList(
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,4,par1"));

TestData.checkWrittenAllData(tempFile, expected, 1);

// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);

// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}

funcWrapper.checkpointFunction(2);

final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.checkpointComplete(2);

// The first write's rows plus the duplicates appended by the second write.
expected.put("par1", Arrays.asList(
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,4,par1",
"id1,par1,id1,Danny,23,4,par1"));
TestData.checkWrittenAllData(tempFile, expected, 1);
}

@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option