diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index 3ceadf3526c75..c81734b8d5206 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -30,6 +30,7 @@ ${project.parent.basedir} + 1.11.1 @@ -170,10 +171,23 @@ provided + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + org.xerial.snappy + snappy-java + + + + org.apache.parquet parquet-avro + ${parquet.version} test diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 59192381d8dbb..c13d7823b0124 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.streamer.FlinkStreamerConfig; @@ -30,10 +31,13 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; -import java.util.Arrays; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * Hoodie Flink config options. @@ -287,12 +291,6 @@ private FlinkOptions() { // Utilities // ------------------------------------------------------------------------- - // Remember to update the set when adding new options. - public static final List> OPTIONAL_OPTIONS = Arrays.asList( - TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES, - RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS - ); - // Prefix for Hoodie specific properties. private static final String PROPERTIES_PREFIX = "properties."; @@ -385,4 +383,32 @@ public static boolean isDefaultValueDefined(Configuration conf, ConfigOption return !conf.getOptional(option).isPresent() || conf.get(option).equals(option.defaultValue()); } + + /** + * Returns all the optional config options. + */ + public static Set> optionalOptions() { + Set> options = new HashSet<>(allOptions()); + options.remove(PATH); + return options; + } + + /** + * Returns all the config options. 
+ */ + public static List> allOptions() { + Field[] declaredFields = FlinkOptions.class.getDeclaredFields(); + List> options = new ArrayList<>(); + for (Field field : declaredFields) { + if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) + && field.getType().equals(ConfigOption.class)) { + try { + options.add((ConfigOption) field.get(ConfigOption.class)); + } catch (IllegalAccessException e) { + throw new HoodieException("Error while fetching static config option", e); + } + } + } + return options; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 9a509fd5937b1..659bb2ea2687d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -157,7 +157,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { // start to commit the instant. checkAndCommitWithRetry(); // if async compaction is on, schedule the compaction @@ -182,7 +182,7 @@ public void notifyCheckpointAborted(long checkpointId) { } @Override - public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { + public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception { if (checkpointData != null) { // restore when any checkpoint completed deserializeCheckpointAndRestore(checkpointData); @@ -215,6 +215,11 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // no operation } + @Override + public void subtaskReset(int i, long l) { + // no operation + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index c62307e3b6857..60edfa44b6854 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import java.util.Properties; @@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception { cfg.kafkaTopic, new JsonRowDataDeserializationSchema( rowType, - new RowDataTypeInfo(rowType), + InternalTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601 diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 7ce8880e1d53f..22dcd3e3a9b96 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,21 +19,20 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; -import 
org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.TableSinkFactory; -import org.apache.flink.table.factories.TableSourceFactory; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.hadoop.fs.Path; @@ -41,62 +40,57 @@ import org.slf4j.LoggerFactory; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Set; /** * Hoodie data source/sink factory. */ -public class HoodieTableFactory implements TableSourceFactory, TableSinkFactory { +public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class); public static final String FACTORY_ID = "hudi"; @Override - public TableSource createTableSource(TableSourceFactory.Context context) { - Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); - TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); - setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); - // enclosing the code within a try catch block so that we can log the error message. - // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory - // to create the source/sink and catches all the exceptions then tries the new factory. - // - // log the error message first so that there is a chance to show the real failure cause. 
- try { - Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> - new ValidationException("Option [path] should not be empty."))); - return new HoodieTableSource( - schema, - path, - context.getTable().getPartitionKeys(), - conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), - conf); - } catch (Throwable throwable) { - LOG.error("Create table source error", throwable); - throw new HoodieException(throwable); - } + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + Configuration conf = (Configuration) helper.getOptions(); + TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); + + Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> + new ValidationException("Option [path] should not be empty."))); + return new HoodieTableSource( + schema, + path, + context.getCatalogTable().getPartitionKeys(), + conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), + conf); } @Override - public TableSink createTableSink(TableSinkFactory.Context context) { - Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); - TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); - setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); + public DynamicTableSink createDynamicTableSink(Context context) { + Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); + TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); return new HoodieTableSink(conf, schema); } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID); - return context; + public String factoryIdentifier() { + return FACTORY_ID; + } + + @Override + public Set> requiredOptions() { + return Collections.singleton(FlinkOptions.PATH); } @Override - public List supportedProperties() { - // contains format properties. 
- return Collections.singletonList("*"); + public Set> optionalOptions() { + return FlinkOptions.optionalOptions(); } // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 0891f19723dee..81aaf59f48987 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -34,23 +34,22 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.sinks.AppendStreamTableSink; -import org.apache.flink.table.sinks.PartitionableTableSink; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.types.DataType; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; import java.util.Map; /** * Hoodie table sink. */ -public class HoodieTableSink implements AppendStreamTableSink, PartitionableTableSink { +public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { private final Configuration conf; private final TableSchema schema; @@ -61,68 +60,76 @@ public HoodieTableSink(Configuration conf, TableSchema schema) { } @Override - public DataStreamSink consumeDataStream(DataStream dataStream) { - // Read from kafka source - RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType(); - int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS); - StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return (DataStreamSinkProvider) dataStream -> { + // Read from kafka source + RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType(); + int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS); + StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); - DataStream pipeline = dataStream - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) - .transform( - "bucket_assigner", - TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) - .uid("uid_bucket_assigner") - // shuffle by fileId(bucket id) - .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) - .uid("uid_hoodie_stream_write") - .setParallelism(numWriteTasks); - if (StreamerUtil.needsScheduleCompaction(conf)) { - return pipeline.transform("compact_plan_generate", - TypeInformation.of(CompactionPlanEvent.class), - new 
CompactionPlanOperator(conf)) - .uid("uid_compact_plan_generate") - .setParallelism(1) // plan generate must be singleton - .keyBy(event -> event.getOperation().hashCode()) - .transform("compact_task", - TypeInformation.of(CompactionCommitEvent.class), - new KeyedProcessOperator<>(new CompactFunction(conf))) - .addSink(new CompactionCommitSink(conf)) - .name("compact_commit") - .setParallelism(1); // compaction commit should be singleton - } else { - return pipeline.addSink(new DummySinkFunction<>()) - .name("dummy").uid("uid_dummy"); - } + DataStream pipeline = dataStream + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + // Key-by partition path, to avoid multiple subtasks write to a partition at the same time + .keyBy(HoodieRecord::getPartitionPath) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) + .uid("uid_hoodie_stream_write") + .setParallelism(numWriteTasks); + if (StreamerUtil.needsScheduleCompaction(conf)) { + return pipeline.transform("compact_plan_generate", + TypeInformation.of(CompactionPlanEvent.class), + new CompactionPlanOperator(conf)) + .uid("uid_compact_plan_generate") + .setParallelism(1) // plan generate must be singleton + .keyBy(event -> event.getOperation().hashCode()) + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new KeyedProcessOperator<>(new CompactFunction(conf))) + .addSink(new CompactionCommitSink(conf)) + .name("compact_commit") + .setParallelism(1); // compaction commit should be singleton + } else { + return pipeline.addSink(new DummySinkFunction<>()) + .setParallelism(1) + .name("dummy").uid("uid_dummy"); + } + }; } - @Override - public TableSink configure(String[] strings, TypeInformation[] infos) { - return this; + @VisibleForTesting + public Configuration getConf() { + return this.conf; } @Override - public TableSchema getTableSchema() { - return this.schema; + public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { + // ignore RowKind.UPDATE_BEFORE + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .build(); } @Override - public DataType getConsumedDataType() { - return this.schema.toRowDataType().bridgedTo(RowData.class); + public DynamicTableSink copy() { + return new HoodieTableSink(this.conf, this.schema); } @Override - public void setStaticPartition(Map partitions) { - // no operation + public String asSummaryString() { + return "HoodieTableSink"; } - @VisibleForTesting - public Configuration getConf() { - return this.conf; + @Override + public void applyStaticPartition(Map map) { + // no operation } // Dummy sink function that does nothing. 
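
The sink-side changes above follow the standard Flink 1.12 connector surface. As a point of reference, here is a minimal, self-contained sketch of the DynamicTableSink contract that HoodieTableSink now implements; the class name ExampleDynamicSink and the DiscardingSink body are illustrative assumptions, while the interface methods and the DataStreamSinkProvider hook mirror what the patch uses.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class ExampleDynamicSink implements DynamicTableSink {

  @Override
  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Accept INSERT, DELETE and UPDATE_AFTER but not UPDATE_BEFORE,
    // the same shape the patched HoodieTableSink declares.
    return ChangelogMode.newBuilder()
        .addContainedKind(RowKind.INSERT)
        .addContainedKind(RowKind.DELETE)
        .addContainedKind(RowKind.UPDATE_AFTER)
        .build();
  }

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // DataStreamSinkProvider hands over the upstream DataStream<RowData> and expects a
    // DataStreamSink back, so arbitrary operators can still be chained in between.
    return (DataStreamSinkProvider) (DataStream<RowData> dataStream) ->
        dataStream.addSink(new DiscardingSink<>())
            .setParallelism(1)
            .name("example_sink");
  }

  @Override
  public DynamicTableSink copy() {
    return new ExampleDynamicSink();
  }

  @Override
  public String asSummaryString() {
    return "ExampleDynamicSink";
  }
}

Because the legacy TableSink methods getConsumedDataType and configure have no counterpart in the new interface, they are removed above, and static-partition handling moves into the SupportsPartitioning ability interface (hence applyStaticPartition replacing setStaticPartition).
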
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 7bb20afc760be..f92e1a8b5ade4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -57,18 +57,20 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; -import org.apache.flink.table.sources.FilterableTableSource; -import org.apache.flink.table.sources.LimitableTableSource; -import org.apache.flink.table.sources.PartitionableTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.utils.TableConnectorUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; @@ -83,6 +85,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -94,11 +97,11 @@ * Hoodie batch table source that always read the latest snapshot of the underneath table. 
*/ public class HoodieTableSource implements - StreamTableSource, - PartitionableTableSource, - ProjectableTableSource, - LimitableTableSource, - FilterableTableSource { + ScanTableSource, + SupportsPartitionPushDown, + SupportsProjectionPushDown, + SupportsLimitPushDown, + SupportsFilterPushDown { private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class); private static final int NO_LIMIT_CONSTANT = -1; @@ -113,9 +116,9 @@ public class HoodieTableSource implements private final String defaultPartName; private final Configuration conf; - private final int[] requiredPos; - private final long limit; - private final List filters; + private int[] requiredPos; + private long limit; + private List filters; private List> requiredPartitions; @@ -161,94 +164,92 @@ public HoodieTableSource( } @Override - public DataStream getDataStream(StreamExecutionEnvironment execEnv) { - @SuppressWarnings("unchecked") - TypeInformation typeInfo = - (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); - if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { - StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( - conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes); - OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true)); - SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") - .setParallelism(1) - .uid("uid_streaming_source") - .transform("split_reader", typeInfo, factory) - .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) - .uid("uid_split_reader"); - return new DataStreamSource<>(source); - } else { - InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); - DataStreamSource source = execEnv.addSource(func, explainSource(), typeInfo); - return source.name("streaming_source") - .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) - .uid("uid_streaming_source"); - } - } - - @Override - public boolean isBounded() { - return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING); + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return new DataStreamScanProvider() { + + @Override + public boolean isBounded() { + return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING); + } + + @Override + public DataStream produceDataStream(StreamExecutionEnvironment execEnv) { + @SuppressWarnings("unchecked") + TypeInformation typeInfo = + (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); + if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { + StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( + conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes); + OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true)); + SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") + .setParallelism(1) + .uid("uid_streaming_source") + .transform("split_reader", typeInfo, factory) + .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) + .uid("uid_split_reader"); + return new DataStreamSource<>(source); + } else { + InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); + DataStreamSource source = execEnv.addSource(func, asSummaryString(), typeInfo); + return source.name("streaming_source") + 
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) + .uid("uid_streaming_source"); + } + } + }; } @Override - public TableSource applyPredicate(List predicates) { - return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, - requiredPartitions, requiredPos, limit, new ArrayList<>(predicates)); + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); } @Override - public boolean isFilterPushedDown() { - return this.filters != null && this.filters.size() > 0; + public DynamicTableSource copy() { + return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, + conf, requiredPartitions, requiredPos, limit, filters); } @Override - public boolean isLimitPushedDown() { - return this.limit != NO_LIMIT_CONSTANT; + public String asSummaryString() { + return "HudiTableSource"; } @Override - public TableSource applyLimit(long limit) { - return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, - requiredPartitions, requiredPos, limit, filters); + public Result applyFilters(List filters) { + this.filters = new ArrayList<>(filters); + return Result.of(new ArrayList<>(filters), new ArrayList<>(filters)); } @Override - public List> getPartitions() { - return FilePathUtils.getPartitions(path, hadoopConf, partitionKeys, defaultPartName, - conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)); + public Optional>> listPartitions() { + List> partitions = FilePathUtils.getPartitions(path, hadoopConf, + partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)); + return Optional.of(partitions); } @Override - public TableSource applyPartitionPruning(List> requiredPartitions) { - return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, - requiredPartitions, requiredPos, limit, filters); + public void applyPartitions(List> partitions) { + this.requiredPartitions = partitions; } @Override - public TableSource projectFields(int[] requiredPos) { - return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, - requiredPartitions, requiredPos, limit, filters); + public boolean supportsNestedProjection() { + return false; } @Override - public TableSchema getTableSchema() { - return schema; + public void applyProjection(int[][] projections) { + // nested projection is not supported. + this.requiredPos = Arrays.stream(projections).mapToInt(array -> array[0]).toArray(); } @Override - public String explainSource() { - final String filterString = filters.stream() - .map(Expression::asSummaryString) - .collect(Collectors.joining(",")); - return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) - + (requiredPartitions == null ? "" : ", requiredPartition=" + requiredPartitions) - + (requiredPos == null ? "" : ", requiredPos=" + Arrays.toString(requiredPos)) - + (limit == -1 ? "" : ", limit=" + limit) - + (filters.size() == 0 ? 
"" : ", filters=" + filterString); + public void applyLimit(long limit) { + this.limit = limit; } - @Override - public DataType getProducedDataType() { + private DataType getProducedDataType() { String[] schemaFieldNames = this.schema.getFieldNames(); DataType[] schemaTypes = this.schema.getFieldDataTypes(); @@ -260,7 +261,7 @@ public DataType getProducedDataType() { private List> getOrFetchPartitions() { if (requiredPartitions == null) { - requiredPartitions = getPartitions(); + requiredPartitions = listPartitions().orElse(Collections.emptyList()); } return requiredPartitions; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 0058603656851..4756c24348c9f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -225,8 +225,8 @@ public static TypedProperties flinkConf2TypedProperties(Configuration conf) { // put all the set up options conf.addAllToProperties(properties); // put all the default options - for (ConfigOption option : FlinkOptions.OPTIONAL_OPTIONS) { - if (!conf.contains(option)) { + for (ConfigOption option : FlinkOptions.optionalOptions()) { + if (!conf.contains(option) && option.hasDefaultValue()) { properties.put(option.key(), option.defaultValue()); } } diff --git a/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory rename to hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index a446ee469118f..bcd6d542892d8 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -53,7 +53,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; -import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLogger; import org.junit.jupiter.api.Test; @@ -104,7 +104,7 @@ public void testWriteToHoodie() throws Exception { JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, - new RowDataTypeInfo(rowType), + InternalTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601 @@ -135,7 +135,7 @@ public void testWriteToHoodie() throws Exception { JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); // wait for the streaming job to finish - client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); + client.getJobExecutionResult().get(); TestData.checkWrittenFullData(tempFile, EXPECTED); } @@ -159,7 +159,7 @@ public void testWriteToHoodieLegacy() throws Exception { JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, - new RowDataTypeInfo(rowType), + 
InternalTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601 @@ -204,7 +204,7 @@ public void testWriteToHoodieLegacy() throws Exception { JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); // wait for the streaming job to finish - client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); + client.getJobExecutionResult().get(); TestData.checkWrittenFullData(tempFile, EXPECTED); } @@ -230,7 +230,7 @@ public void testMergeOnReadWriteWithCompaction() throws Exception { JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, - new RowDataTypeInfo(rowType), + InternalTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601 diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index bc8f956564b85..eab1374f2d6b2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -94,7 +94,7 @@ void testInstantState() { coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(1, event1); - coordinator.checkpointComplete(1); + coordinator.notifyCheckpointComplete(1); String inflight = coordinator.getWriteClient() .getInflightAndRequestedInstant("COPY_ON_WRITE"); String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE"); @@ -116,7 +116,7 @@ public void testTableInitialized() throws IOException { public void testCheckpointAndRestore() throws Exception { CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - coordinator.resetToCheckpoint(future.get()); + coordinator.resetToCheckpoint(1, future.get()); } @Test @@ -145,7 +145,7 @@ public void testCheckpointCompleteWithRetry() { .build(); coordinator.handleEventFromOperator(0, event); assertThrows(HoodieException.class, - () -> coordinator.checkpointComplete(1), + () -> coordinator.notifyCheckpointComplete(1), "Try 3 to commit instant"); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java index 192183effef04..ff71089f71ecb 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java @@ -20,8 +20,6 @@ import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.BroadcastState; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -89,11 +87,6 @@ public AggregatingState getAggregatingState(AggregatingStateDesc return null; } - @Override - public FoldingState getFoldingState(FoldingStateDescriptor foldingStateDescriptor) { - return null; - } - @Override @SuppressWarnings("unchecked") public MapState getMapState(MapStateDescriptor mapStateDescriptor) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 783f78599fbe0..72f2e89a66fe2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -156,7 +156,7 @@ public void checkpointFunction(long checkpointId) throws Exception { public void checkpointComplete(long checkpointId) { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); - coordinator.checkpointComplete(checkpointId); + coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); this.writeFunction.notifyCheckpointComplete(checkpointId); if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index ed5f1323da142..291c582e01751 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -218,8 +218,7 @@ private void execInsertSql(TableEnvironment tEnv, String insert) { TableResult tableResult = tEnv.executeSql(insert); // wait to finish try { - tableResult.getJobClient().get() - .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); + tableResult.getJobClient().get().getJobExecutionResult().get(); } catch (InterruptedException | ExecutionException ex) { throw new RuntimeException(ex); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 44c030aaf4f67..bbb59640f468a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -30,8 +30,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.factories.TableSinkFactory; -import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.factories.DynamicTableFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -78,14 +77,14 @@ void beforeEach() throws IOException { void testInferAvroSchemaForSource() { // infer the schema if not specified final HoodieTableSource tableSource1 = - (HoodieTableSource) new HoodieTableFactory().createTableSource(MockSourceContext.getInstance(this.conf)); + (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf)); final Configuration conf1 = tableSource1.getConf(); assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA)); // set up the explicit schema using the file path this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH); HoodieTableSource tableSource2 = - (HoodieTableSource) new HoodieTableFactory().createTableSource(MockSourceContext.getInstance(this.conf)); + (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf)); Configuration conf2 = tableSource2.getConf(); assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null"); } @@ -101,8 +100,8 @@ void 
testSetupHoodieKeyOptionsForSource() { .field("f2", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); - final MockSourceContext sourceContext1 = MockSourceContext.getInstance(this.conf, schema1, "f2"); - final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createTableSource(sourceContext1); + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); final Configuration conf1 = tableSource1.getConf(); assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass")); @@ -115,8 +114,8 @@ void testSetupHoodieKeyOptionsForSource() { .field("f2", DataTypes.TIMESTAMP(3)) .primaryKey("f0", "f1") .build(); - final MockSourceContext sourceContext2 = MockSourceContext.getInstance(this.conf, schema2, "f2"); - final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createTableSource(sourceContext2); + final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2"); + final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2); final Configuration conf2 = tableSource2.getConf(); assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName())); @@ -126,14 +125,14 @@ void testSetupHoodieKeyOptionsForSource() { void testInferAvroSchemaForSink() { // infer the schema if not specified final HoodieTableSink tableSink1 = - (HoodieTableSink) new HoodieTableFactory().createTableSink(MockSinkContext.getInstance(this.conf)); + (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf)); final Configuration conf1 = tableSink1.getConf(); assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA)); // set up the explicit schema using the file path this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH); HoodieTableSink tableSink2 = - (HoodieTableSink) new HoodieTableFactory().createTableSink(MockSinkContext.getInstance(this.conf)); + (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf)); Configuration conf2 = tableSink2.getConf(); assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null"); } @@ -149,8 +148,8 @@ void testSetupHoodieKeyOptionsForSink() { .field("f2", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); - final MockSinkContext sinkContext1 = MockSinkContext.getInstance(this.conf, schema1, "f2"); - final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createTableSink(sinkContext1); + final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1); final Configuration conf1 = tableSink1.getConf(); assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass")); @@ -163,8 +162,8 @@ void testSetupHoodieKeyOptionsForSink() { .field("f2", DataTypes.TIMESTAMP(3)) .primaryKey("f0", "f1") .build(); - final MockSinkContext sinkContext2 = MockSinkContext.getInstance(this.conf, schema2, "f2"); - final HoodieTableSink tableSink2 = (HoodieTableSink) new 
HoodieTableFactory().createTableSink(sinkContext2); + final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema2, "f2"); + final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2); final Configuration conf2 = tableSink2.getConf(); assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName())); @@ -175,29 +174,29 @@ void testSetupHoodieKeyOptionsForSink() { // ------------------------------------------------------------------------- /** - * Mock context for table source. + * Mock dynamic table factory context. */ - private static class MockSourceContext implements TableSourceFactory.Context { + private static class MockContext implements DynamicTableFactory.Context { private final Configuration conf; private final TableSchema schema; private final List partitions; - private MockSourceContext(Configuration conf, TableSchema schema, List partitions) { + private MockContext(Configuration conf, TableSchema schema, List partitions) { this.conf = conf; this.schema = schema; this.partitions = partitions; } - static MockSourceContext getInstance(Configuration conf) { + static MockContext getInstance(Configuration conf) { return getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition")); } - static MockSourceContext getInstance(Configuration conf, TableSchema schema, String partition) { + static MockContext getInstance(Configuration conf, TableSchema schema, String partition) { return getInstance(conf, schema, Collections.singletonList(partition)); } - static MockSourceContext getInstance(Configuration conf, TableSchema schema, List partitions) { - return new MockSourceContext(conf, schema, partitions); + static MockContext getInstance(Configuration conf, TableSchema schema, List partitions) { + return new MockContext(conf, schema, partitions); } @Override @@ -206,7 +205,7 @@ public ObjectIdentifier getObjectIdentifier() { } @Override - public CatalogTable getTable() { + public CatalogTable getCatalogTable() { return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock source table"); } @@ -214,51 +213,14 @@ public CatalogTable getTable() { public ReadableConfig getConfiguration() { return conf; } - } - - /** - * Mock context for table sink. 
- */ - private static class MockSinkContext implements TableSinkFactory.Context { - private final Configuration conf; - private final TableSchema schema; - private final List partitions; - - private MockSinkContext(Configuration conf, TableSchema schema, List partitions) { - this.conf = conf; - this.schema = schema; - this.partitions = partitions; - } - - static MockSinkContext getInstance(Configuration conf) { - return getInstance(conf, TestConfigurations.TABLE_SCHEMA, "partition"); - } - - static MockSinkContext getInstance(Configuration conf, TableSchema schema, String partition) { - return getInstance(conf, schema, Collections.singletonList(partition)); - } - - static MockSinkContext getInstance(Configuration conf, TableSchema schema, List partitions) { - return new MockSinkContext(conf, schema, partitions); - } - - @Override - public ObjectIdentifier getObjectIdentifier() { - return ObjectIdentifier.of("hudi", "default", "t1"); - } @Override - public CatalogTable getTable() { - return new CatalogTableImpl(this.schema, this.partitions, conf.toMap(), "mock sink table"); - } - - @Override - public ReadableConfig getConfiguration() { - return conf; + public ClassLoader getClassLoader() { + return null; } @Override - public boolean isBounded() { + public boolean isTemporary() { return false; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 2acc3234e4c24..fed3748adca8a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -88,8 +88,7 @@ void testGetReadPaths() { Map partitions = new HashMap<>(); partitions.put("partition", "par1"); - tableSource = (HoodieTableSource) tableSource - .applyPartitionPruning(Collections.singletonList(partitions)); + tableSource.applyPartitions(Collections.singletonList(partitions)); Path[] paths2 = tableSource.getReadPaths(); assertNotNull(paths2); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index c508a7fb58a24..ae15abab39760 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -161,9 +161,8 @@ void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { Map prunedPartitions = new HashMap<>(); prunedPartitions.put("partition", "par1"); // prune to only be with partition 'par1' - HoodieTableSource newSource = (HoodieTableSource) tableSource - .applyPartitionPruning(Collections.singletonList(prunedPartitions)); - InputFormat inputFormat = newSource.getInputFormat(); + tableSource.applyPartitions(Collections.singletonList(prunedPartitions)); + InputFormat inputFormat = tableSource.getInputFormat(); List result = readData(inputFormat); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index eaf5979760386..392cf5f8539ed 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -23,7 +23,6 @@ import org.apache.hudi.utils.factory.CollectSinkTableFactory; import org.apache.hudi.utils.factory.ContinuousFileSourceFactory; -import org.apache.flink.api.common.ExecutionConfig; 
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; @@ -119,7 +118,7 @@ public static String getCollectSinkDDL(String tableName) { + ")"; } - public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE); + public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE); public static Configuration getDefaultConf(String tablePath) { Configuration conf = new Configuration(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index ab7f544bf447d..d1e310382fe4e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -41,7 +41,7 @@ import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.data.writer.BinaryRowWriter; import org.apache.flink.table.data.writer.BinaryWriter; -import org.apache.flink.table.runtime.types.InternalSerializers; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java index d27679bad5cf6..b8d7de6d879e2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java @@ -103,7 +103,11 @@ private CollectTableSink( @Override public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { - return ChangelogMode.insertOnly(); + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.UPDATE_AFTER) + .build(); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java index 21128bedf4710..bdffcd553c9eb 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java @@ -21,42 +21,46 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.utils.source.ContinuousFileSource; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.TableSourceFactory; -import org.apache.flink.table.sources.TableSource; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Set; /** * Factory for ContinuousFileSource. 
*/ -public class ContinuousFileSourceFactory implements TableSourceFactory { +public class ContinuousFileSourceFactory implements DynamicTableSourceFactory { public static final String FACTORY_ID = "continuous-file-source"; @Override - public TableSource createTableSource(Context context) { - Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + Configuration conf = (Configuration) helper.getOptions(); Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> new ValidationException("Option [path] should be not empty."))); - return new ContinuousFileSource(context.getTable().getSchema(), path, conf); + return new ContinuousFileSource(context.getCatalogTable().getSchema(), path, conf); + } + + @Override + public String factoryIdentifier() { + return FACTORY_ID; } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID); - return context; + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public List supportedProperties() { - return Collections.singletonList("*"); + public Set> optionalOptions() { + return Collections.singleton(FlinkOptions.PATH); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java index 446aba0a75bd8..d9435132076c4 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java @@ -27,10 +27,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.types.DataType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import java.io.IOException; @@ -53,7 +55,7 @@ * *
If all the data are flushed out, it waits for the next checkpoint to finish and tear down the source. */ -public class ContinuousFileSource implements StreamTableSource { +public class ContinuousFileSource implements ScanTableSource { private final TableSchema tableSchema; private final Path path; @@ -69,30 +71,46 @@ public ContinuousFileSource( } @Override - public DataStream getDataStream(StreamExecutionEnvironment execEnv) { - final RowType rowType = (RowType) this.tableSchema.toRowDataType().getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( - rowType, - new RowDataTypeInfo(rowType), - false, - true, - TimestampFormat.ISO_8601); - - return execEnv.addSource(new BoundedSourceFunction(this.path, 2)) - .name("continuous_file_source") - .setParallelism(1) - .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)), - new RowDataTypeInfo(rowType)); + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return new DataStreamScanProvider() { + + @Override + public boolean isBounded() { + return false; + } + + @Override + public DataStream produceDataStream(StreamExecutionEnvironment execEnv) { + final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601); + + return execEnv.addSource(new BoundedSourceFunction(path, 2)) + .name("continuous_file_source") + .setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)), + InternalTypeInfo.of(rowType)); + } + }; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); } @Override - public TableSchema getTableSchema() { - return this.tableSchema; + public DynamicTableSource copy() { + return new ContinuousFileSource(this.tableSchema, this.path, this.conf); } @Override - public DataType getProducedDataType() { - return this.tableSchema.toRowDataType().bridgedTo(RowData.class); + public String asSummaryString() { + return "ContinuousFileSource"; } /** diff --git a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 5fec9b622e06b..19e43c4f81d82 100644 --- a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,4 +14,5 @@ # See the License for the specific language governing permissions and # limitations under the License. +org.apache.hudi.utils.factory.ContinuousFileSourceFactory org.apache.hudi.utils.factory.CollectSinkTableFactory diff --git a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory deleted file mode 100644 index 87c3d990a1798..0000000000000 --- a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.hudi.table.HoodieTableFactory -org.apache.hudi.utils.factory.ContinuousFileSourceFactory diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 29913937c3a74..7a30b74fb2f8b 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -33,6 +33,8 @@ ${project.parent.basedir} org.apache.hudi. 3.1.0 + + 1.11.1 @@ -89,8 +91,9 @@ org.apache.parquet:parquet-hadoop org.apache.parquet:parquet-column org.apache.parquet:parquet-common - org.apache.parquet:parquet-format + org.apache.parquet:parquet-format-structures org.apache.parquet:parquet-encoding + org.apache.parquet:parquet-jackson org.apache.avro:avro joda-time:joda-time @@ -100,11 +103,6 @@ com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} - io.confluent:kafka-avro-serializer - io.confluent:common-config - io.confluent:common-utils - io.confluent:kafka-schema-registry-client - org.apache.kafka:kafka-clients io.dropwizard.metrics:metrics-core io.dropwizard.metrics:metrics-graphite io.prometheus:simpleclient @@ -113,16 +111,12 @@ io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common com.yammer.metrics:metrics-core - org.apache.kafka:kafka_${scala.binary.version} - com.101tec:zkclient org.eclipse.jetty:* org.eclipse.jetty.websocket:* javax.servlet:javax.servlet-api - org.apache.flink:flink-connector-kafka_${scala.binary.version} - org.apache.flink:flink-connector-kafka-base_${scala.binary.version} org.apache.flink:flink-hadoop-compatibility_${scala.binary.version} org.apache.flink:flink-avro org.apache.flink:flink-json @@ -133,14 +127,18 @@ org.apache.hive:hive-metastore org.apache.hive:hive-jdbc - org.apache.hbase:hbase-client org.apache.hbase:hbase-common - org.apache.hbase:hbase-protocol - org.apache.hbase:hbase-server - org.apache.htrace:htrace-core + + org.apache.avro. + ${flink.bundle.hive.shade.prefix}org.apache.avro. + + + org.apache.parquet. + ${flink.bundle.hive.shade.prefix}org.apache.parquet. + com.yammer.metrics. org.apache.hudi.com.yammer.metrics. 
@@ -198,8 +196,6 @@ META-INF/*.DSA META-INF/*.RSA META-INF/services/javax.* - - META-INF/services/org.apache.flink.table.factories.Factory @@ -258,17 +254,6 @@ - - org.apache.flink - flink-connector-kafka_${scala.binary.version} - compile - - - org.apache.flink - flink-connector-kafka-base_${scala.binary.version} - ${flink.version} - compile - org.apache.flink flink-hadoop-compatibility_${scala.binary.version} @@ -295,9 +280,62 @@ + + org.apache.parquet + parquet-hadoop + ${parquet.version} + compile + + + org.xerial.snappy + snappy-java + + + org.apache.parquet parquet-avro + ${parquet.version} + compile + + + org.apache.hadoop + hadoop-client + + + it.unimi.dsi + fastutil + + + + + org.apache.parquet + parquet-column + ${parquet.version} + compile + + + org.apache.parquet + parquet-common + ${parquet.version} + compile + + + org.apache.parquet + parquet-encoding + ${parquet.version} + compile + + + org.apache.parquet + parquet-format-structures + ${parquet.version} + compile + + + org.apache.parquet + parquet-jackson + ${parquet.version} compile @@ -391,52 +429,11 @@ compile - - org.apache.htrace - htrace-core - ${htrace.version} - compile - - org.apache.hbase hbase-common ${hbase.version} - - org.apache.hbase - hbase-server - ${hbase.version} - compile - - - javax.servlet - * - - - org.codehaus.jackson - * - - - org.mortbay.jetty - * - - - tomcat - * - - - - - org.apache.hbase - hbase-client - ${hbase.version} - - - org.apache.hbase - hbase-protocol - ${hbase.version} - \ No newline at end of file diff --git a/pom.xml b/pom.xml index cfd924145478a..4c950feb1c578 100644 --- a/pom.xml +++ b/pom.xml @@ -107,7 +107,7 @@ 4.4.1 ${spark2.version} ${spark2bundle.version} - 1.11.2 + 1.12.2 2.4.4 3.0.0 diff --git a/style/checkstyle-suppressions.xml b/style/checkstyle-suppressions.xml index 401a3eadf7a7a..c2e63db143b13 100644 --- a/style/checkstyle-suppressions.xml +++ b/style/checkstyle-suppressions.xml @@ -28,5 +28,6 @@ +
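
For context on the factory migration that runs through this patch (HoodieTableFactory, the test-only ContinuousFileSourceFactory, and the service file renamed to org.apache.flink.table.factories.Factory), below is a minimal, self-contained sketch of the Flink 1.12 DynamicTableSourceFactory shape. Every name in it (ExampleSourceFactory, ExampleTableSource, SingleRowSource, the "example" identifier and the "path" option) is an illustrative assumption rather than Hudi code; only the interface methods and the FactoryUtil validation flow correspond to what the patch adopts.

import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Registered through a META-INF/services/org.apache.flink.table.factories.Factory entry. */
public class ExampleSourceFactory implements DynamicTableSourceFactory {

  // Options are declared as ConfigOptions instead of the old supportedProperties() wildcard list.
  public static final ConfigOption<String> PATH = ConfigOptions.key("path")
      .stringType()
      .noDefaultValue();

  @Override
  public String factoryIdentifier() {
    return "example"; // matched against the 'connector' option in the CREATE TABLE DDL
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.singleton(PATH);
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Collections.emptySet();
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    helper.validate(); // fails fast on missing or unknown options
    return new ExampleTableSource(helper.getOptions().get(PATH));
  }

  /** Insert-only source that emits one row containing the configured path. */
  private static class ExampleTableSource implements ScanTableSource {
    private final String path;

    ExampleTableSource(String path) {
      this.path = path;
    }

    @Override
    public ChangelogMode getChangelogMode() {
      return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
      return SourceFunctionProvider.of(new SingleRowSource(path), true); // bounded
    }

    @Override
    public DynamicTableSource copy() {
      return new ExampleTableSource(path);
    }

    @Override
    public String asSummaryString() {
      return "ExampleTableSource";
    }
  }

  /** Static nested SourceFunction so it serializes without dragging the table source along. */
  private static class SingleRowSource implements SourceFunction<RowData> {
    private final String value;

    SingleRowSource(String value) {
      this.value = value;
    }

    @Override
    public void run(SourceContext<RowData> ctx) {
      ctx.collect(GenericRowData.of(StringData.fromString(value)));
    }

    @Override
    public void cancel() {
    }
  }
}

The test-scoped ContinuousFileSourceFactory in this patch follows the same pattern, declaring FlinkOptions.PATH as its only option, while HoodieTableFactory derives its optionalOptions() set reflectively from FlinkOptions.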