diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java
index 8c92b0033a7d2..e318fe304c67b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java
@@ -18,14 +18,6 @@
 
 package org.apache.hudi.table.action.commit;
 
-import java.io.ByteArrayOutputStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -37,15 +29,23 @@
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * Helper to read records from previous version of parquet and run Merge.
+ * Helper to read records from previous version of base file and run Merge.
  */
 public abstract class AbstractMergeHelper {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 4e08003404022..f975406e4505b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -96,6 +96,11 @@ public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords)
     return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
   }
 
+  @Override
+  public void bootstrap(Option<Map<String, String>> extraMetadata) {
+    throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
+  }
+
   @Override
   public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
     HoodieTable<List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 52052fd368529..74c921fd0cb8b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -18,21 +18,23 @@
 
 package org.apache.hudi.client.common;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.function.SerializableConsumer;
 import org.apache.hudi.client.common.function.SerializableFunction;
 import org.apache.hudi.client.common.function.SerializablePairFunction;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.util.Option;
-import scala.Tuple2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import scala.Tuple2;
+
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper;
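The HoodieFlinkEngineContext above backs Hudi's engine-agnostic functional API on plain Java streams (note the SerializableFunction and FunctionWrapper imports it keeps). Below is a hypothetical sketch of a call site; the flatMap signature and the context construction are assumptions inferred from those imports, not code quoted from this patch:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Stream;

    import org.apache.hudi.client.FlinkTaskContextSupplier;
    import org.apache.hudi.client.common.HoodieFlinkEngineContext;

    public class EngineContextSketch {
      public static void main(String[] args) {
        // Assumed constructor; inside a Flink operator the task context supplier
        // would wrap the real RuntimeContext instead of null.
        HoodieFlinkEngineContext context =
            new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));
        // flatMap(data, func, parallelism): func is a throwing SerializableFunction,
        // wrapped by FunctionWrapper.throwingFlatMapWrapper before execution.
        List<Integer> lengths =
            context.flatMap(Arrays.asList("a", "bb", "ccc"), s -> Stream.of(s.length()), 2);
        System.out.println(lengths); // [1, 2, 3]
      }
    }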
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
index 83b76fc8930f7..427212c6f897b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.index;
 
 import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -31,6 +30,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.state.FlinkInMemoryStateIndex;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.List;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 6d4c570790e0a..acb010c313065 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -46,6 +46,14 @@
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Implementation of a very heavily read-optimized Hoodie Table where all data is stored in base files, with
+ * zero read amplification.
+ * <p>
+ * INSERTS - Produce new files, block aligned to desired size, or merge with the smallest existing file to expand it.
+ * <p>
+ * UPDATES - Produce a new version of the file, just replacing the updated records with new values.
+ */
 public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> {
 
   protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
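The javadoc just added describes the copy-on-write contract in prose. As a quick illustration, here is a hypothetical usage sketch (not part of this patch) that drives the Flink COW write path through the client API visible in this diff; startCommit() and commit(...) are assumed from the write-client base class, and the config and records are left to the caller:

    import java.util.List;

    import org.apache.hudi.client.FlinkTaskContextSupplier;
    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.client.common.HoodieFlinkEngineContext;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class FlinkCowWriteSketch {
      static List<WriteStatus> upsertBatch(HoodieWriteConfig writeConfig,
                                           List<HoodieRecord<OverwriteWithLatestAvroPayload>> records) {
        // Assumed wiring; a real operator would pass its RuntimeContext here.
        HoodieFlinkEngineContext context =
            new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));
        HoodieFlinkWriteClient<OverwriteWithLatestAvroPayload> client =
            new HoodieFlinkWriteClient<>(context, writeConfig);
        String instantTime = client.startCommit();
        // Per the javadoc: updates rewrite the touched base files as new versions,
        // inserts go to new size-aligned files (or expand the smallest existing one).
        List<WriteStatus> statuses = client.upsert(records, instantTime);
        client.commit(instantTime, statuses);
        return statuses;
      }
    }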
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 79fb376be61b4..3c09b38e64faa 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -35,7 +35,7 @@
 
 import java.util.List;
 
-public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
+public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     extends HoodieTable<List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
   protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 0b98b11049a24..1d40b8e95a539 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -47,6 +46,7 @@
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
index bc1293a16b71a..57a87c412fa2a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
@@ -94,7 +94,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(String instantTime, List<
     List<HoodieRecord<EmptyHoodieRecordPayload>> dedupedRecords = dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
     Instant beginTag = Instant.now();
-    // perform index loop up to get existing location of records
+    // perform index look up to get existing location of records
     List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
     Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 4bfaab4456513..2bcd3b2a7189e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -49,6 +49,9 @@
 
 import scala.Tuple2;
 
+/**
+ * Packs incoming records to be upserted into buckets.
+ */
 public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Partitioner {
 
   private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
index 1e666b117c654..8cf91a21e3382 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
@@ -30,11 +30,12 @@
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
-import scala.Tuple2;
 
 import java.util.List;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 @SuppressWarnings("checkstyle:LineLength")
 public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
   public FlinkMarkerBasedRollbackStrategy(HoodieTable<List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index 2b619fb04acd8..612635da871f8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -50,6 +50,9 @@
 
 import scala.Tuple2;
 
+/**
+ * Performs rollback of Hoodie tables.
+ */
 public class ListingBasedRollbackHelper implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/HudiFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
similarity index 99%
rename from hudi-flink/src/main/java/org/apache/hudi/HudiFlinkStreamer.java
rename to hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
index 1206cbe512989..0c9991da3db4a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/HudiFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
@@ -18,6 +18,17 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.operator.InstantGenerateOperator;
+import org.apache.hudi.operator.KeyedWriteProcessFunction;
+import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -31,16 +42,6 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.operator.InstantGenerateOperator;
-import org.apache.hudi.operator.KeyedWriteProcessFunction;
-import org.apache.hudi.operator.KeyedWriteProcessOperator;
-import org.apache.hudi.sink.CommitSink;
-import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
-import org.apache.hudi.util.StreamerUtil;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +52,7 @@
  * A utility which can incrementally consume data from Kafka and apply it to the target table.
  * Currently, it only supports the COW table type and the insert and upsert operations.
  */
-public class HudiFlinkStreamer {
+public class HoodieFlinkStreamer {
 
   public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
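As the class comment says, the streamer incrementally consumes Kafka data and applies it to the target table. The skeleton below is a hypothetical reconstruction of that wiring from the imports in this diff (it is not quoted from HoodieFlinkStreamer; the topic, Kafka properties, and the Hudi operator construction are assumed, so the Hudi stages are only noted in comments):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class StreamerSkeleton {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        kafkaProps.setProperty("group.id", "hoodie-flink-streamer");   // assumed group id

        DataStream<String> json =
            env.addSource(new FlinkKafkaConsumer<>("input_topic", new SimpleStringSchema(), kafkaProps));

        // In the real class the stream then flows through, in order:
        //   JsonStringToHoodieRecordMapFunction - JSON string -> HoodieRecord
        //   InstantGenerateOperator             - opens a new Hudi instant per batch
        //   keyBy + KeyedWriteProcessOperator   - writes records, emits WriteStatus
        //   CommitSink                          - collects statuses, commits the instant
        json.print(); // placeholder terminal operator for this skeleton

        env.execute("HoodieFlinkStreamer");
      }
    }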
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index b242276085724..165eeb087b207 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -18,17 +18,7 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -42,6 +32,17 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +64,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator streamRecord) throws Excep
   public void open() throws Exception {
     super.open();
     // get configs from runtimeContext
-    cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     // retry times
     retryTimes = Integer.valueOf(cfg.blockRetryTime);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index 9a751e80e83a3..d3ebddfc30fda 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -77,7 +77,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction, Integer> writeStatues, Context context) {
-      LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], records size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size());
+      LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], WriteStatus size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size());
       try {
         if (bufferedWriteStatus.containsKey(writeStatues.f0)) {
           bufferedWriteStatus.get(writeStatues.f0).add(writeStatues.f1);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
index 01d5cafc9b4c6..f8784841374f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
@@ -18,9 +18,7 @@
 
 package org.apache.hudi.source;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +28,9 @@
 import org.apache.hudi.util.AvroConvertor;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.MapFunction;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,12 +43,12 @@ public class JsonStringToHoodieRecordMapFunction implements MapFunction