@@ -41,7 +41,6 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +85,11 @@ public class StreamWriteOperatorCoordinator
*/
private final Context context;

/**
* Gateways for sending events to sub tasks.
*/
private transient SubtaskGateway[] gateways;

/**
* Write client.
*/
@@ -150,6 +154,7 @@ public StreamWriteOperatorCoordinator(
public void start() throws Exception {
// initialize event buffer
reset();
this.gateways = new SubtaskGateway[this.parallelism];
this.writeClient = StreamerUtil.createWriteClient(conf);
this.tableState = TableState.create(conf);
// init table, create it if not exists.
@@ -257,6 +262,11 @@ public void subtaskReset(int i, long l) {
// no operation
}

@Override
public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
this.gateways[i] = subtaskGateway;
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -397,13 +407,8 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
*/
private void sendCommitAckEvents() {
CompletableFuture<?>[] futures = IntStream.range(0, this.parallelism)
.mapToObj(taskID -> {
try {
return this.context.sendEvent(CommitAckEvent.getInstance(), taskID);
} catch (TaskNotRunningException e) {
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e);
}
}).toArray(CompletableFuture<?>[]::new);
.mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance()))
.toArray(CompletableFuture<?>[]::new);
try {
CompletableFuture.allOf(futures).get();
} catch (Exception e) {
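The coordinator hunks above track the Flink 1.13 OperatorCoordinator API: events now reach subtasks through the SubtaskGateway handed over in subtaskReady, instead of Context#sendEvent, which could throw TaskNotRunningException. A minimal sketch of that pattern, with illustrative names (GatewayRouter and broadcast are not part of the patch):

```java
// Illustrative sketch, not part of the patch: route coordinator events through the
// per-subtask gateways collected from OperatorCoordinator#subtaskReady.
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;

import java.util.concurrent.CompletableFuture;

final class GatewayRouter {
  // One slot per subtask, filled in as each subtask reports ready.
  private final SubtaskGateway[] gateways;

  GatewayRouter(int parallelism) {
    this.gateways = new SubtaskGateway[parallelism];
  }

  // Call this from OperatorCoordinator#subtaskReady(int, SubtaskGateway).
  void subtaskReady(int subtask, SubtaskGateway gateway) {
    this.gateways[subtask] = gateway;
  }

  // Fan one event out to every subtask; the returned future completes when all acks arrive.
  CompletableFuture<Void> broadcast(OperatorEvent event) {
    CompletableFuture<?>[] futures = new CompletableFuture<?>[gateways.length];
    for (int i = 0; i < gateways.length; i++) {
      futures[i] = gateways[i].sendEvent(event);
    }
    return CompletableFuture.allOf(futures);
  }
}
```

Because the gateway is only handed over once the subtask is running, the coordinator no longer needs the checked-exception handling that the removed try/catch provided.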
@@ -22,25 +22,23 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

/**
* Tools to generate the sort operator.
*/
public class SortOperatorGen {
private final int[] sortIndices;
private final LogicalType[] sortTypes;
private final RowType rowType;
private final TableConfig tableConfig = new TableConfig();

public SortOperatorGen(RowType rowType, String[] sortFields) {
this.sortIndices = Arrays.stream(sortFields).mapToInt(rowType::getFieldIndex).toArray();
List<RowType.RowField> fields = rowType.getFields();
this.sortTypes = Arrays.stream(sortIndices).mapToObj(idx -> fields.get(idx).getType()).toArray(LogicalType[]::new);
this.rowType = rowType;
}

public OneInputStreamOperator<RowData, RowData> createSortOperator() {
@@ -51,8 +49,8 @@ public OneInputStreamOperator<RowData, RowData> createSortOperator() {
}

private SortCodeGenerator createSortCodeGenerator() {
boolean[] padBooleans = new boolean[sortIndices.length];
IntStream.range(0, sortIndices.length).forEach(i -> padBooleans[i] = true);
return new SortCodeGenerator(tableConfig, sortIndices, sortTypes, padBooleans, padBooleans);
SortSpec.SortSpecBuilder builder = SortSpec.builder();
IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
return new SortCodeGenerator(tableConfig, rowType, builder.build());
}
}
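For the sort operator, Flink 1.13's SortCodeGenerator takes the full input RowType plus a SortSpec instead of the former parallel index/type/order arrays. A hedged sketch of driving the builder from explicit field indices, assuming addField(fieldIndex, isAscendingOrder, nullIsLast) as used in the patch:

```java
// Illustrative sketch: build a SortSpec over explicit field indices (ascending, nulls last)
// and feed it to the Flink 1.13 SortCodeGenerator together with the full input row type.
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

final class SortSpecSketch {
  static SortCodeGenerator sortCodeGenerator(RowType rowType, int[] sortIndices) {
    SortSpec.SortSpecBuilder builder = SortSpec.builder();
    for (int fieldIndex : sortIndices) {
      // addField(fieldIndex, isAscendingOrder, nullIsLast)
      builder.addField(fieldIndex, true, true);
    }
    return new SortCodeGenerator(new TableConfig(), rowType, builder.build());
  }
}
```

The two booleans take over the role of the old order/null-order boolean arrays that were previously passed alongside the sort indices and types.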
@@ -149,7 +149,7 @@ public void mergeWith(WriteMetadataEvent other) {
ValidationUtils.checkArgument(this.taskID == other.taskID);
// the instant time could be monotonically increasing
this.instantTime = other.instantTime;
this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true.
this.lastBatch |= other.lastBatch; // true if one of the event lastBatch is true
List<WriteStatus> statusList = new ArrayList<>();
statusList.addAll(this.writeStatuses);
statusList.addAll(other.writeStatuses);
@@ -37,15 +37,16 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
import org.apache.flink.util.Collector;

import java.util.Objects;
@@ -147,7 +148,7 @@ public void initializeState(FunctionInitializationContext context) {
TypeInformation.of(HoodieRecordGlobalLocation.class));
double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
if (ttl > 0) {
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttl)).build());
}
indexState = context.getKeyedStateStore().getState(indexStateDesc);
}
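The TTL change drops the table-runtime StateTtlConfigUtil helper in favour of Flink's core StateTtlConfig builder. A minimal, self-contained sketch of wiring a TTL onto a state descriptor (the descriptor name and value type are illustrative):

```java
// Minimal sketch: enable TTL on a keyed state descriptor with Flink's core builder.
// Descriptor name and value type are illustrative.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

final class TtlSketch {
  static ValueStateDescriptor<String> descriptorWithTtl(long ttlMillis) {
    ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("example-state", Types.STRING);
    if (ttlMillis > 0) {
      // Builder defaults: OnCreateAndWrite update type, NeverReturnExpired visibility.
      desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds(ttlMillis)).build());
    }
    return desc;
  }
}
```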
@@ -39,8 +39,8 @@
import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,9 +54,7 @@

/**
* An Utility which can incrementally consume data from Kafka and apply it to the target table.
* currently, it only support COW table and insert, upsert operation.
* <p>
* note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from.
* currently, it only supports COW table and insert, upsert operation.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
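The only code change in the streamer is the package move of TimestampFormat (org.apache.flink.formats.json to org.apache.flink.formats.common in Flink 1.13); its usage is otherwise unchanged. A hedged sketch of constructing the JSON deserializer with it, assuming the five-argument JsonRowDataDeserializationSchema constructor:

```java
// Hedged sketch: the constructor arguments below are assumed to match the
// JsonRowDataDeserializationSchema signature used by the streamer.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

final class JsonSchemaSketch {
  static JsonRowDataDeserializationSchema forRowType(RowType rowType) {
    TypeInformation<RowData> produced = InternalTypeInfo.of(rowType);
    return new JsonRowDataDeserializationSchema(
        rowType,
        produced,
        false,                     // failOnMissingField
        true,                      // ignoreParseErrors
        TimestampFormat.ISO_8601);
  }
}
```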
@@ -27,17 +27,16 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Hoodie data source/sink factory.
@@ -62,7 +60,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
helper.validate();

Configuration conf = (Configuration) helper.getOptions();
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);

@@ -79,7 +77,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
@@ -110,8 +108,8 @@ public Set<ConfigOption<?>> optionalOptions() {
* @param conf The table options
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, TableSchema schema) {
List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();

// validate record key in pk absence.
if (!schema.getPrimaryKey().isPresent()) {
@@ -144,7 +142,7 @@ private static void setupConfOptions(
Configuration conf,
String tableName,
CatalogTable table,
TableSchema schema) {
ResolvedSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
// hoodie key about options
Expand All @@ -154,7 +152,7 @@ private static void setupConfOptions(
// hive options
setupHiveOptions(conf);
// infer avro schema from physical DDL schema
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
}

/**
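The factory now works against the planner-resolved schema rather than the deprecated TableSchema, so column names, the primary key, and the physical row type all come straight from ResolvedSchema. A small sketch of those accessors (the class name is illustrative):

```java
// Sketch of the ResolvedSchema accessors the factory relies on; class name is illustrative.
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.types.logical.RowType;

import java.util.List;
import java.util.Optional;

final class SchemaSketch {
  static RowType physicalRowType(ResolvedSchema schema) {
    // Replaces TableSchema#getFieldNames for the field-level sanity checks.
    List<String> columns = schema.getColumnNames();
    // ResolvedSchema#getPrimaryKey returns the catalog UniqueConstraint.
    Optional<UniqueConstraint> pk = schema.getPrimaryKey();
    pk.ifPresent(c -> System.out.println("pk columns: " + c.getColumns() + " of " + columns));
    // Physical columns only, as fed to the Avro schema inference.
    return (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
  }
}
```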
35 changes: 16 additions & 19 deletions hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -34,24 +34,24 @@
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;

import java.util.Map;
@@ -62,15 +62,15 @@
public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

private final Configuration conf;
private final TableSchema schema;
private final ResolvedSchema schema;
private boolean overwrite = false;

public HoodieTableSink(Configuration conf, TableSchema schema) {
public HoodieTableSink(Configuration conf, ResolvedSchema schema) {
this.conf = conf;
this.schema = schema;
}

public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite) {
public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite) {
this.conf = conf;
this.schema = schema;
this.overwrite = overwrite;
@@ -85,7 +85,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();

// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
@@ -108,7 +108,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
TypeInformation.of(RowData.class),
sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
}
@@ -203,21 +203,18 @@ public String asSummaryString() {
}

@Override
public void applyStaticPartition(Map<String, String> partition) {
public void applyStaticPartition(Map<String, String> partitions) {
// #applyOverwrite should have been invoked.
if (this.overwrite) {
final String operationType;
if (partition.size() > 0) {
operationType = WriteOperationType.INSERT_OVERWRITE.value();
} else {
operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value();
}
this.conf.setString(FlinkOptions.OPERATION, operationType);
if (this.overwrite && partitions.size() > 0) {
this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
}
}

@Override
public void applyOverwrite(boolean b) {
this.overwrite = b;
public void applyOverwrite(boolean overwrite) {
this.overwrite = overwrite;
// set up the operation as INSERT_OVERWRITE_TABLE first,
// if there are explicit partitions, #applyStaticPartition would overwrite the option.
this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
}
}
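The reworked overwrite handling in the sink relies on the call order the patch comments describe: applyOverwrite records INSERT_OVERWRITE_TABLE first, and applyStaticPartition later narrows it to INSERT_OVERWRITE only when explicit static partitions are present. A minimal standalone sketch of that interplay, with string literals standing in for the WriteOperationType values:

```java
// Standalone illustration of the applyOverwrite / applyStaticPartition interplay;
// the class is not part of the patch and string literals stand in for WriteOperationType values.
import java.util.HashMap;
import java.util.Map;

final class OverwriteResolver {
  private boolean overwrite = false;
  private String operation = "upsert"; // default operation before any overwrite hint

  // Invoked first for INSERT OVERWRITE statements (per the patch comments).
  void applyOverwrite(boolean overwrite) {
    this.overwrite = overwrite;
    this.operation = "insert_overwrite_table";
  }

  // Invoked afterwards when the statement declares static partitions.
  void applyStaticPartition(Map<String, String> partitions) {
    if (this.overwrite && partitions.size() > 0) {
      this.operation = "insert_overwrite";
    }
  }

  public static void main(String[] args) {
    OverwriteResolver resolver = new OverwriteResolver();
    resolver.applyOverwrite(true);
    Map<String, String> staticPartitions = new HashMap<>();
    staticPartitions.put("dt", "2021-06-01");
    resolver.applyStaticPartition(staticPartitions);
    System.out.println(resolver.operation); // prints: insert_overwrite
  }
}
```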