14 changes: 14 additions & 0 deletions hudi-flink/pom.xml
@@ -30,6 +30,7 @@

<properties>
<main.basedir>${project.parent.basedir}</main.basedir>
<parquet.version>1.11.1</parquet.version>
</properties>

<build>
@@ -170,10 +171,23 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
<scope>test</scope>
</dependency>

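Both Parquet artifacts resolve through the new ${parquet.version} property (1.11.1): parquet-hadoop becomes a direct dependency with snappy-java excluded, and parquet-avro is pulled in for tests only. As a hedged illustration of what the test-scoped parquet-avro artifact is typically used for, here is a minimal writer sketch; the class name, output path, and record schema are invented and not part of the patch:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

// Illustrative test-style usage of parquet-avro; not part of this change.
public class ParquetAvroWriteSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
            + "\"fields\":[{\"name\":\"uuid\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("uuid", "id-0");

    // Write a single Avro record as a Parquet file (the path is arbitrary).
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new Path("/tmp/parquet-avro-sketch.parquet"))
        .withSchema(schema)
        .build()) {
      writer.write(record);
    }
  }
}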
@@ -21,6 +21,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
Expand All @@ -30,10 +31,13 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.util.Arrays;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Hoodie Flink config options.
@@ -287,12 +291,6 @@ private FlinkOptions() {
// Utilities
// -------------------------------------------------------------------------

// Remember to update the set when adding new options.
public static final List<ConfigOption<?>> OPTIONAL_OPTIONS = Arrays.asList(
TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES,
RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS
);

// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";

@@ -385,4 +383,32 @@ public static <T> boolean isDefaultValueDefined(Configuration conf, ConfigOption
return !conf.getOptional(option).isPresent()
|| conf.get(option).equals(option.defaultValue());
}

/**
* Returns all the optional config options.
*/
public static Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>(allOptions());
options.remove(PATH);
return options;
}

/**
* Returns all the config options.
*/
public static List<ConfigOption<?>> allOptions() {
Field[] declaredFields = FlinkOptions.class.getDeclaredFields();
List<ConfigOption<?>> options = new ArrayList<>();
for (Field field : declaredFields) {
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())
&& field.getType().equals(ConfigOption.class)) {
try {
options.add((ConfigOption<?>) field.get(ConfigOption.class));
} catch (IllegalAccessException e) {
throw new HoodieException("Error while fetching static config option", e);
}
}
}
return options;
}
}
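
The hand-written OPTIONAL_OPTIONS list is gone: allOptions() now collects every static ConfigOption<?> field of FlinkOptions reflectively, and optionalOptions() is that set minus the single required PATH option, so newly declared options are picked up automatically. A minimal usage sketch, assuming the FlinkOptions class above; the wrapper class below is illustrative only:

import org.apache.flink.configuration.ConfigOption;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.List;
import java.util.Set;

public class FlinkOptionsSketch {
  public static void main(String[] args) {
    // Discovered reflectively from the static ConfigOption fields.
    List<ConfigOption<?>> all = FlinkOptions.allOptions();
    // The same set with the required PATH option removed.
    Set<ConfigOption<?>> optional = FlinkOptions.optionalOptions();

    System.out.println("total options: " + all.size());
    System.out.println("PATH treated as optional: " + optional.contains(FlinkOptions.PATH)); // false
  }
}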
@@ -157,7 +157,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
}

@Override
public void checkpointComplete(long checkpointId) {
public void notifyCheckpointComplete(long checkpointId) {
// start to commit the instant.
checkAndCommitWithRetry();
// if async compaction is on, schedule the compaction
@@ -182,7 +182,7 @@ public void notifyCheckpointAborted(long checkpointId) {
}

@Override
public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception {
public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception {
if (checkpointData != null) {
// restore when any checkpoint completed
deserializeCheckpointAndRestore(checkpointData);
@@ -215,6 +215,11 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) {
// no operation
}

@Override
public void subtaskReset(int i, long l) {
// no operation
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
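
The hunks above (apparently Hudi's stream-write operator coordinator) follow the OperatorCoordinator contract changes in Flink 1.12: checkpointComplete(long) becomes notifyCheckpointComplete(long), resetToCheckpoint(...) now receives the checkpoint id, and subtaskReset(int, long) has to be implemented. Below is a minimal no-op sketch of that contract, assuming Flink 1.12's org.apache.flink.runtime.operators.coordination.OperatorCoordinator; it only illustrates the signatures and is not the Hudi coordinator:

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;

import java.util.concurrent.CompletableFuture;

public class NoOpCoordinatorSketch implements OperatorCoordinator {
  @Override
  public void start() throws Exception {
    // no resources to initialize in this sketch
  }

  @Override
  public void close() throws Exception {
    // nothing to release
  }

  @Override
  public void handleEventFromOperator(int subtask, OperatorEvent event) {
    // ignore operator events
  }

  @Override
  public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
    // acknowledge the checkpoint with an empty coordinator state
    resultFuture.complete(new byte[0]);
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // renamed from checkpointComplete(long) in earlier Flink versions
  }

  @Override
  public void notifyCheckpointAborted(long checkpointId) {
    // no state to roll back
  }

  @Override
  public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
    // the checkpoint id parameter is new in Flink 1.12
  }

  @Override
  public void subtaskFailed(int subtask, Throwable reason) {
    // no operation
  }

  @Override
  public void subtaskReset(int subtask, long checkpointId) {
    // new callback in Flink 1.12
  }
}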
@@ -36,7 +36,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import java.util.Properties;
@@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception {
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
new RowDataTypeInfo(rowType),
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
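
RowDataTypeInfo was removed in Flink 1.12; InternalTypeInfo.of(...) is the replacement used above to wrap a RowType as a TypeInformation<RowData> for the JSON deserialization schema. A small standalone sketch of the substitution; the row type and class name are arbitrary:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class TypeInfoMigrationSketch {
  public static void main(String[] args) {
    // Any row type works; this one is just for illustration.
    RowType rowType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType());

    // Flink 1.11: new RowDataTypeInfo(rowType)   (class no longer exists)
    // Flink 1.12: InternalTypeInfo.of(rowType)
    TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
    System.out.println(typeInfo);
  }
}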
@@ -19,84 +19,78 @@
package org.apache.hudi.table;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Hoodie data source/sink factory.
*/
public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSinkFactory<RowData> {
public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class);

public static final String FACTORY_ID = "hudi";

@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
// enclosing the code within a try catch block so that we can log the error message.
// Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory
// to create the source/sink and catches all the exceptions then tries the new factory.
//
// log the error message first so that there is a chance to show the real failure cause.
try {
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
return new HoodieTableSource(
schema,
path,
context.getTable().getPartitionKeys(),
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
conf);
} catch (Throwable throwable) {
LOG.error("Create table source error", throwable);
throw new HoodieException(throwable);
}
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();

Configuration conf = (Configuration) helper.getOptions();
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);

Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
return new HoodieTableSource(
schema,
path,
context.getCatalogTable().getPartitionKeys(),
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
conf);
}

@Override
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}

@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID);
return context;
public String factoryIdentifier() {
return FACTORY_ID;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.singleton(FlinkOptions.PATH);
}

@Override
public List<String> supportedProperties() {
// contains format properties.
return Collections.singletonList("*");
public Set<ConfigOption<?>> optionalOptions() {
return FlinkOptions.optionalOptions();
}

// -------------------------------------------------------------------------
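
With the move from TableSourceFactory/TableSinkFactory to the DynamicTableSourceFactory/DynamicTableSinkFactory stack, HoodieTableFactory is discovered through FactoryUtil by matching factoryIdentifier() ("hudi") against the 'connector' option; createDynamicTableSource validates the declared options via FactoryUtil.createTableFactoryHelper, with PATH as the single required option and FlinkOptions.optionalOptions() as the accepted optional set. A hedged sketch of how such a table might be declared from SQL; the table name, columns, and path are invented for illustration:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiDdlSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // 'connector' = 'hudi' matches factoryIdentifier(); 'path' is the only required option.
    tEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  uuid STRING,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` STRING,\n"
            + "  PRIMARY KEY (uuid) NOT ENFORCED\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi/demo_table'\n"
            + ")");
  }
}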