diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java index 6fdb217a0dcaf..c3371bab092db 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java @@ -44,7 +44,13 @@ public long addInserts(long numInserts) { } public long addUpdates(HoodieRecordLocation location, long numUpdates) { - updateLocationToCount.put(location.getFileId(), Pair.of(location.getInstantTime(), numUpdates)); + long accNumUpdates = 0; + if (updateLocationToCount.containsKey(location.getFileId())) { + accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight(); + } + updateLocationToCount.put( + location.getFileId(), + Pair.of(location.getInstantTime(), numUpdates + accNumUpdates)); return this.numUpdates += numUpdates; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java index 1d98ad49e77fb..6547da6425460 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import java.io.Serializable; +import java.util.Objects; /** * Helper class for a bucket's type (INSERT and UPDATE) and its file location. @@ -29,6 +30,24 @@ public class BucketInfo implements Serializable { String fileIdPrefix; String partitionPath; + public BucketInfo(BucketType bucketType, String fileIdPrefix, String partitionPath) { + this.bucketType = bucketType; + this.fileIdPrefix = fileIdPrefix; + this.partitionPath = partitionPath; + } + + public BucketType getBucketType() { + return bucketType; + } + + public String getFileIdPrefix() { + return fileIdPrefix; + } + + public String getPartitionPath() { + return partitionPath; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("BucketInfo {"); @@ -38,4 +57,23 @@ public String toString() { sb.append('}'); return sb.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BucketInfo that = (BucketInfo) o; + return bucketType == that.bucketType + && fileIdPrefix.equals(that.fileIdPrefix) + && partitionPath.equals(that.partitionPath); + } + + @Override + public int hashCode() { + return Objects.hash(bucketType, fileIdPrefix, partitionPath); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index e3e0eb4218973..0c87f7df9f308 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; @@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) { public void deletePendingInstant(String tableType, String instant) { HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); - table.getMetaClient().getActiveTimeline() - .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant)); + HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline(); + activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant); + activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant); + } + + public void transitionRequestedToInflight(String tableType, String inFlightInstant) { + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); + activeTimeline.transitionRequestedToInflight(requested, Option.empty(), + config.shouldAllowMultiWriteOnSameInstant()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java index 44eafd57f75a8..bae8de2391ed4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.FlinkHoodieIndex; @@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf public List> tagLocation(List> records, HoodieEngineContext context, HoodieTable>, List, List> hoodieTable) throws HoodieIndexException { - return context.map(records, record -> { - try { - if (mapState.contains(record.getKey())) { - record.unseal(); - record.setCurrentLocation(mapState.get(record.getKey())); - record.seal(); - } - } catch (Exception e) { - LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage())); - } - return record; - }, 0); + throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex"); } @Override public List updateLocation(List writeStatuses, HoodieEngineContext context, HoodieTable>, List, List> hoodieTable) throws HoodieIndexException { - return context.map(writeStatuses, writeStatus -> { - for (HoodieRecord record : writeStatus.getWrittenRecords()) { - if (!writeStatus.isErrored(record.getKey())) { - HoodieKey key = record.getKey(); - Option newLocation = record.getNewLocation(); - if (newLocation.isPresent()) { - try { - mapState.put(key, newLocation.get()); - } catch (Exception e) { - LOG.error(String.format("Update record location failed, key = %s, %s", record.getRecordKey(), 
e.getMessage())); - } - } else { - // Delete existing index for a deleted record - try { - mapState.remove(key); - } catch (Exception e) { - LOG.error(String.format("Remove record location failed, key = %s, %s", record.getRecordKey(), e.getMessage())); - } - } - } - } - return writeStatus; - }, 0); + throw new UnsupportedOperationException("No need to update location for FlinkInMemoryStateIndex"); } @Override @@ -128,6 +94,6 @@ public boolean canIndexLogFiles() { */ @Override public boolean isImplicitWithStorage() { - return false; + return true; } } \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java new file mode 100644 index 0000000000000..d65663e639e3d --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +/** + * Create handle factory for Flink writer, use the specified fileID directly + * because it is unique anyway. 
+ */ +public class FlinkCreateHandleFactory + extends CreateHandleFactory { + + @Override + public HoodieWriteHandle create( + HoodieWriteConfig hoodieConfig, String commitTime, + HoodieTable hoodieTable, String partitionPath, + String fileIdPrefix, TaskContextSupplier taskContextSupplier) { + return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, + fileIdPrefix, taskContextSupplier); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 337e7cb269e3f..044f841d27615 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -20,51 +20,52 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; -import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.FlinkCreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.WorkloadProfile; -import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.time.Instant; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import scala.Tuple2; - +/** + * With {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record + * is tagged with a bucket ID (partition path + fileID) in streaming way. All the records consumed by this + * executor should be tagged with bucket IDs and belong to one data bucket. + * + *
<p>These bucket IDs make it possible to shuffle the records first by the bucket ID + * (see org.apache.hudi.operator.partitioner.BucketAssignFunction), and this executor + * only needs to handle the data buffer that belongs to one data bucket at a time, so there is no need to + * partition the buffer. + * + *

Computing the records batch locations all at a time is a pressure to the engine, + * we should avoid that in streaming system. + */ public abstract class BaseFlinkCommitActionExecutor extends BaseCommitActionExecutor>, List, List, HoodieWriteMetadata> { @@ -91,47 +92,39 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context, public HoodieWriteMetadata> execute(List> inputRecords) { HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; - if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); - try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); - } - } - } - - final Partitioner partitioner = getPartitioner(profile); - Map>> partitionedRecords = partition(inputRecords, partitioner); - List writeStatuses = new LinkedList<>(); - partitionedRecords.forEach((partition, records) -> { - if (WriteOperationType.isChangingRecords(operationType)) { - handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } else { - handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } - }); - updateIndex(writeStatuses, result); + final HoodieRecord record = inputRecords.get(0); + final String partitionPath = record.getPartitionPath(); + final String fileId = record.getCurrentLocation().getFileId(); + final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") + ? BucketType.INSERT + : BucketType.UPDATE; + if (WriteOperationType.isChangingRecords(operationType)) { + handleUpsertPartition( + instantTime, + partitionPath, + fileId, bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } else { + handleUpsertPartition( + instantTime, + partitionPath, + fileId, + bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } + setUpWriteMetadata(writeStatuses, result); return result; } - protected void updateIndex(List writeStatuses, HoodieWriteMetadata> result) { - Instant indexStartTime = Instant.now(); - // Update the index back - List statuses = table.getIndex().updateLocation(writeStatuses, context, table); - result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); + protected void setUpWriteMetadata( + List statuses, + HoodieWriteMetadata> result) { + // No need to update the index because the update happens before the write. 
result.setWriteStatuses(statuses); + result.setIndexUpdateDuration(Duration.ZERO); } @Override @@ -139,56 +132,6 @@ protected String getCommitActionType() { return table.getMetaClient().getCommitActionType(); } - private Partitioner getPartitioner(WorkloadProfile profile) { - if (WriteOperationType.isChangingRecords(operationType)) { - return getUpsertPartitioner(profile); - } else { - return getInsertPartitioner(profile); - } - } - - private Map>> partition(List> dedupedRecords, Partitioner partitioner) { - Map>, HoodieRecord>>> partitionedMidRecords = dedupedRecords - .stream() - .map(record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)) - .collect(Collectors.groupingBy(x -> partitioner.getPartition(x._1))); - Map>> results = new LinkedHashMap<>(); - partitionedMidRecords.forEach((key, value) -> results.put(key, value.stream().map(x -> x._2).collect(Collectors.toList()))); - return results; - } - - protected Pair, WorkloadStat> buildProfile(List> inputRecords) { - HashMap partitionPathStatMap = new HashMap<>(); - WorkloadStat globalStat = new WorkloadStat(); - - Map>, Long> partitionLocationCounts = inputRecords - .stream() - .map(record -> Pair.of( - Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record)) - .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting())); - - for (Map.Entry>, Long> e : partitionLocationCounts.entrySet()) { - String partitionPath = e.getKey().getLeft(); - Long count = e.getValue(); - Option locOption = e.getKey().getRight(); - - if (!partitionPathStatMap.containsKey(partitionPath)) { - partitionPathStatMap.put(partitionPath, new WorkloadStat()); - } - - if (locOption.isPresent()) { - // update - partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count); - globalStat.addUpdates(locOption.get(), count); - } else { - // insert - partitionPathStatMap.get(partitionPath).addInserts(count); - globalStat.addInserts(count); - } - } - return Pair.of(partitionPathStatMap, globalStat); - } - @Override protected void commit(Option> extraMetadata, HoodieWriteMetadata> result) { commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList())); @@ -228,31 +171,28 @@ protected boolean isWorkloadProfileNeeded() { } @SuppressWarnings("unchecked") - protected Iterator> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, - Partitioner partitioner) { - UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; - BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); - BucketType btype = binfo.bucketType; + protected Iterator> handleUpsertPartition( + String instantTime, + String partitionPath, + String fileIdHint, + BucketType bucketType, + Iterator recordItr) { try { - if (btype.equals(BucketType.INSERT)) { - return handleInsert(binfo.fileIdPrefix, recordItr); - } else if (btype.equals(BucketType.UPDATE)) { - return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr); - } else { - throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); + switch (bucketType) { + case INSERT: + return handleInsert(fileIdHint, recordItr); + case UPDATE: + return handleUpdate(partitionPath, fileIdHint, recordItr); + default: + throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath); } } catch (Throwable t) { - String msg = "Error upserting bucketType " + btype + " for partition 
:" + partition; + String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath; LOG.error(msg, t); throw new HoodieUpsertException(msg, t); } } - protected Iterator> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, - Partitioner partitioner) { - return handleUpsertPartition(instantTime, partition, recordItr, partitioner); - } - @Override public Iterator> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) @@ -293,13 +233,6 @@ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, } } - protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, - Map> keyToNewRecords, - HoodieBaseFile dataFileToBeMerged) { - return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords, - partitionPath, fileId, dataFileToBeMerged, taskContextSupplier); - } - @Override public Iterator> handleInsert(String idPfx, Iterator> recordItr) throws Exception { @@ -309,24 +242,6 @@ public Iterator> handleInsert(String idPfx, Iterator) Collections.EMPTY_LIST).iterator(); } return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx, - taskContextSupplier, new CreateHandleFactory<>()); + taskContextSupplier, new FlinkCreateHandleFactory<>()); } - - /** - * Provides a partitioner to perform the upsert operation, based on the workload profile. - */ - public Partitioner getUpsertPartitioner(WorkloadProfile profile) { - if (profile == null) { - throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); - } - return new UpsertPartitioner(profile, context, table, config); - } - - /** - * Provides a partitioner to perform the insert operation, based on the workload profile. - */ - public Partitioner getInsertPartitioner(WorkloadProfile profile) { - return getUpsertPartitioner(profile); - } - } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 191071e017a80..52381230d075f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -19,17 +19,32 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +/** + * Overrides the {@link #write} method to not look up index and partition the records, because + * with {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record + * is tagged with a bucket ID (partition path + fileID) in streaming way. The FlinkWriteHelper only hands over + * the records to the action executor {@link BaseCommitActionExecutor} to execute. + * + *

Computing the records batch locations all at a time is a pressure to the engine, + * we should avoid that in streaming system. + */ public class FlinkWriteHelper extends AbstractWriteHelper>, List, List, R> { @@ -44,23 +59,46 @@ public static FlinkWriteHelper newInstance() { return WriteHelperHolder.FLINK_WRITE_HELPER; } + @Override + public HoodieWriteMetadata> write(String instantTime, List> inputRecords, HoodieEngineContext context, + HoodieTable>, List, List> table, boolean shouldCombine, int shuffleParallelism, + BaseCommitActionExecutor>, List, List, R> executor, boolean performTagging) { + try { + Instant lookupBegin = Instant.now(); + Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now()); + + HoodieWriteMetadata> result = executor.execute(inputRecords); + result.setIndexLookupDuration(indexLookupDuration); + return result; + } catch (Throwable e) { + if (e instanceof HoodieUpsertException) { + throw (HoodieUpsertException) e; + } + throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e); + } + } + @Override public List> deduplicateRecords(List> records, HoodieIndex>, List, List> index, int parallelism) { - boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { - HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath - Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; + final Object key = record.getKey().getRecordKey(); return Pair.of(key, record); }).collect(Collectors.groupingBy(Pair::getLeft)); return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + // we cannot allow the user to change the key or partitionPath, since that will affect + // everything + // so pick it from one of the records. HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); - return new HoodieRecord(reducedKey, reducedData); + HoodieRecord hoodieRecord = new HoodieRecord<>(reducedKey, reducedData); + // reuse the location from the first record. 
+ hoodieRecord.setCurrentLocation(rec1.getCurrentLocation()); + return hoodieRecord; }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 8cc9b0df84238..f44e83da97a1b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -116,10 +116,7 @@ private void assignUpdates(WorkloadProfile profile) { private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; - bucketInfo.fileIdPrefix = fileIdHint; - bucketInfo.partitionPath = partitionPath; + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -186,10 +183,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 4f192033dd333..eeeeacf924f39 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -114,10 +114,7 @@ private void assignUpdates(WorkloadProfile profile) { private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; - bucketInfo.fileIdPrefix = fileIdHint; - bucketInfo.partitionPath = partitionPath; + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -184,10 +181,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index ee153c8468ecf..9d60cde69e572 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -120,10 +120,7 @@ private void assignUpdates(WorkloadProfile profile) { private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; - bucketInfo.fileIdPrefix = fileIdHint; - bucketInfo.partitionPath = partitionPath; + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -223,10 +220,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index fcb4fd9176dda..865f0dc1e6dc3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -167,6 +167,12 @@ public void deletePending(HoodieInstant instant) { deleteInstantFile(instant); } + public void deletePending(HoodieInstant.State state, String action, String instantStr) { + HoodieInstant instant = new HoodieInstant(state, action, instantStr); + ValidationUtils.checkArgument(!instant.isCompleted()); + deleteInstantFile(instant); + } + public void deleteCompactionRequested(HoodieInstant instant) { ValidationUtils.checkArgument(instant.isRequested()); ValidationUtils.checkArgument(Objects.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION)); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 5c9930d606aaa..dea8a056bc4af 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; @@ -115,7 +114,7 @@ public void open() throws Exception { writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true); // init table, create it if not exists. 
- initTable(); + StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(cfg)); // create instant marker directory createInstantMarkerDir(); @@ -189,6 +188,7 @@ public void snapshotState(StateSnapshotContext functionSnapshotContext) throws E */ private String startNewInstant(long checkpointId) { String newTime = writeClient.startCommit(); + this.writeClient.transitionRequestedToInflight(this.cfg.tableType, newTime); LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId); return newTime; } @@ -218,20 +218,6 @@ private void doCheck() throws InterruptedException { throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", retryTimes * retryInterval)); } - - /** - * Create table if not exists. - */ - private void initTable() throws IOException { - if (!fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient.initTableType(new Configuration(serializableHadoopConf.get()), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, 1); - LOG.info("Table initialized"); - } else { - LOG.info("Table already [{}/{}] exists, do nothing here", cfg.targetBasePath, cfg.targetTableName); - } - } - @Override public void close() throws Exception { if (writeClient != null) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java index a59a995f4ad71..4309bb008b5a3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java @@ -18,6 +18,7 @@ package org.apache.hudi.operator; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; @@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.exception.HoodieFlinkStreamerException; +import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.api.java.tuple.Tuple3; @@ -40,8 +42,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * A {@link KeyedProcessFunction} where the write operations really happens. @@ -52,7 +56,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction bufferedRecords = new LinkedList<>(); + private Map> bufferedRecords; /** * Flink collector help s to send data downstream. 
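For context: both write functions in this patch replace the flat record buffer with a map keyed by bucket, so that each flush hands exactly one bucket's records to the write client. Below is a minimal standalone sketch of that grouping, not code from the patch: the `BucketBufferSketch` name is invented, and the `{partitionPath}_{fileId}` key format only mirrors what the BucketAssigner javadoc describes; the exact output of `StreamerUtil.generateBucketKey` is assumed.

```java
// Hypothetical sketch (not part of this patch) of per-bucket buffering:
// records are grouped by a "{partitionPath}_{fileId}" key and each group is
// later flushed to the write client as one data bucket.
import org.apache.hudi.common.model.HoodieRecord;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BucketBufferSketch {

  private final Map<String, List<HoodieRecord<?>>> buffer = new LinkedHashMap<>();

  // Mirrors the bucket identifier described in the BucketAssigner javadoc;
  // the exact format of StreamerUtil.generateBucketKey is an assumption here.
  private static String bucketKey(String partitionPath, String fileId) {
    return partitionPath + "_" + fileId;
  }

  void add(HoodieRecord<?> record) {
    String key = bucketKey(record.getPartitionPath(), record.getCurrentLocation().getFileId());
    buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
  }

  // Returns the buffered groups and resets the buffer, roughly what a flush does.
  Map<String, List<HoodieRecord<?>>> drain() {
    Map<String, List<HoodieRecord<?>>> snapshot = new LinkedHashMap<>(buffer);
    buffer.clear();
    return snapshot;
  }
}
```

Grouping up front is what lets each call into the write client see records for a single bucket, so BaseFlinkCommitActionExecutor can skip the workload profile and partitioner steps entirely.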
@@ -88,6 +92,8 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction(); + indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); @@ -112,17 +118,24 @@ public void snapshotState(FunctionSnapshotContext context) { String instantTimestamp = latestInstant; LOG.info("Write records, subtask id = [{}] checkpoint_id = [{}}] instant = [{}], record size = [{}]", indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, bufferedRecords.size()); - List writeStatus; - switch (cfg.operation) { - case INSERT: - writeStatus = writeClient.insert(bufferedRecords, instantTimestamp); - break; - case UPSERT: - writeStatus = writeClient.upsert(bufferedRecords, instantTimestamp); - break; - default: - throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation); - } + final List writeStatus = new ArrayList<>(); + this.bufferedRecords.values().forEach(records -> { + if (records.size() > 0) { + if (cfg.filterDupes) { + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + } + switch (cfg.operation) { + case INSERT: + writeStatus.addAll(writeClient.insert(records, instantTimestamp)); + break; + case UPSERT: + writeStatus.addAll(writeClient.upsert(records, instantTimestamp)); + break; + default: + throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation); + } + } + }); output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask)); bufferedRecords.clear(); } @@ -144,7 +157,7 @@ public void processElement(HoodieRecord hoodieRecord, Context context, Collector } // buffer the records - bufferedRecords.add(hoodieRecord); + putDataIntoBuffer(hoodieRecord); } public boolean hasRecordsIn() { @@ -155,6 +168,15 @@ public String getLatestInstant() { return latestInstant; } + private void putDataIntoBuffer(HoodieRecord record) { + final String fileId = record.getCurrentLocation().getFileId(); + final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); + if (!this.bufferedRecords.containsKey(key)) { + this.bufferedRecords.put(key, new ArrayList<>()); + } + this.bufferedRecords.get(key).add(record); + } + @Override public void close() { if (writeClient != null) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java index 34a61d4096c7e..58770982418a8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java @@ -18,22 +18,18 @@ package org.apache.hudi.operator; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; -import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; -import 
org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; @@ -41,7 +37,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -50,7 +45,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; @@ -96,7 +93,7 @@ public class StreamWriteFunction extends KeyedProcessFunction /** * Write buffer for a checkpoint. */ - private transient List buffer; + private transient Map> buffer; /** * The buffer lock to control data buffering/flushing. @@ -130,23 +127,6 @@ public class StreamWriteFunction extends KeyedProcessFunction private transient BiFunction, String, List> writeFunction; - /** - * HoodieKey generator. - */ - private transient KeyGenerator keyGenerator; - - /** - * Row type of the input. - */ - private final RowType rowType; - - /** - * Avro schema of the input. - */ - private final Schema avroSchema; - - private transient RowDataToAvroConverters.RowDataToAvroConverter converter; - /** * The REQUESTED instant we write the data. */ @@ -160,20 +140,15 @@ public class StreamWriteFunction extends KeyedProcessFunction /** * Constructs a StreamingSinkFunction. * - * @param rowType The input row type * @param config The config options */ - public StreamWriteFunction(RowType rowType, Configuration config) { - this.rowType = rowType; - this.avroSchema = StreamerUtil.getSourceSchema(config); + public StreamWriteFunction(Configuration config) { this.config = config; } @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config)); - this.converter = RowDataToAvroConverters.createConverter(this.rowType); initBuffer(); initWriteClient(); initWriteFunction(); @@ -211,7 +186,7 @@ public void processElement(I value, KeyedProcessFunction.Context ctx, C if (onCheckpointing) { addToBufferCondition.await(); } - this.buffer.add(toHoodieRecord(value)); + putDataIntoBuffer(value); } finally { bufferLock.unlock(); } @@ -230,7 +205,7 @@ public void close() { @VisibleForTesting @SuppressWarnings("rawtypes") - public List getBuffer() { + public Map> getBuffer() { return buffer; } @@ -249,7 +224,7 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { // ------------------------------------------------------------------------- private void initBuffer() { - this.buffer = new ArrayList<>(); + this.buffer = new LinkedHashMap<>(); this.bufferLock = new ReentrantLock(); this.addToBufferCondition = this.bufferLock.newCondition(); } @@ -277,32 +252,33 @@ private void initWriteFunction() { } } - /** - * Converts the give record to a {@link HoodieRecord}. 
- * - * @param record The input record - * @return HoodieRecord based on the configuration - * @throws IOException if error occurs - */ - @SuppressWarnings("rawtypes") - private HoodieRecord toHoodieRecord(I record) throws IOException { - boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS) - || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; - GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); - final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS); - Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, - this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false); - HoodieRecordPayload payload = shouldCombine - ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal) - : StreamerUtil.createPayload(payloadClazz, gr); - return new HoodieRecord<>(keyGenerator.getKey(gr), payload); + private void putDataIntoBuffer(I value) { + HoodieRecord record = (HoodieRecord) value; + final String fileId = record.getCurrentLocation().getFileId(); + final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); + if (!this.buffer.containsKey(key)) { + this.buffer.put(key, new ArrayList<>()); + } + this.buffer.get(key).add(record); } + @SuppressWarnings("unchecked, rawtypes") private void flushBuffer() { final List writeStatus; if (buffer.size() > 0) { - writeStatus = writeFunction.apply(buffer, currentInstant); - buffer.clear(); + writeStatus = new ArrayList<>(); + this.buffer.values() + // The records are partitioned by the bucket ID and each batch sent to + // the writer belongs to one bucket. + .forEach(records -> { + if (records.size() > 0) { + if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + } + writeStatus.addAll(writeFunction.apply(records, currentInstant)); + } + }); + this.buffer.clear(); } else { LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java index 3f4d940ee23ac..247269c228ca9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.table.types.logical.RowType; /** * Operator for {@link StreamSink}. 
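For context: as the BucketAssignFunction javadoc further below explains, the assigner marks each record's HoodieRecordLocation with the placeholder instant time "I" (insert bucket) or "U" (update bucket), and BaseFlinkCommitActionExecutor maps that tag back to a BucketType. The helper below is an illustrative sketch of that convention, not code from the patch; the `BucketTypeTag` name is invented.

```java
// Illustrative helper (not part of this patch) for the "I"/"U" tagging convention:
// the bucket assigner stores "I" or "U" as the HoodieRecordLocation instant time,
// and the commit action executor derives the bucket type from it.
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.table.action.commit.BucketType;

final class BucketTypeTag {

  static final String INSERT_TAG = "I";
  static final String UPDATE_TAG = "U";

  // Tags a record with a file id and bucket type, as BucketAssignFunction does.
  static void tag(HoodieRecord<?> record, String fileId, BucketType bucketType) {
    String instantTimeTag = bucketType == BucketType.INSERT ? INSERT_TAG : UPDATE_TAG;
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation(instantTimeTag, fileId));
    record.seal();
  }

  // Derives the bucket type back from the tag, as BaseFlinkCommitActionExecutor does.
  static BucketType resolve(HoodieRecord<?> record) {
    HoodieRecordLocation location = record.getCurrentLocation();
    return INSERT_TAG.equals(location.getInstantTime()) ? BucketType.INSERT : BucketType.UPDATE;
  }
}
```

Because the tag rides in the location's instant-time slot, no extra field is needed on the record; as the javadoc notes, it should be factored out once the underlying writer can be told the bucket type explicitly.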
@@ -36,8 +35,8 @@ public class StreamWriteOperator implements OperatorEventHandler { private final StreamWriteFunction sinkFunction; - public StreamWriteOperator(RowType rowType, Configuration conf) { - super(new StreamWriteFunction<>(rowType, conf)); + public StreamWriteOperator(Configuration conf) { + super(new StreamWriteFunction<>(conf)); this.sinkFunction = (StreamWriteFunction) getUserFunction(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index 524c6015ed52d..bf0cfc27e91e5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -22,9 +22,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -38,8 +35,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.util.Preconditions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +54,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; + /** * {@link OperatorCoordinator} for {@link StreamWriteFunction}. * @@ -121,7 +118,7 @@ public void start() throws Exception { // writeClient initWriteClient(); // init table, create it if not exists. 
- initTable(); + initTableIfNotExists(this.conf); } @Override @@ -139,6 +136,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r + " data has not finish writing, roll back the last write and throw"; checkAndForceCommit(errMsg); this.inFlightInstant = this.writeClient.startCommit(); + this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); this.inFlightCheckpoint = checkpointId; LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId); result.complete(writeCheckpointBytes()); @@ -200,28 +198,6 @@ private void initWriteClient() { true); } - private void initTable() throws IOException { - final String basePath = this.conf.getString(FlinkOptions.PATH); - final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); - // Hadoop FileSystem - try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { - if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { - HoodieTableMetaClient.initTableType( - hadoopConf, - basePath, - HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)), - this.conf.getString(FlinkOptions.TABLE_NAME), - "archived", - this.conf.getString(FlinkOptions.PAYLOAD_CLASS), - 1); - LOG.info("Table initialized"); - } else { - LOG.info("Table [{}/{}] already exists, no need to initialize the table", - basePath, this.conf.getString(FlinkOptions.TABLE_NAME)); - } - } - } - static byte[] readBytes(DataInputStream in, int size) throws IOException { byte[] bytes = new byte[size]; in.readFully(bytes); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java index f5faa54ea3325..56267451fba09 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java @@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.table.types.logical.RowType; /** * Factory class for {@link StreamWriteOperator}. @@ -43,10 +42,9 @@ public class StreamWriteOperatorFactory private final int numTasks; public StreamWriteOperatorFactory( - RowType rowType, Configuration conf, int numTasks) { - super(new StreamWriteOperator<>(rowType, conf)); + super(new StreamWriteOperator<>(conf)); this.operator = (StreamWriteOperator) getOperator(); this.conf = conf; this.numTasks = numTasks; diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java new file mode 100644 index 0000000000000..269ccc801592a --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.partitioner; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; + +/** + * The function to build the write profile incrementally for records within a checkpoint, + * it then assigns the bucket with ID using the {@link BucketAssigner}. + * + *
<p>All the records are tagged with a HoodieRecordLocation that carries a placeholder instead of a real instant time: + * an INSERT record uses "I" and an UPDATE record uses "U" as the instant time. There is no need to keep + * the real instant time for each record, because the bucket ID (partition path + fileID) already decides + * where the record should be written. The "I" and "U" tags are only used downstream to decide whether + * a data bucket is an INSERT or an UPDATE; we should factor them out once the underlying writer + * supports specifying the bucket type explicitly. + * + *

The output records should then shuffle by the bucket ID and thus do scalable write. + * + * @see BucketAssigner + */ +public class BucketAssignFunction> + extends KeyedProcessFunction + implements CheckpointedFunction, CheckpointListener { + + private MapState indexState; + + private BucketAssigner bucketAssigner; + + private final Configuration conf; + + private final boolean isChangingRecords; + + public BucketAssignFunction(Configuration conf) { + this.conf = conf; + this.isChangingRecords = WriteOperationType.isChangingRecords( + WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(getRuntimeContext())); + this.bucketAssigner = new BucketAssigner( + context, + writeConfig); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) { + this.bucketAssigner.reset(); + } + + @Override + public void initializeState(FunctionInitializationContext context) { + MapStateDescriptor indexStateDesc = + new MapStateDescriptor<>( + "indexState", + TypeInformation.of(HoodieKey.class), + TypeInformation.of(HoodieRecordLocation.class)); + indexState = context.getKeyedStateStore().getMapState(indexStateDesc); + } + + @SuppressWarnings("unchecked") + @Override + public void processElement(I value, Context ctx, Collector out) throws Exception { + // 1. put the record into the BucketAssigner; + // 2. look up the state for location, if the record has a location, just send it out; + // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out. + HoodieRecord record = (HoodieRecord) value; + final HoodieKey hoodieKey = record.getKey(); + final BucketInfo bucketInfo; + final HoodieRecordLocation location; + // Only changing records need looking up the index for the location, + // append only records are always recognized as INSERT. + if (isChangingRecords && this.indexState.contains(hoodieKey)) { + // Set up the instant time as "U" to mark the bucket as an update bucket. + location = new HoodieRecordLocation("U", this.indexState.get(hoodieKey).getFileId()); + this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId()); + } else { + bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath()); + switch (bucketInfo.getBucketType()) { + case INSERT: + // This is an insert bucket, use HoodieRecordLocation instant time as "I". + // Downstream operators can then check the instant time to know whether + // a record belongs to an insert bucket. + location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); + break; + case UPDATE: + location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); + break; + default: + throw new AssertionError(); + } + this.indexState.put(hoodieKey, location); + } + record.unseal(); + record.setCurrentLocation(location); + record.seal(); + out.collect((O) record); + } + + @Override + public void notifyCheckpointComplete(long l) { + // Refresh the table state when there are new commits. 
+ this.bucketAssigner.refreshTable(); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java new file mode 100644 index 0000000000000..f87a802cfa5ae --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.partitioner; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.table.action.commit.BucketType; +import org.apache.hudi.table.action.commit.SmallFile; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.util.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Bucket assigner that assigns the data buffer of one checkpoint into buckets. + * + *

This assigner assigns the records one by one. + * If the record is an update, it reuses the existing UPDATE bucket or generates a new one; + * if the record is an insert, it first checks the record partition for small files and tries to find a small file + * that still has space to append new records, reusing that small file's data bucket; if + * there is no such small file (or no space left for new records), it generates a new INSERT bucket. + * + *

Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique + * within and among partitions. + */ +public class BucketAssigner { + private static final Logger LOG = LogManager.getLogger(BucketAssigner.class); + + /** + * Remembers the type of each bucket for later use. + */ + private final HashMap<String, BucketInfo> bucketInfoMap; + + private HoodieTable table; + + /** + * Flink engine context. + */ + private final HoodieFlinkEngineContext context; + + /** + * The write config. + */ + private final HoodieWriteConfig config; + + /** + * The average record size. + */ + private final long averageRecordSize; + + /** + * Total records to write for each bucket based on + * the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}. + */ + private final long insertRecordsPerBucket; + + /** + * Partition path to small files mapping. + */ + private final Map<String, List<SmallFile>> partitionSmallFilesMap; + + /** + * Bucket ID (partition + fileId) -> small file assign state. + */ + private final Map<String, SmallFileAssignState> smallFileAssignStates; + + /** + * Bucket ID (partition + fileId) -> new file assign state. + */ + private final Map<String, NewFileAssignState> newFileAssignStates; + + public BucketAssigner( + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + bucketInfoMap = new HashMap<>(); + partitionSmallFilesMap = new HashMap<>(); + smallFileAssignStates = new HashMap<>(); + newFileAssignStates = new HashMap<>(); + this.context = context; + this.config = config; + this.table = HoodieFlinkTable.create(this.config, this.context); + averageRecordSize = averageBytesPerRecord( + table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + config); + LOG.info("AvgRecordSize => " + averageRecordSize); + insertRecordsPerBucket = config.shouldAutoTuneInsertSplits() + ? config.getParquetMaxFileSize() / averageRecordSize + : config.getCopyOnWriteInsertSplitSize(); + LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket); + } + + /** + * Resets the states of this assigner; this should be done once for each checkpoint + * because all the states are accumulated within one checkpoint interval. + */ + public void reset() { + bucketInfoMap.clear(); + partitionSmallFilesMap.clear(); + smallFileAssignStates.clear(); + newFileAssignStates.clear(); + } + + public BucketInfo addUpdate(String partitionPath, String fileIdHint) { + final String key = StreamerUtil.generateBucketKey(partitionPath, fileIdHint); + if (!bucketInfoMap.containsKey(key)) { + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); + bucketInfoMap.put(key, bucketInfo); + } + // else do nothing because the bucket already exists.
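+ // For example (values are illustrative): partition "par1" with fileId "f0" maps to bucket key
+ // "par1_f0" (see StreamerUtil#generateBucketKey), so repeated addUpdate calls for the same
+ // file group always return the single UPDATE bucket cached above.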
+ return bucketInfoMap.get(key); + } + + public BucketInfo addInsert(String partitionPath) { + // for new inserts, compute buckets depending on how many records we have for each partition + List smallFiles = getSmallFilesForPartition(partitionPath); + + // first try packing this into one of the smallFiles + for (SmallFile smallFile : smallFiles) { + final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()); + SmallFileAssignState assignState = smallFileAssignStates.get(key); + assert assignState != null; + if (assignState.canAssign()) { + assignState.assign(); + // create a new bucket or re-use an existing bucket + BucketInfo bucketInfo; + if (bucketInfoMap.containsKey(key)) { + // Assigns an inserts to existing update bucket + bucketInfo = bucketInfoMap.get(key); + } else { + bucketInfo = addUpdate(partitionPath, smallFile.location.getFileId()); + } + return bucketInfo; + } + } + + // if we have anything more, create new insert buckets, like normal + if (newFileAssignStates.containsKey(partitionPath)) { + NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath); + if (newFileAssignState.canAssign()) { + newFileAssignState.assign(); + } + final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); + return bucketInfoMap.get(key); + } + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); + final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix()); + bucketInfoMap.put(key, bucketInfo); + newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket)); + return bucketInfo; + } + + private List getSmallFilesForPartition(String partitionPath) { + if (partitionSmallFilesMap.containsKey(partitionPath)) { + return partitionSmallFilesMap.get(partitionPath); + } + List smallFiles = getSmallFiles(partitionPath); + if (smallFiles.size() > 0) { + LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); + partitionSmallFilesMap.put(partitionPath, smallFiles); + smallFiles.forEach(smallFile -> + smallFileAssignStates.put( + StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()), + new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, averageRecordSize))); + return smallFiles; + } + return Collections.emptyList(); + } + + /** + * Refresh the table state like TableFileSystemView and HoodieTimeline. + */ + public void refreshTable() { + this.table = HoodieFlinkTable.create(this.config, this.context); + } + + /** + * Returns a list of small files in the given partition path. 
+ */ + protected List getSmallFiles(String partitionPath) { + + // smallFiles only for partitionPath + List smallFileLocations = new ArrayList<>(); + + HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + + if (!commitTimeline.empty()) { // if we have some commits + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + List allFiles = table.getBaseFileOnlyView() + .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); + + for (HoodieBaseFile file : allFiles) { + if (file.getFileSize() < config.getParquetSmallFileLimit()) { + String filename = file.getFileName(); + SmallFile sf = new SmallFile(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = file.getFileSize(); + smallFileLocations.add(sf); + } + } + } + + return smallFileLocations; + } + + /** + * Obtains the average record size based on records written during previous commits. Used for estimating how many + * records pack into one file. + */ + protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { + long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); + long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); + try { + if (!commitTimeline.empty()) { + // Go over the reverse ordered commits to get a more recent estimate of average record size. + Iterator instants = commitTimeline.getReverseOrderedInstants().iterator(); + while (instants.hasNext()) { + HoodieInstant instant = instants.next(); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); + long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); + if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { + avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); + break; + } + } + } + } catch (Throwable t) { + // make this fail safe. + LOG.error("Error trying to compute average bytes/record ", t); + } + return avgSize; + } + + /** + * Candidate bucket state for small file. It records the total number of records + * that the bucket can append and the current number of assigned records. + */ + private static class SmallFileAssignState { + long assigned; + long totalUnassigned; + + SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) { + this.assigned = 0; + this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize; + } + + public boolean canAssign() { + return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned; + } + + /** + * Remembers to invoke {@link #canAssign()} first. + */ + public void assign() { + Preconditions.checkState(canAssign(), + "Can not assign insert to small file: assigned => " + + this.assigned + " totalUnassigned => " + this.totalUnassigned); + this.assigned++; + } + } + + /** + * Candidate bucket state for a new file. It records the total number of records + * that the bucket can append and the current number of assigned records. 
+ */ + private static class NewFileAssignState { + long assigned; + long totalUnassigned; + final String fileId; + + NewFileAssignState(String fileId, long insertRecordsPerBucket) { + this.fileId = fileId; + this.assigned = 0; + this.totalUnassigned = insertRecordsPerBucket; + } + + public boolean canAssign() { + return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned; + } + + /** + * Remembers to invoke {@link #canAssign()} first. + */ + public void assign() { + Preconditions.checkState(canAssign(), + "Can not assign insert to new file: assigned => " + + this.assigned + " totalUnassigned => " + this.totalUnassigned); + this.assigned++; + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java new file mode 100644 index 0000000000000..2d47c7961d4f8 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.transform; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.IOException; + +/** + * Function that transforms RowData to HoodieRecord. + */ +public class RowDataToHoodieFunction> + extends RichMapFunction { + /** + * Row type of the input. + */ + private final RowType rowType; + + /** + * Avro schema of the input. + */ + private transient Schema avroSchema; + + /** + * RowData to Avro record converter. + */ + private transient RowDataToAvroConverters.RowDataToAvroConverter converter; + + /** + * HoodieKey generator. + */ + private transient KeyGenerator keyGenerator; + + /** + * Config options. 
+ */ + private final Configuration config; + + public RowDataToHoodieFunction(RowType rowType, Configuration config) { + this.rowType = rowType; + this.config = config; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.avroSchema = StreamerUtil.getSourceSchema(this.config); + this.converter = RowDataToAvroConverters.createConverter(this.rowType); + this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config)); + } + + @SuppressWarnings("unchecked") + @Override + public O map(I i) throws Exception { + return (O) toHoodieRecord(i); + } + + /** + * Converts the give record to a {@link HoodieRecord}. + * + * @param record The input record + * @return HoodieRecord based on the configuration + * @throws IOException if error occurs + */ + @SuppressWarnings("rawtypes") + private HoodieRecord toHoodieRecord(I record) throws IOException { + boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS) + || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; + GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); + final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS); + Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, + this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false); + HoodieRecordPayload payload = shouldCombine + ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal) + : StreamerUtil.createPayload(payloadClazz, gr); + return new HoodieRecord<>(keyGenerator.getKey(gr), payload); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 7df63fa4aa750..418e2ea25c804 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -71,8 +71,7 @@ public class FlinkStreamerConfig extends Configuration { + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. 
For sources, refer" + "to individual classes, for supported properties.") - public String propsFilePath = - "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; + public String propsFilePath = ""; @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.") diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index f6d75d3ea3a43..d110bffef8fb3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -22,9 +22,11 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.InstantGenerateOperator; import org.apache.hudi.operator.KeyedWriteProcessFunction; import org.apache.hudi.operator.KeyedWriteProcessOperator; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; import org.apache.hudi.sink.CommitSink; import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction; import org.apache.hudi.util.StreamerUtil; @@ -34,9 +36,11 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.List; @@ -66,12 +70,16 @@ public static void main(String[] args) throws Exception { env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); } + Configuration conf = FlinkOptions.fromStreamerConfig(cfg); + int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); + TypedProperties props = StreamerUtil.appendKafkaProps(cfg); // add data source config props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName); props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField); + StreamerUtil.initTableIfNotExists(conf); // Read from kafka source DataStream inputRecords = env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props)) @@ -86,13 +94,20 @@ public static void main(String[] args) throws Exception { // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time .keyBy(HoodieRecord::getPartitionPath) - + // use the bucket assigner to generate bucket IDs + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) // write operator, where the write operation really happens .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) .name("write_process") .uid("write_process_uid") - 
.setParallelism(env.getParallelism()) + .setParallelism(numWriteTask) // Commit can only be executed once, so make it one parallelism .addSink(new CommitSink()) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index a8f92459a1750..24b899496547e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -18,22 +18,25 @@ package org.apache.hudi.streamer; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.StreamWriteOperatorFactory; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; +import org.apache.hudi.operator.transform.RowDataToHoodieFunction; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import java.util.Properties; @@ -70,13 +73,8 @@ public static void main(String[] args) throws Exception { .getLogicalType(); Configuration conf = FlinkOptions.fromStreamerConfig(cfg); int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask); - - int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)); - LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex); - final RowData.FieldGetter partitionFieldGetter = - RowData.createFieldGetter(partitionFieldType, partitionFieldIndex); + StreamWriteOperatorFactory operatorFactory = + new StreamWriteOperatorFactory<>(conf, numWriteTask); DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>( cfg.kafkaTopic, @@ -89,11 +87,19 @@ public static void main(String[] args) throws Exception { ), kafkaProps)) .name("kafka_source") .uid("uid_kafka_source") + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(partitionFieldGetter::getFieldOrNull) + .keyBy(HoodieRecord::getPartitionPath) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", null, operatorFactory) .uid("uid_hoodie_stream_write") - .setParallelism(numWriteTask); // should make it configurable + .setParallelism(numWriteTask); 
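+ // Note on the two keyBy steps above: keying by partition path keeps each partition's records
+ // on a single BucketAssignFunction subtask, while the subsequent keyBy on
+ // getCurrentLocation().getFileId() routes all records of one bucket to the same write subtask,
+ // so a file group is never written by two tasks concurrently.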
env.addOperator(dataStream.getTransformation()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 9460ee89eea55..1877134a0f5b9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,6 +18,8 @@ package org.apache.hudi.util; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.common.config.DFSPropertiesConfiguration; @@ -57,6 +59,7 @@ * Utilities for Flink stream read and write. */ public class StreamerUtil { + private static final String DEFAULT_ARCHIVE_LOG_FOLDER = "archived"; private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class); @@ -68,6 +71,9 @@ public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) { } public static TypedProperties getProps(FlinkStreamerConfig cfg) { + if (cfg.propsFilePath.isEmpty()) { + return new TypedProperties(); + } return readConfig( FSUtils.getFs(cfg.propsFilePath, getHadoopConf()), new Path(cfg.propsFilePath), cfg.configs).getConfig(); @@ -208,6 +214,10 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco } } + public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) { + return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf)); + } + public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() @@ -250,4 +260,37 @@ public static void checkRequiredProperties(TypedProperties props, List c checkPropNames.forEach(prop -> Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing")); } + + /** + * Initialize the table if it does not exist. + * + * @param conf the configuration + * @throws IOException if errors happens when writing metadata + */ + public static void initTableIfNotExists(Configuration conf) throws IOException { + final String basePath = conf.getString(FlinkOptions.PATH); + final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + // Hadoop FileSystem + try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { + if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { + HoodieTableMetaClient.initTableType( + hadoopConf, + basePath, + HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), + conf.getString(FlinkOptions.TABLE_NAME), + DEFAULT_ARCHIVE_LOG_FOLDER, + conf.getString(FlinkOptions.PAYLOAD_CLASS), + 1); + LOG.info("Table initialized under base path {}", basePath); + } else { + LOG.info("Table [{}/{}] already exists, no need to initialize the table", + basePath, conf.getString(FlinkOptions.TABLE_NAME)); + } + } + } + + /** Generates the bucket ID using format {partition path}_{fileID}. 
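+ *
+ * For example, generateBucketKey("par1", "file-0001") returns "par1_file-0001"
+ * (the partition and file id values here are illustrative).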
*/ + public static String generateBucketKey(String partitionPath, String fileId) { + return String.format("%s_%s", partitionPath, fileId); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java index c2d7a65f26d2c..fea9b8f92feb0 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java @@ -27,6 +27,7 @@ import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.operator.utils.TestData; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.table.data.RowData; import org.hamcrest.MatcherAssert; @@ -56,24 +57,26 @@ */ public class StreamWriteFunctionTest { - private static final Map EXPECTED = new HashMap<>(); - - static { - EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); - EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); - EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]"); - EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); - } + private static final Map EXPECTED1 = new HashMap<>(); private static final Map EXPECTED2 = new HashMap<>(); + private static final Map EXPECTED3 = new HashMap<>(); + static { + EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); + EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); + EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]"); + EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]"); EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, " + "id9,par3,id9,Jane,19,6,par3]"); EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, " + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + + EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); } private StreamWriteFunctionWrapper funcWrapper; @@ -83,9 +86,7 @@ public class StreamWriteFunctionTest { @BeforeEach public void before() throws Exception { - this.funcWrapper = new StreamWriteFunctionWrapper<>( - tempFile.getAbsolutePath(), - TestConfigurations.SERIALIZER); + this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath()); } @AfterEach @@ -211,7 +212,7 @@ public void testInsert() throws Exception { final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED); + checkWrittenData(tempFile, EXPECTED1); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -220,7 +221,43 @@ public void testInsert() throws Exception { funcWrapper.checkpointComplete(1); // the coordinator checkpoint commits the inflight instant. 
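// That is, checkpointComplete(1) drives the coordinator to commit the instant reported by the
// BatchWriteSuccessEvent handled above; the COMPLETED state assertion below verifies exactly that.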
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); - checkWrittenData(tempFile, EXPECTED); + checkWrittenData(tempFile, EXPECTED1); + } + + @Test + public void testInsertDuplicates() throws Exception { + // reset the config option + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + assertEmptyDataFiles(); + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + + final OperatorEvent nextEvent = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + checkWrittenData(tempFile, EXPECTED3, 1); + + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + funcWrapper.checkpointComplete(1); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + checkWrittenData(tempFile, EXPECTED3, 1); } @Test @@ -248,7 +285,7 @@ public void testUpsert() throws Exception { funcWrapper.invoke(rowData); } // the data is not flushed yet - checkWrittenData(tempFile, EXPECTED); + checkWrittenData(tempFile, EXPECTED1); // this triggers the data write and event send funcWrapper.checkpointFunction(2); diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java index 56f946b9647f0..f745a3c89fcb9 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java @@ -18,16 +18,24 @@ package org.apache.hudi.operator; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; +import org.apache.hudi.operator.transform.RowDataToHoodieFunction; import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.sink.CommitSink; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; @@ -37,7 +45,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; -import org.apache.flink.table.data.RowData; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; import 
org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLogger; @@ -47,6 +55,7 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -74,19 +83,15 @@ public void testWriteToHoodie() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.getConfig().disableObjectReuse(); execEnv.setParallelism(4); - // 1 second a time - execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); - // Read from kafka source + // Read from file source RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(rowType, conf, 4); - - int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)); - final RowData.FieldGetter partitionFieldGetter = - RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), partitionFieldIndex); + StreamWriteOperatorFactory operatorFactory = + new StreamWriteOperatorFactory<>(conf, 4); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, @@ -107,17 +112,103 @@ public void testWriteToHoodie() throws Exception { // use PROCESS_CONTINUOUSLY mode to trigger checkpoint .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(partitionFieldGetter::getFieldOrNull) + .keyBy(HoodieRecord::getPartitionPath) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", null, operatorFactory) - .uid("uid_hoodie_stream_write") - .setParallelism(4); + .uid("uid_hoodie_stream_write"); execEnv.addOperator(dataStream.getTransformation()); JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); if (client.getJobStatus().get() != JobStatus.FAILED) { try { - TimeUnit.SECONDS.sleep(10); + TimeUnit.SECONDS.sleep(8); + client.cancel(); + } catch (Throwable var1) { + // ignored + } + } + + TestData.checkWrittenData(tempFile, EXPECTED); + } + + @Test + public void testWriteToHoodieLegacy() throws Exception { + FlinkStreamerConfig streamerConf = TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath()); + Configuration conf = FlinkOptions.fromStreamerConfig(streamerConf); + StreamerUtil.initTableIfNotExists(conf); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + execEnv.getConfig().setGlobalJobParameters(streamerConf); + + // Read from file source + RowType rowType = + (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) + .getLogicalType(); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + new RowDataTypeInfo(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_source.data")).toString(); + + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + .transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) + .name("instant_generator") + .uid("instant_generator_id") + + // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time + .keyBy(HoodieRecord::getPartitionPath) + // use the bucket assigner to generate bucket IDs + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) + // write operator, where the write operation really happens + .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { + }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) + .name("write_process") + .uid("write_process_uid") + .setParallelism(4) + + // Commit can only be executed once, so make it one parallelism + .addSink(new CommitSink()) + .name("commit_sink") + .uid("commit_sink_uid") + .setParallelism(1); + + JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(8); client.cancel(); } catch (Throwable var1) { // ignored diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java new file mode 100644 index 0000000000000..e27ea0757b32c --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator.partitioner; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.table.action.commit.BucketType; +import org.apache.hudi.table.action.commit.SmallFile; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test cases for {@link BucketAssigner}. + */ +public class TestBucketAssigner { + private HoodieWriteConfig writeConfig; + private HoodieFlinkEngineContext context; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws IOException { + final String basePath = tempFile.getAbsolutePath(); + final Configuration conf = TestConfigurations.getDefaultConf(basePath); + + writeConfig = StreamerUtil.getHoodieClientConfig(conf); + context = new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(null)); + StreamerUtil.initTableIfNotExists(conf); + } + + @Test + public void testAddUpdate() { + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig); + BucketInfo bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0"); + + mockBucketAssigner.addUpdate("par1", "file_id_0"); + bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0"); + + mockBucketAssigner.addUpdate("par1", "file_id_1"); + bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_1"); + + bucketInfo = mockBucketAssigner.addUpdate("par2", "file_id_0"); + assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "file_id_0"); + + bucketInfo = mockBucketAssigner.addUpdate("par3", "file_id_2"); + assertBucketEquals(bucketInfo, "par3", BucketType.UPDATE, "file_id_2"); + } + + @Test + public void testAddInsert() { + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig); + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.INSERT); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.INSERT); + + mockBucketAssigner.addInsert("par2"); + bucketInfo = mockBucketAssigner.addInsert("par2"); + assertBucketEquals(bucketInfo, "par2", BucketType.INSERT); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + } + + @Test + public void testInsertWithSmallFiles() { + SmallFile f0 
= new SmallFile(); + f0.location = new HoodieRecordLocation("t0", "f0"); + f0.sizeBytes = 12; + + SmallFile f1 = new SmallFile(); + f1.location = new HoodieRecordLocation("t0", "f1"); + f1.sizeBytes = 122879; // no left space to append new records to this bucket + + SmallFile f2 = new SmallFile(); + f2.location = new HoodieRecordLocation("t0", "f2"); + f2.sizeBytes = 56; + + Map> smallFilesMap = new HashMap<>(); + smallFilesMap.put("par1", Arrays.asList(f0, f1)); + smallFilesMap.put("par2", Collections.singletonList(f2)); + + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig, smallFilesMap); + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par2"); + bucketInfo = mockBucketAssigner.addInsert("par2"); + assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2"); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + } + + @Test + public void testUpdateAndInsertWithSmallFiles() { + SmallFile f0 = new SmallFile(); + f0.location = new HoodieRecordLocation("t0", "f0"); + f0.sizeBytes = 12; + + SmallFile f1 = new SmallFile(); + f1.location = new HoodieRecordLocation("t0", "f1"); + f1.sizeBytes = 122879; // no left space to append new records to this bucket + + SmallFile f2 = new SmallFile(); + f2.location = new HoodieRecordLocation("t0", "f2"); + f2.sizeBytes = 56; + + Map> smallFilesMap = new HashMap<>(); + smallFilesMap.put("par1", Arrays.asList(f0, f1)); + smallFilesMap.put("par2", Collections.singletonList(f2)); + + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig, smallFilesMap); + mockBucketAssigner.addUpdate("par1", "f0"); + + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addUpdate("par1", "f2"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addUpdate("par2", "f0"); + + mockBucketAssigner.addInsert("par2"); + bucketInfo = mockBucketAssigner.addInsert("par2"); + assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2"); + } + + private void assertBucketEquals( + BucketInfo bucketInfo, + String partition, + BucketType bucketType, + String fileId) { + BucketInfo actual = new BucketInfo(bucketType, fileId, partition); + assertThat(bucketInfo, is(actual)); + } + + private void assertBucketEquals( + BucketInfo bucketInfo, + String partition, + BucketType bucketType) { + assertThat(bucketInfo.getPartitionPath(), is(partition)); + assertThat(bucketInfo.getBucketType(), is(bucketType)); + } + + /** + * Mock BucketAssigner that can specify small files explicitly. 
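+ *
+ * Overriding getSmallFiles lets tests such as testInsertWithSmallFiles above inject small files
+ * directly, without writing real data files under the table base path.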
+ */ + static class MockBucketAssigner extends BucketAssigner { + private final Map> smallFilesMap; + + MockBucketAssigner( + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + this(context, config, Collections.emptyMap()); + } + + MockBucketAssigner( + HoodieFlinkEngineContext context, + HoodieWriteConfig config, + Map> smallFilesMap) { + super(context, config); + this.smallFilesMap = smallFilesMap; + } + + @Override + protected List getSmallFiles(String partitionPath) { + if (this.smallFilesMap.containsKey(partitionPath)) { + return this.smallFilesMap.get(partitionPath); + } + return Collections.emptyList(); + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java index 1b02791e7dfc2..59de2832664c0 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java @@ -18,7 +18,14 @@ package org.apache.hudi.operator.utils; -import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.operator.StreamWriteFunction; +import org.apache.hudi.operator.StreamWriteOperatorCoordinator; +import org.apache.hudi.operator.event.BatchWriteSuccessEvent; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; +import org.apache.hudi.operator.transform.RowDataToHoodieFunction; + import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -27,13 +34,9 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; - -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.operator.StreamWriteFunction; -import org.apache.hudi.operator.StreamWriteOperatorCoordinator; -import org.apache.hudi.operator.event.BatchWriteSuccessEvent; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; import java.util.concurrent.CompletableFuture; @@ -43,7 +46,6 @@ * @param Input type */ public class StreamWriteFunctionWrapper { - private final TypeSerializer serializer; private final Configuration conf; private final IOManager ioManager; @@ -52,10 +54,18 @@ public class StreamWriteFunctionWrapper { private final StreamWriteOperatorCoordinator coordinator; private final MockFunctionInitializationContext functionInitializationContext; - private StreamWriteFunction function; + /** Function that converts row data to HoodieRecord. */ + private RowDataToHoodieFunction> toHoodieFunction; + /** Function that assigns bucket ID. */ + private BucketAssignFunction, HoodieRecord> bucketAssignerFunction; + /** Stream write function. 
*/ + private StreamWriteFunction, Object> writeFunction; - public StreamWriteFunctionWrapper(String tablePath, TypeSerializer serializer) throws Exception { - this.serializer = serializer; + public StreamWriteFunctionWrapper(String tablePath) throws Exception { + this(tablePath, TestConfigurations.getDefaultConf(tablePath)); + } + + public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception { this.ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") @@ -64,7 +74,7 @@ public StreamWriteFunctionWrapper(String tablePath, TypeSerializer serializer .build(); this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); this.gateway = new MockOperatorEventGateway(); - this.conf = TestConfigurations.getDefaultConf(tablePath); + this.conf = conf; // one function this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); this.coordinator.start(); @@ -72,14 +82,37 @@ public StreamWriteFunctionWrapper(String tablePath, TypeSerializer serializer } public void openFunction() throws Exception { - function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, this.conf); - function.setRuntimeContext(runtimeContext); - function.setOperatorEventGateway(gateway); - function.open(this.conf); + toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); + toHoodieFunction.setRuntimeContext(runtimeContext); + toHoodieFunction.open(conf); + + bucketAssignerFunction = new BucketAssignFunction<>(conf); + bucketAssignerFunction.setRuntimeContext(runtimeContext); + bucketAssignerFunction.open(conf); + bucketAssignerFunction.initializeState(this.functionInitializationContext); + + writeFunction = new StreamWriteFunction<>(conf); + writeFunction.setRuntimeContext(runtimeContext); + writeFunction.setOperatorEventGateway(gateway); + writeFunction.open(conf); } public void invoke(I record) throws Exception { - function.processElement(record, null, null); + HoodieRecord hoodieRecord = toHoodieFunction.map((RowData) record); + HoodieRecord[] hoodieRecords = new HoodieRecord[1]; + Collector> collector = new Collector>() { + @Override + public void collect(HoodieRecord record) { + hoodieRecords[0] = record; + } + + @Override + public void close() { + + } + }; + bucketAssignerFunction.processElement(hoodieRecord, null, collector); + writeFunction.processElement(hoodieRecords[0], null, null); } public BatchWriteSuccessEvent[] getEventBuffer() { @@ -92,19 +125,22 @@ public OperatorEvent getNextEvent() { @SuppressWarnings("rawtypes") public HoodieFlinkWriteClient getWriteClient() { - return this.function.getWriteClient(); + return this.writeFunction.getWriteClient(); } public void checkpointFunction(long checkpointId) throws Exception { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); - function.snapshotState(new MockFunctionSnapshotContext(checkpointId)); + bucketAssignerFunction.snapshotState(null); + + writeFunction.snapshotState(null); functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); } public void checkpointComplete(long checkpointId) { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.checkpointComplete(checkpointId); + this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); } public void checkpointFails(long checkpointId) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java index 7513feddc58d1..d9e603a1459f3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java @@ -19,6 +19,7 @@ package org.apache.hudi.operator.utils; import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; @@ -56,4 +57,16 @@ public static Configuration getDefaultConf(String tablePath) { conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); return conf; } + + public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) { + FlinkStreamerConfig streamerConf = new FlinkStreamerConfig(); + streamerConf.targetBasePath = tablePath; + streamerConf.readSchemaFilePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString(); + streamerConf.targetTableName = "TestHoodieTable"; + streamerConf.partitionPathField = "partition"; + streamerConf.tableType = "COPY_ON_WRITE"; + streamerConf.checkpointInterval = 4000L; + return streamerConf; + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java index 7c2c314511c35..b4c24ef695311 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java @@ -43,6 +43,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; import static junit.framework.TestCase.assertEquals; import static org.hamcrest.CoreMatchers.is; @@ -92,6 +93,13 @@ public class TestData { TimestampData.fromEpochMillis(8), StringData.fromString("par4")) ); + public static List DATA_SET_THREE = new ArrayList<>(); + static { + IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add( + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); + } + /** * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. * @@ -101,13 +109,29 @@ public class TestData { * @param expected The expected results mapping, the key should be the partition path */ public static void checkWrittenData(File baseFile, Map expected) throws IOException { + checkWrittenData(baseFile, expected, 4); + } + + /** + * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. + * + *

Note: Replace it with the Flink reader when it is supported. + * + * @param baseFile The base file to check; should be a directory + * @param expected The expected results mapping, where the key is the partition path + * @param partitions The expected number of partitions + */ + public static void checkWrittenData( + File baseFile, + Map<String, String> expected, + int partitions) throws IOException { assert baseFile.isDirectory(); FileFilter filter = file -> !file.getName().startsWith("."); File[] partitionDirs = baseFile.listFiles(filter); assertNotNull(partitionDirs); - assertThat(partitionDirs.length, is(4)); + assertThat(partitionDirs.length, is(partitions)); for (File partitionDir : partitionDirs) { - File[] dataFiles = partitionDir.listFiles(file -> file.getName().endsWith(".parquet")); + File[] dataFiles = partitionDir.listFiles(filter); assertNotNull(dataFiles); File latestDataFile = Arrays.stream(dataFiles) .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))