diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 53122c3eced1d..1a89400ff7019 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +85,11 @@ public class StreamWriteOperatorCoordinator */ private final Context context; + /** + * Gateways for sending events to sub tasks. + */ + private transient SubtaskGateway[] gateways; + /** * Write client. */ @@ -150,6 +154,7 @@ public StreamWriteOperatorCoordinator( public void start() throws Exception { // initialize event buffer reset(); + this.gateways = new SubtaskGateway[this.parallelism]; this.writeClient = StreamerUtil.createWriteClient(conf); this.tableState = TableState.create(conf); // init table, create it if not exists. @@ -257,6 +262,11 @@ public void subtaskReset(int i, long l) { // no operation } + @Override + public void subtaskReady(int i, SubtaskGateway subtaskGateway) { + this.gateways[i] = subtaskGateway; + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -397,13 +407,8 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) { */ private void sendCommitAckEvents() { CompletableFuture[] futures = IntStream.range(0, this.parallelism) - .mapToObj(taskID -> { - try { - return this.context.sendEvent(CommitAckEvent.getInstance(), taskID); - } catch (TaskNotRunningException e) { - throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e); - } - }).toArray(CompletableFuture[]::new); + .mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance())) + .toArray(CompletableFuture[]::new); try { CompletableFuture.allOf(futures).get(); } catch (Exception e) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java index d178661759b68..4d3fc08efe197 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java @@ -22,11 +22,10 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; -import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec; import org.apache.flink.table.types.logical.RowType; import java.util.Arrays; -import java.util.List; import java.util.stream.IntStream; /** @@ -34,13 +33,12 @@ */ public class SortOperatorGen { private final int[] sortIndices; - private final LogicalType[] sortTypes; + private final RowType rowType; private final TableConfig tableConfig = new TableConfig(); public SortOperatorGen(RowType rowType, String[] sortFields) { this.sortIndices = 
Arrays.stream(sortFields).mapToInt(rowType::getFieldIndex).toArray(); - List fields = rowType.getFields(); - this.sortTypes = Arrays.stream(sortIndices).mapToObj(idx -> fields.get(idx).getType()).toArray(LogicalType[]::new); + this.rowType = rowType; } public OneInputStreamOperator createSortOperator() { @@ -51,8 +49,8 @@ public OneInputStreamOperator createSortOperator() { } private SortCodeGenerator createSortCodeGenerator() { - boolean[] padBooleans = new boolean[sortIndices.length]; - IntStream.range(0, sortIndices.length).forEach(i -> padBooleans[i] = true); - return new SortCodeGenerator(tableConfig, sortIndices, sortTypes, padBooleans, padBooleans); + SortSpec.SortSpecBuilder builder = SortSpec.builder(); + IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true)); + return new SortCodeGenerator(tableConfig, rowType, builder.build()); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java index 7aec6be414756..eb89b5ad7ebbb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java @@ -149,7 +149,7 @@ public void mergeWith(WriteMetadataEvent other) { ValidationUtils.checkArgument(this.taskID == other.taskID); // the instant time could be monotonically increasing this.instantTime = other.instantTime; - this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true. + this.lastBatch |= other.lastBatch; // true if one of the event lastBatch is true List statusList = new ArrayList<>(); statusList.addAll(this.writeStatuses); statusList.addAll(other.writeStatuses); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index e6c59b1ef25bb..5c9d26f08befc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -37,15 +37,16 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.table.runtime.util.StateTtlConfigUtil; import org.apache.flink.util.Collector; import java.util.Objects; @@ -147,7 +148,7 @@ public void initializeState(FunctionInitializationContext context) { TypeInformation.of(HoodieRecordGlobalLocation.class)); double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000; if (ttl > 0) { - indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl)); + indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttl)).build()); } indexState = context.getKeyedStateStore().getState(indexStateDesc); } diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 5b0ba4f5aa54c..7fde34e9b3363 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -39,8 +39,8 @@ import com.beust.jcommander.JCommander; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -54,9 +54,7 @@ /** * An Utility which can incrementally consume data from Kafka and apply it to the target table. - * currently, it only support COW table and insert, upsert operation. - *
- * note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from. + * currently, it only supports COW table and insert, upsert operation. */ public class HoodieFlinkStreamer { public static void main(String[] args) throws Exception { diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 413f395fc86f0..c753dde549465 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -27,17 +27,16 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * Hoodie data source/sink factory. @@ -62,7 +60,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { helper.validate(); Configuration conf = (Configuration) helper.getOptions(); - TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); sanityCheck(conf, schema); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); @@ -79,7 +77,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { @Override public DynamicTableSink createDynamicTableSink(Context context) { Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); - TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); sanityCheck(conf, schema); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); return new HoodieTableSink(conf, schema); @@ -110,8 +108,8 @@ public Set> optionalOptions() { * @param conf The table options * @param schema The table schema */ - private void sanityCheck(Configuration conf, TableSchema schema) { - List fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList()); + private void sanityCheck(Configuration conf, ResolvedSchema schema) { + List fields = schema.getColumnNames(); // validate record key in pk absence. 
if (!schema.getPrimaryKey().isPresent()) { @@ -144,7 +142,7 @@ private static void setupConfOptions( Configuration conf, String tableName, CatalogTable table, - TableSchema schema) { + ResolvedSchema schema) { // table name conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); // hoodie key about options @@ -154,7 +152,7 @@ private static void setupConfOptions( // hive options setupHiveOptions(conf); // infer avro schema from physical DDL schema - inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType()); + inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType()); } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 8a16c6d4f52cf..ca6d33a544981 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -34,9 +34,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; -import org.apache.hudi.util.ChangelogModes; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.util.ChangelogModes; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -44,14 +44,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.operators.ProcessOperator; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.types.logical.RowType; import java.util.Map; @@ -62,15 +62,15 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { private final Configuration conf; - private final TableSchema schema; + private final ResolvedSchema schema; private boolean overwrite = false; - public HoodieTableSink(Configuration conf, TableSchema schema) { + public HoodieTableSink(Configuration conf, ResolvedSchema schema) { this.conf = conf; this.schema = schema; } - public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite) { + public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite) { this.conf = conf; this.schema = schema; this.overwrite = overwrite; @@ -85,7 +85,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .getCheckpointConfig().getCheckpointTimeout(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); - RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType(); + RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType(); // bulk_insert mode final String writeOperation = this.conf.get(FlinkOptions.OPERATION); @@ -108,7 +108,7 @@ public SinkRuntimeProvider 
getSinkRuntimeProvider(Context context) { TypeInformation.of(RowData.class), sortOperatorGen.createSortOperator()) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); - ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(), + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); } } @@ -203,21 +203,18 @@ public String asSummaryString() { } @Override - public void applyStaticPartition(Map partition) { + public void applyStaticPartition(Map partitions) { // #applyOverwrite should have been invoked. - if (this.overwrite) { - final String operationType; - if (partition.size() > 0) { - operationType = WriteOperationType.INSERT_OVERWRITE.value(); - } else { - operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value(); - } - this.conf.setString(FlinkOptions.OPERATION, operationType); + if (this.overwrite && partitions.size() > 0) { + this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value()); } } @Override - public void applyOverwrite(boolean b) { - this.overwrite = b; + public void applyOverwrite(boolean overwrite) { + this.overwrite = overwrite; + // set up the operation as INSERT_OVERWRITE_TABLE first, + // if there are explicit partitions, #applyStaticPartition would overwrite the option. + this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value()); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index e1eec281577ab..91afd491bbb50 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -109,7 +109,7 @@ public class HoodieTableSource implements private final transient HoodieTableMetaClient metaClient; private final long maxCompactionMemoryInBytes; - private final TableSchema schema; + private final ResolvedSchema schema; private final Path path; private final List partitionKeys; private final String defaultPartName; @@ -122,7 +122,7 @@ public class HoodieTableSource implements private List> requiredPartitions; public HoodieTableSource( - TableSchema schema, + ResolvedSchema schema, Path path, List partitionKeys, String defaultPartName, @@ -131,7 +131,7 @@ public HoodieTableSource( } public HoodieTableSource( - TableSchema schema, + ResolvedSchema schema, Path path, List partitionKeys, String defaultPartName, @@ -147,7 +147,7 @@ public HoodieTableSource( this.conf = conf; this.requiredPartitions = requiredPartitions; this.requiredPos = requiredPos == null - ? IntStream.range(0, schema.getFieldCount()).toArray() + ? IntStream.range(0, schema.getColumnCount()).toArray() : requiredPos; this.limit = limit == null ? NO_LIMIT_CONSTANT : limit; this.filters = filters == null ? 
Collections.emptyList() : filters; @@ -250,8 +250,8 @@ public void applyLimit(long limit) { } private DataType getProducedDataType() { - String[] schemaFieldNames = this.schema.getFieldNames(); - DataType[] schemaTypes = this.schema.getFieldDataTypes(); + String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]); + DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]); return DataTypes.ROW(Arrays.stream(this.requiredPos) .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i])) @@ -383,8 +383,8 @@ private List buildFileIndex(Path[] paths) { } FileInputFormat format = new CopyOnWriteInputFormat( FilePathUtils.toFlinkPaths(paths), - this.schema.getFieldNames(), - this.schema.getFieldDataTypes(), + this.schema.getColumnNames().toArray(new String[0]), + this.schema.getColumnDataTypes().toArray(new DataType[0]), this.requiredPos, this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value @@ -399,8 +399,8 @@ private List buildFileIndex(Path[] paths) { case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED: FileInputFormat format = new CopyOnWriteInputFormat( FilePathUtils.toFlinkPaths(paths), - this.schema.getFieldNames(), - this.schema.getFieldDataTypes(), + this.schema.getColumnNames().toArray(new String[0]), + this.schema.getColumnDataTypes().toArray(new DataType[0]), this.requiredPos, "default", this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index ade43c571d29c..6aea1533eecf1 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -57,8 +57,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index d1475bb3d8590..6f87ef73cee4f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -110,7 +110,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E this.gateway = new MockOperatorEventGateway(); this.conf = conf; // one function - this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1, false); + this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 4abc79acd20c3..8096d5e337e7b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -69,10 +69,10 @@ public class TestStreamReadOperator { private static final Map EXPECTED = new HashMap<>(); static { - EXPECTED.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1"); - EXPECTED.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, id4,Fabian,31,1970-01-01T00:00:00.004,par2"); - EXPECTED.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, id6,Emma,20,1970-01-01T00:00:00.006,par3"); - EXPECTED.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, id8,Han,56,1970-01-01T00:00:00.008,par4"); + EXPECTED.put("par1", "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]"); + EXPECTED.put("par2", "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2]"); + EXPECTED.put("par3", "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3]"); + EXPECTED.put("par4", "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]"); } private Configuration conf; diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index f8dc01825b48d..0764f5586ea80 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -68,8 +68,6 @@ /** * IT cases for Hoodie table source and sink. - *
- * Note: should add more SQL cases when batch write is supported. */ public class HoodieDataSourceITCase extends AbstractTestBase { private TableEnvironment streamTableEnv; @@ -289,7 +287,7 @@ void testStreamReadWithDeletes() throws Exception { + ")"; List result = execSelectSql(streamTableEnv, "select name, sum(age) from t1 group by name", sinkDDL, 10); - final String expected = "[+I(Danny,24), +I(Stephen,34)]"; + final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]"; assertRowsEquals(result, expected, true); } @@ -314,9 +312,9 @@ void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) { List result2 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect()); assertRowsEquals(result2, "[" - + "id6,Emma,20,1970-01-01T00:00:06,par3, " - + "id7,Bob,44,1970-01-01T00:00:07,par4, " - + "id8,Han,56,1970-01-01T00:00:08,par4]"); + + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } @ParameterizedTest @@ -350,14 +348,14 @@ void testWriteAndReadParMiddle(ExecMode execMode) throws Exception { execInsertSql(streamTableEnv, insertInto); final String expected = "[" - + "id1,Danny,23,par1,1970-01-01T00:00:01, " - + "id2,Stephen,33,par1,1970-01-01T00:00:02, " - + "id3,Julian,53,par2,1970-01-01T00:00:03, " - + "id4,Fabian,31,par2,1970-01-01T00:00:04, " - + "id5,Sophia,18,par3,1970-01-01T00:00:05, " - + "id6,Emma,20,par3,1970-01-01T00:00:06, " - + "id7,Bob,44,par4,1970-01-01T00:00:07, " - + "id8,Han,56,par4,1970-01-01T00:00:08]"; + + "+I[id1, Danny, 23, par1, 1970-01-01T00:00:01], " + + "+I[id2, Stephen, 33, par1, 1970-01-01T00:00:02], " + + "+I[id3, Julian, 53, par2, 1970-01-01T00:00:03], " + + "+I[id4, Fabian, 31, par2, 1970-01-01T00:00:04], " + + "+I[id5, Sophia, 18, par3, 1970-01-01T00:00:05], " + + "+I[id6, Emma, 20, par3, 1970-01-01T00:00:06], " + + "+I[id7, Bob, 44, par4, 1970-01-01T00:00:07], " + + "+I[id8, Han, 56, par4, 1970-01-01T00:00:08]]"; List result = execSelectSql(streamTableEnv, "select * from t1", execMode); @@ -401,8 +399,8 @@ void testInsertOverwrite(ExecMode execMode) { List result2 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); final String expected = "[" - + "id1,Danny,24,1970-01-01T00:00:01,par1, " - + "id2,Stephen,34,1970-01-01T00:00:02,par2]"; + + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]"; assertRowsEquals(result2, expected); } @@ -431,7 +429,7 @@ void testUpsertWithMiniBatches(ExecMode execMode) { List result = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]"); + assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]"); } @ParameterizedTest @@ -467,7 +465,7 @@ void testWriteNonPartitionedTable(ExecMode execMode) { List result = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]"); + assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]"); } @Test @@ -490,7 +488,7 @@ void testWriteGlobalIndex() { List result = CollectionUtil.iterableToList( () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(result, "[id1,Phoebe,52,1970-01-01T00:00:08,par4]"); + assertRowsEquals(result, 
"[+I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]"); } @Test @@ -514,10 +512,10 @@ void testWriteLocalIndex() { List result = CollectionUtil.iterableToList( () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); final String expected = "[" - + "id1,Stephen,34,1970-01-01T00:00:02,par1, " - + "id1,Fabian,32,1970-01-01T00:00:04,par2, " - + "id1,Jane,19,1970-01-01T00:00:06,par3, " - + "id1,Phoebe,52,1970-01-01T00:00:08,par4]"; + + "+I[id1, Stephen, 34, 1970-01-01T00:00:02, par1], " + + "+I[id1, Fabian, 32, 1970-01-01T00:00:04, par2], " + + "+I[id1, Jane, 19, 1970-01-01T00:00:06, par3], " + + "+I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]"; assertRowsEquals(result, expected, 3); } @@ -577,16 +575,16 @@ void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception { execInsertSql(streamTableEnv, insertInto); final String expected = "[" - + "101,1000,scooter,3.140000104904175, " - + "102,2000,car battery,8.100000381469727, " - + "103,3000,12-pack drill bits,0.800000011920929, " - + "104,4000,hammer,0.75, " - + "105,5000,hammer,0.875, " - + "106,10000,hammer,1.0, " - + "107,11000,rocks,5.099999904632568, " - + "108,8000,jacket,0.10000000149011612, " - + "109,9000,spare tire,22.200000762939453, " - + "110,14000,jacket,0.5]"; + + "+I[101, 1000, scooter, 3.140000104904175], " + + "+I[102, 2000, car battery, 8.100000381469727], " + + "+I[103, 3000, 12-pack drill bits, 0.800000011920929], " + + "+I[104, 4000, hammer, 0.75], " + + "+I[105, 5000, hammer, 0.875], " + + "+I[106, 10000, hammer, 1.0], " + + "+I[107, 11000, rocks, 5.099999904632568], " + + "+I[108, 8000, jacket, 0.10000000149011612], " + + "+I[109, 9000, spare tire, 22.200000762939453], " + + "+I[110, 14000, jacket, 0.5]]"; List result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode); @@ -621,9 +619,9 @@ void testBulkInsert(boolean hiveStylePartitioning) { List result2 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect()); assertRowsEquals(result2, "[" - + "id6,Emma,20,1970-01-01T00:00:06,par3, " - + "id7,Bob,44,1970-01-01T00:00:07,par4, " - + "id8,Han,56,1970-01-01T00:00:08,par4]"); + + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } @Test @@ -660,11 +658,11 @@ void testBulkInsertNonPartitionedTable() { List result = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); assertRowsEquals(result, "[" - + "id1,Danny,23,1970-01-01T00:00:01,par1, " - + "id1,Stephen,33,1970-01-01T00:00:02,par2, " - + "id1,Julian,53,1970-01-01T00:00:03,par3, " - + "id1,Fabian,31,1970-01-01T00:00:04,par4, " - + "id1,Sophia,18,1970-01-01T00:00:05,par5]", 3); + + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], " + + "+I[id1, Stephen, 33, 1970-01-01T00:00:02, par2], " + + "+I[id1, Julian, 53, 1970-01-01T00:00:03, par3], " + + "+I[id1, Fabian, 31, 1970-01-01T00:00:04, par4], " + + "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3); } // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 799739cfc497a..0439c4d08ad79 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -24,16 +24,18 @@ import 
org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.SchemaBuilder; import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.factories.DynamicTableFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -82,7 +84,7 @@ void beforeEach() throws IOException { @Test void testRequiredOptionsForSource() { // miss pk and pre combine key will throw exception - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -92,7 +94,7 @@ void testRequiredOptionsForSource() { assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1)); // given the pk and miss the pre combine key will throw exception - TableSchema schema2 = TableSchema.builder() + ResolvedSchema schema2 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -103,7 +105,7 @@ void testRequiredOptionsForSource() { assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2)); // given pk and pre combine key will be ok - TableSchema schema3 = TableSchema.builder() + ResolvedSchema schema3 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -137,7 +139,7 @@ void testSetupHoodieKeyOptionsForSource() { this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField"); this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass"); // definition with simple primary key and partition path - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -152,7 +154,7 @@ void testSetupHoodieKeyOptionsForSource() { // definition with complex primary keys and partition paths this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue()); - TableSchema schema2 = TableSchema.builder() + ResolvedSchema schema2 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20).notNull()) .field("f2", DataTypes.TIMESTAMP(3)) @@ -177,7 +179,7 @@ void testSetupHoodieKeyOptionsForSource() { @Test void testSetupHiveOptionsForSource() { // definition with simple primary key and partition path - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -202,7 +204,7 @@ void testSetupHiveOptionsForSource() { @Test void 
testSetupCleaningOptionsForSource() { // definition with simple primary key and partition path - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -249,7 +251,7 @@ void testSetupHoodieKeyOptionsForSink() { this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField"); this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass"); // definition with simple primary key and partition path - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -264,7 +266,7 @@ void testSetupHoodieKeyOptionsForSink() { // definition with complex primary keys and partition paths this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue()); - TableSchema schema2 = TableSchema.builder() + ResolvedSchema schema2 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20).notNull()) .field("f2", DataTypes.TIMESTAMP(3)) @@ -289,7 +291,7 @@ void testSetupHoodieKeyOptionsForSink() { @Test void testSetupHiveOptionsForSink() { // definition with simple primary key and partition path - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -314,7 +316,7 @@ void testSetupHiveOptionsForSink() { @Test void testSetupCleaningOptionsForSink() { // definition with simple primary key and partition path - TableSchema schema1 = TableSchema.builder() + ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) @@ -349,10 +351,10 @@ void testSetupCleaningOptionsForSink() { */ private static class MockContext implements DynamicTableFactory.Context { private final Configuration conf; - private final TableSchema schema; + private final ResolvedSchema schema; private final List partitions; - private MockContext(Configuration conf, TableSchema schema, List partitions) { + private MockContext(Configuration conf, ResolvedSchema schema, List partitions) { this.conf = conf; this.schema = schema; this.partitions = partitions; @@ -362,11 +364,11 @@ static MockContext getInstance(Configuration conf) { return getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition")); } - static MockContext getInstance(Configuration conf, TableSchema schema, String partition) { + static MockContext getInstance(Configuration conf, ResolvedSchema schema, String partition) { return getInstance(conf, schema, Collections.singletonList(partition)); } - static MockContext getInstance(Configuration conf, TableSchema schema, List partitions) { + static MockContext getInstance(Configuration conf, ResolvedSchema schema, List partitions) { return new MockContext(conf, schema, partitions); } @@ -376,8 +378,10 @@ public ObjectIdentifier getObjectIdentifier() { } @Override - public CatalogTable getCatalogTable() { - return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock source table"); + public ResolvedCatalogTable getCatalogTable() { + CatalogTable catalogTable = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(schema).build(), + "mock source table", partitions, conf.toMap()); + 
return new ResolvedCatalogTable(catalogTable, schema); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 54848bdeeb8bd..cc0699fc521d6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -104,17 +104,18 @@ void testRead(HoodieTableType tableType) throws Exception { result = readData(inputFormat); actual = TestData.rowDataToString(result); - expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, " - + "id10,Ella,38,1970-01-01T00:00:00.007,par4, " - + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, " - + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " - + "id3,Julian,54,1970-01-01T00:00:00.003,par2, " - + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, " - + "id5,Sophia,18,1970-01-01T00:00:00.005,par3, " - + "id6,Emma,20,1970-01-01T00:00:00.006,par3, " - + "id7,Bob,44,1970-01-01T00:00:00.007,par4, " - + "id8,Han,56,1970-01-01T00:00:00.008,par4, " - + "id9,Jane,19,1970-01-01T00:00:00.006,par3]"; + expected = "[" + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], " + + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], " + + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], " + + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], " + + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], " + + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], " + + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4]]"; assertThat(actual, is(expected)); } @@ -150,21 +151,22 @@ void testReadBaseAndLogFiles() throws Exception { result = readData(inputFormat); actual = TestData.rowDataToString(result); - expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, " - + "id10,Ella,38,1970-01-01T00:00:00.007,par4, " - + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, " - + "id12,Monica,27,1970-01-01T00:00:00.009,par5, " - + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, " - + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, " - + "id15,Ross,29,1970-01-01T00:00:00.012,par6, " - + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " - + "id3,Julian,54,1970-01-01T00:00:00.003,par2, " - + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, " - + "id5,Sophia,18,1970-01-01T00:00:00.005,par3, " - + "id6,Emma,20,1970-01-01T00:00:00.006,par3, " - + "id7,Bob,44,1970-01-01T00:00:00.007,par4, " - + "id8,Han,56,1970-01-01T00:00:00.008,par4, " - + "id9,Jane,19,1970-01-01T00:00:00.006,par3]"; + expected = "[" + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], " + + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], " + + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], " + + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], " + + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], " + + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], " + + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4], " + + "+I[id12, Monica, 27, 1970-01-01T00:00:00.009, par5], " + + "+I[id13, Phoebe, 31, 1970-01-01T00:00:00.010, par5], " + + "+I[id14, 
Rachel, 52, 1970-01-01T00:00:00.011, par6], " + + "+I[id15, Ross, 29, 1970-01-01T00:00:00.012, par6]]"; assertThat(actual, is(expected)); } @@ -189,14 +191,14 @@ void testReadBaseAndLogFilesWithDeletes() throws Exception { // when isEmitDelete is false. List result1 = readData(inputFormat); - final String actual1 = TestData.rowDataToString(result1, true); + final String actual1 = TestData.rowDataToString(result1); final String expected1 = "[" - + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " - + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), " - + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), " - + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), " - + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), " - + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4)]"; + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], " + + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]"; assertThat(actual1, is(expected1)); // refresh the input format and set isEmitDelete to true. @@ -206,17 +208,17 @@ void testReadBaseAndLogFilesWithDeletes() throws Exception { List result2 = readData(inputFormat); - final String actual2 = TestData.rowDataToString(result2, true); + final String actual2 = TestData.rowDataToString(result2); final String expected2 = "[" - + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " - + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), " - + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), " - + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), " - + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), " - + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), " - + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), " - + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4), " - + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]"; + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], " + + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], " + + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], " + + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], " + + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]"; assertThat(actual2, is(expected2)); } @@ -241,8 +243,8 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception { // when isEmitDelete is false. List result1 = readData(inputFormat); - final String actual1 = TestData.rowDataToString(result1, true); - final String expected1 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]"; + final String actual1 = TestData.rowDataToString(result1); + final String expected1 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]"; assertThat(actual1, is(expected1)); // refresh the input format and set isEmitDelete to true. 
@@ -252,8 +254,8 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception { List result2 = readData(inputFormat); - final String actual2 = TestData.rowDataToString(result2, true); - final String expected2 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]"; + final String actual2 = TestData.rowDataToString(result2); + final String expected2 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]"; assertThat(actual2, is(expected2)); } @@ -272,13 +274,13 @@ void testReadWithDeletesMOR() throws Exception { List result = readData(inputFormat); - final String actual = TestData.rowDataToString(result, true); + final String actual = TestData.rowDataToString(result); final String expected = "[" - + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " - + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), " - + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), " - + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), " - + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]"; + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], " + + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], " + + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]"; assertThat(actual, is(expected)); } @@ -294,10 +296,10 @@ void testReadWithDeletesCOW() throws Exception { List result = readData(inputFormat); - final String actual = TestData.rowDataToString(result, true); + final String actual = TestData.rowDataToString(result); final String expected = "[" - + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " - + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1)]"; + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]"; assertThat(actual, is(expected)); } @@ -317,7 +319,9 @@ void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { List result = readData(inputFormat); String actual = TestData.rowDataToString(result); - String expected = "[id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1]"; + String expected = "[" + + "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]]"; assertThat(actual, is(expected)); } @@ -335,16 +339,16 @@ void testReadChangesUnMergedMOR() throws Exception { List result = readData(inputFormat); - final String actual = TestData.rowDataToString(result, true); + final String actual = TestData.rowDataToString(result); final String expected = "[" - + "+I(id1,Danny,19,1970-01-01T00:00:00.001,par1), " - + "-U(id1,Danny,19,1970-01-01T00:00:00.001,par1), " - + "+U(id1,Danny,20,1970-01-01T00:00:00.002,par1), " - + "-U(id1,Danny,20,1970-01-01T00:00:00.002,par1), " - + "+U(id1,Danny,21,1970-01-01T00:00:00.003,par1), " - + "-U(id1,Danny,21,1970-01-01T00:00:00.003,par1), " - + "+U(id1,Danny,22,1970-01-01T00:00:00.004,par1), " - + "-D(id1,Danny,22,1970-01-01T00:00:00.005,par1)]"; + + "+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], " + + "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], " + + "+U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], " + + "-U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], " + + "+U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], " + + "-U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], " + + "+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], " + + "-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"; assertThat(actual, is(expected)); } diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/SchemaBuilder.java b/hudi-flink/src/test/java/org/apache/hudi/utils/SchemaBuilder.java new file mode 100644 index 0000000000000..39dd6d659f7b7 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/SchemaBuilder.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Builder for {@link ResolvedSchema}. + */ +public class SchemaBuilder { + private List columns; + private List watermarkSpecs; + private UniqueConstraint constraint; + + public static SchemaBuilder instance() { + return new SchemaBuilder(); + } + + private SchemaBuilder() { + this.columns = new ArrayList<>(); + this.watermarkSpecs = new ArrayList<>(); + } + + public SchemaBuilder field(String name, DataType type) { + this.columns.add(Column.physical(name, type)); + return this; + } + + public SchemaBuilder fields(List names, List types) { + List columns = IntStream.range(0, names.size()) + .mapToObj(idx -> Column.physical(names.get(idx), types.get(idx))) + .collect(Collectors.toList()); + this.columns.addAll(columns); + return this; + } + + public SchemaBuilder primaryKey(String... 
columns) { + this.constraint = UniqueConstraint.primaryKey("pk", Arrays.asList(columns)); + return this; + } + + public ResolvedSchema build() { + return new ResolvedSchema(columns, watermarkSpecs, constraint); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 2be2acabde96b..a7e38c0d97897 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -50,10 +51,8 @@ private TestConfigurations() { public static final RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType(); - public static final TableSchema TABLE_SCHEMA = TableSchema.builder() - .fields( - ROW_TYPE.getFieldNames().toArray(new String[0]), - ROW_DATA_TYPE.getChildren().toArray(new DataType[0])) + public static final ResolvedSchema TABLE_SCHEMA = SchemaBuilder.instance() + .fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren()) .build(); public static String getCreateHoodieTableDDL(String tableName, Map options) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index b51631c687c05..b6b738a952053 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -73,7 +73,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -/** Data set for testing, also some utilities to check the results. */ +/** + * Data set for testing, also some utilities to check the results. + */ public class TestData { public static List DATA_SET_INSERT = Arrays.asList( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -128,6 +130,7 @@ public class TestData { ); public static List DATA_SET_INSERT_DUPLICATES = new ArrayList<>(); + static { IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -135,6 +138,7 @@ public class TestData { } public static List DATA_SET_INSERT_SAME_KEY = new ArrayList<>(); + static { IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -280,39 +284,34 @@ public class TestData { TimestampData.fromEpochMillis(2), StringData.fromString("par1")) ); - /** - * Returns string format of a list of RowData. - */ - public static String rowDataToString(List rows) { - return rowDataToString(rows, false); + private static Integer toIdSafely(Object id) { + if (id == null) { + return -1; + } + final String idStr = id.toString(); + if (idStr.startsWith("id")) { + return Integer.parseInt(idStr.substring(2)); + } + return -1; } /** * Returns string format of a list of RowData. 
- * - * @param withChangeFlag whether to print the change flag */ - public static String rowDataToString(List rows, boolean withChangeFlag) { + public static String rowDataToString(List rows) { DataStructureConverter converter = DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE); return rows.stream() - .sorted(Comparator.comparing(o -> toStringSafely(o.getString(0)))) - .map(row -> { - final String rowStr = converter.toExternal(row).toString(); - if (withChangeFlag) { - return row.getRowKind().shortString() + "(" + rowStr + ")"; - } else { - return rowStr; - } - }) + .sorted(Comparator.comparing(o -> toIdSafely(o.getString(0)))) + .map(row -> converter.toExternal(row).toString()) .collect(Collectors.toList()).toString(); } /** * Write a list of row data with Hoodie format base on the given configuration. * - * @param dataBuffer The data buffer to write - * @param conf The flink configuration + * @param dataBuffer The data buffer to write + * @param conf The flink configuration * @throws Exception if error occurs */ public static void writeData( @@ -379,8 +378,8 @@ public static void assertRowsEquals(List rows, String expected, boolean wit * Sort the {@code rows} using field at index {@code orderingPos} and asserts * it equals with the expected string {@code expected}. * - * @param rows Actual result rows - * @param expected Expected string of the sorted rows + * @param rows Actual result rows + * @param expected Expected string of the sorted rows * @param orderingPos Field position for ordering */ public static void assertRowsEquals(List rows, String expected, int orderingPos) { @@ -399,9 +398,9 @@ public static void assertRowsEquals(List rows, String expected, int orderin */ public static void assertRowsEquals(List rows, List expected) { String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) + .sorted(Comparator.comparing(o -> toIdSafely(o.getField(0)))) .collect(Collectors.toList()).toString(); - assertThat(rowsString, is(rowDataToString(expected))); + assertThat(rowDataToString(expected), is(rowsString)); } /** @@ -425,7 +424,7 @@ public static void assertRowDataEquals(List rows, String expected) { */ public static void assertRowDataEquals(List rows, List expected) { String rowsString = rowDataToString(rows); - assertThat(rowsString, is(rowDataToString(expected))); + assertThat(rowDataToString(expected), is(rowsString)); } /** @@ -526,8 +525,8 @@ public static void checkWrittenAllData( * *
Note: Replace it with the Flink reader when it is supported. * - * @param basePath The file base to check, should be a directory - * @param expected The expected results mapping, the key should be the partition path + * @param basePath The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path */ public static void checkWrittenFullData( File basePath, @@ -571,12 +570,12 @@ public static void checkWrittenFullData( * *
Note: Replace it with the Flink reader when it is supported. * - * @param fs The file system + * @param fs The file system * @param latestInstant The latest committed instant of current table - * @param baseFile The file base to check, should be a directory - * @param expected The expected results mapping, the key should be the partition path - * @param partitions The expected partition number - * @param schema The read schema + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * @param partitions The expected partition number + * @param schema The read schema */ public static void checkWrittenDataMOR( FileSystem fs, diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java index 19b23f54f999a..a44061076f581 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java @@ -20,8 +20,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/pom.xml b/pom.xml index d84351098ae97..0fd756ee4a54e 100644 --- a/pom.xml +++ b/pom.xml @@ -110,7 +110,7 @@ 4.4.1 ${spark2.version} ${spark2bundle.version} - 1.12.2 + 1.13.1 2.4.4 3.0.0