Merged
41 changes: 40 additions & 1 deletion hudi-flink/pom.xml
@@ -91,6 +91,11 @@
<artifactId>hudi-flink-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Flink -->
<dependency>
@@ -134,6 +139,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
@@ -152,6 +163,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Parquet -->
<dependency>
@@ -192,8 +209,30 @@
<artifactId>bijection-avro_${scala.binary.version}</artifactId>
<version>0.9.7</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>${hive.exec.classifier}</classifier>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- test dependencies -->
<!-- Test dependencies -->

<!-- Junit 5 dependencies -->
<dependency>
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.factory;

import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.sink.HoodieTableSink;
import org.apache.hudi.source.HoodieTableSource;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Hoodie data source/sink factory.
*/
public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSinkFactory<RowData> {
public static final String FACTORY_ID = "hudi";

@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
conf.setString(FlinkOptions.TABLE_NAME.key(), context.getObjectIdentifier().getObjectName());
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys()));
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
return new HoodieTableSource(
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()),
path,
context.getTable().getPartitionKeys(),
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
conf);
}

@Override
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
conf.setString(FlinkOptions.TABLE_NAME.key(), context.getObjectIdentifier().getObjectName());
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys()));
return new HoodieTableSink(conf,
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()),
context.isBounded());
}

@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID);
return context;
}

@Override
public List<String> supportedProperties() {
// contains format properties.
return Collections.singletonList("*");
}
}
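
A minimal usage sketch (not part of this patch) of how the factory above is picked up: declaring a table with 'connector' = 'hudi' matches FACTORY_ID through requiredContext(), and the 'path' option feeds the source/sink. The class name HoodieTableFactoryUsageSketch, the table name t1, its columns, and the /tmp/hudi/t1 path are illustrative assumptions; the blink planner setup corresponds to the flink-table-planner-blink dependency added in hudi-flink/pom.xml.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HoodieTableFactoryUsageSketch {
  public static void main(String[] args) {
    // Blink planner, streaming mode.
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
    tableEnv.executeSql(
        "CREATE TABLE t1 ("
            + "  uuid VARCHAR(20), name VARCHAR(10), ts TIMESTAMP(3), `partition` VARCHAR(20)"
            + ") PARTITIONED BY (`partition`) WITH ("
            + "  'connector' = 'hudi',"                // matched against FACTORY_ID via requiredContext()
            + "  'path' = '/tmp/hudi/t1',"             // required, otherwise the factory raises a ValidationException
            + "  'write.table.type' = 'MERGE_ON_READ'" // FlinkOptions.TABLE_TYPE
            + ")");
  }
}
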
@@ -18,7 +18,6 @@

package org.apache.hudi.operator;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -55,6 +54,17 @@ private FlinkOptions() {
+ "The path would be created if it does not exist,\n"
+ "otherwise a Hoodie table expects to be initialized successfully");

// ------------------------------------------------------------------------
// Common Options
// ------------------------------------------------------------------------

public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default.name")
.stringType()
.defaultValue("__DEFAULT_PARTITION__")
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");

// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
@@ -64,6 +74,44 @@ private FlinkOptions() {
.noDefaultValue()
.withDescription("Avro schema file path, the parsed schema is used for deserializing");

public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
public static final String QUERY_TYPE_INCREMENTAL = "incremental";
public static final ConfigOption<String> QUERY_TYPE = ConfigOptions
.key("hoodie.datasource.query.type")
.stringType()
.defaultValue(QUERY_TYPE_SNAPSHOT)
.withDescription("Decides how data files need to be read, in\n"
+ "1) Snapshot mode (obtain latest view, based on row & columnar data);\n"
+ "2) incremental mode (new data since an instantTime);\n"
+ "3) Read Optimized mode (obtain latest view, based on columnar data)\n."
+ "Default: snapshot");

public static final String REALTIME_SKIP_MERGE = "skip_merge";
public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
public static final ConfigOption<String> MERGE_TYPE = ConfigOptions
.key("hoodie.datasource.merge.type")
.stringType()
.defaultValue(REALTIME_PAYLOAD_COMBINE)
.withDescription("For Snapshot query on merge on read table. Use this key to define how the payloads are merged, in\n"
+ "1) skip_merge: read the base file records plus the log file records;\n"
+ "2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n"
+ " log file records(combines the two records with same key for base and log file records), then read the left log file records");

public static final ConfigOption<Boolean> HIVE_STYLE_PARTITION = ConfigOptions
.key("hoodie.datasource.hive_style_partition")
.booleanType()
.defaultValue(false)
.withDescription("Whether the partition path is with Hive style, e.g. '{partition key}={partition value}', default false");

public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
.key("read.utc-timezone")
.booleanType()
.defaultValue(true)
.withDescription("Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true");

// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
@@ -73,11 +121,13 @@ private FlinkOptions() {
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");

public static final String TABLE_TYPE_COPY_ON_WRITE = "COPY_ON_WRITE";
Review comment (Member): Can we link the String to HoodieTableType?
Reply (Contributor Author): No, I think a constant here is easier to use.

public static final String TABLE_TYPE_MERGE_ON_READ = "MERGE_ON_READ";
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("write.table.type")
.stringType()
.defaultValue(HoodieTableType.COPY_ON_WRITE.name())
.withDescription("Type of table to write, COPY_ON_WRITE (or) MERGE_ON_READ");
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");

public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
@@ -249,14 +299,21 @@ public static org.apache.flink.configuration.Configuration fromStreamerConfig(Fl
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
}

/**
* Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list.
*/
public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
final Map<String, String> hoodieProperties = new HashMap<>();

if (hasPropertyOptions(options)) {
options.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(key -> {
final String value = options.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
final String subKey = key.substring((prefix).length());
hoodieProperties.put(subKey, value);
});
}
@@ -283,7 +340,7 @@ private static boolean hasPropertyOptions(Map<String, String> options) {
}

/** Creates a new configuration that is initialized with the options of the given map. */
private static Configuration fromMap(Map<String, String> map) {
public static Configuration fromMap(Map<String, String> map) {
final Configuration configuration = new Configuration();
map.forEach(configuration::setString);
return configuration;
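
A hedged sketch (not in the PR) of how the refactored helpers compose: fromMap(...) is now public so the table factory can build a Flink Configuration straight from catalog options, and getHoodiePropertiesWithPrefix(...) generalizes the old 'properties.'-only collection. The class name FlinkOptionsSketch, the path, and the parallelism value are illustrative assumptions; the option keys mirror the constants shown in the diff above.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.operator.FlinkOptions;

public class FlinkOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("path", "/tmp/hudi/t1");                               // illustrative table path
    options.put("hoodie.datasource.query.type", "read_optimized");     // QUERY_TYPE
    options.put("hoodie.datasource.merge.type", "payload_combine");    // MERGE_TYPE
    options.put("read.utc-timezone", "true");                          // UTC_TIMEZONE
    options.put("properties.hoodie.insert.shuffle.parallelism", "4");  // pass-through Hoodie property

    // fromMap(...) is public after this change, so HoodieTableFactory can reuse it.
    Configuration conf = FlinkOptions.fromMap(options);
    String queryType = conf.getString(FlinkOptions.QUERY_TYPE);        // "read_optimized"

    // The generalized helper, called here with the same 'properties.' prefix
    // that getHoodieProperties(...) delegates with.
    Map<String, String> hoodieProps =
        FlinkOptions.getHoodiePropertiesWithPrefix(options, "properties.");
    // hoodieProps now contains {hoodie.insert.shuffle.parallelism=4}.
  }
}
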
@@ -216,6 +216,14 @@ public void notifyCheckpointComplete(long checkpointId) {
this.writeClient.cleanHandles();
}

/**
* End input action for batch source.
*/
public void endInput() {
flushBuffer(true);
this.writeClient.cleanHandles();
}

// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
@@ -22,6 +22,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamSink;

@@ -32,7 +33,7 @@
*/
public class StreamWriteOperator<I>
extends KeyedProcessOperator<Object, I, Object>
implements OperatorEventHandler {
implements OperatorEventHandler, BoundedOneInput {
private final StreamWriteFunction<Object, I, Object> sinkFunction;

public StreamWriteOperator(Configuration conf) {
@@ -48,4 +49,9 @@ public void handleOperatorEvent(OperatorEvent operatorEvent) {
void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
sinkFunction.setOperatorEventGateway(operatorEventGateway);
}

@Override
public void endInput() throws Exception {
sinkFunction.endInput();
}
}
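
For context, a hedged end-to-end sketch (not part of the patch) of the bounded write path this enables: with BoundedOneInput implemented, Flink calls endInput() on the operator once a finite input is exhausted, so a batch-mode INSERT flushes the remaining write buffer before the job finishes. The class name BoundedHoodieWriteSketch, the table name hudi_sink, its columns, the path, and the sample row are illustrative assumptions.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BoundedHoodieWriteSketch {
  public static void main(String[] args) {
    // Batch mode: the planner creates the sink with context.isBounded() == true.
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
    tableEnv.executeSql(
        "CREATE TABLE hudi_sink (uuid STRING, name STRING, ts TIMESTAMP(3)) WITH ("
            + " 'connector' = 'hudi', 'path' = '/tmp/hudi/hudi_sink')");
    // A finite VALUES source: once it is exhausted, Flink invokes
    // StreamWriteOperator.endInput(), which flushes the remaining buffer
    // and cleans the write handles via StreamWriteFunction.endInput().
    tableEnv.executeSql(
        "INSERT INTO hudi_sink VALUES ('id1', 'Alice', TIMESTAMP '2021-01-01 00:00:01.123')");
  }
}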