Closed
Changes from 3 commits
Commits (46)
c11e0e2
#1
Apr 19, 2022
cbb0312
#1
Apr 19, 2022
20b918e
#improve the flink sink operator name to better identify tables to w…
May 23, 2022
42c7129
[HUDI-4142] Claim RFC-54 for new table APIs (#5665)
codope May 23, 2022
752f956
[HUDI-3933] Add UT cases to cover different key gen (#5638)
yihua May 23, 2022
716e995
[MINOR] Removing redundant semicolons and line breaks (#5662)
felixYyu May 23, 2022
47b764e
[HUDI-4134] Fix Method naming consistency issues in FSUtils (#5655)
h1ap May 23, 2022
af1128a
[HUDI-4084] Add support to test async table services with integ test …
nsivabalan May 24, 2022
676d5ce
[HUDI-4138] Fix the concurrency modification of hoodie table config f…
danny0405 May 24, 2022
c05ebf2
[HUDI-2473] Fixing compaction write operation in commit metadata (#5203)
nsivabalan May 24, 2022
eb21901
[HUDI-4145] Archives the metadata file in HoodieInstant.State sequenc…
danny0405 May 24, 2022
0caa55e
[HUDI-4135] remove netty and netty-all (#5663)
liujinhui1994 May 24, 2022
c20db99
[HUDI-2207] Support independent flink hudi clustering function
yuzhaojing May 21, 2022
10363c1
[HUDI-4132] Fixing determining target table schema for delta sync wit…
nsivabalan May 24, 2022
18635b5
Merge pull request #3599 from yuzhaojing/HUDI-2207
yuzhaojing May 24, 2022
f30b3ae
[MINOR] Fix a potential NPE and some finer points of hudi cli (#5656)
May 24, 2022
a6bc9e8
[HUDI-4146] Claim RFC-55 for Improve Hive/Meta sync class design and …
fengjian428 May 25, 2022
cf837b4
[HUDI-3193] Decouple hudi-aws from hudi-client-common (#5666)
codope May 25, 2022
4e42ed5
[HUDI-4145] Archives the metadata file in HoodieInstant.State sequenc…
danny0405 May 26, 2022
98c5c6c
[HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row…
May 26, 2022
31e13db
[HUDI-4023] Decouple hudi-spark from hudi-utilities-slim-bundle (#5641)
codope May 26, 2022
8d2f009
[HUDI-4124] Add valid check in Spark Datasource configs (#5637)
wzx140 May 26, 2022
85962ee
[HUDI-3963][RFC-53] Use Lock-Free Message Queue Disruptor Improving H…
zhangyue19921010 May 26, 2022
57dbe57
[HUDI-4162] Fixed some constant mapping issues. (#5700)
watermelon12138 May 27, 2022
1767ff5
[HUDI-4161] Make sure partition values are taken from partition path …
May 27, 2022
554caa3
[MINOR] Fix the issue when handling conf hoodie.datasource.write.oper…
May 27, 2022
93fe5a4
[HUDI-4151] flink split_reader supports rocksdb (#5675)
cuibo01 May 28, 2022
58014c1
[HUDI-4160] Make database regex of MaxwellJsonKafkaSourcePostProcesso…
wangxianghu May 28, 2022
8fa8f26
[MINOR] Fix Hive and meta sync config for sql statement (#5316)
XuQianJin-Stars May 28, 2022
48062a5
[HUDI-4166] Added SimpleClient plugin for integ test (#5710)
uday08bce May 28, 2022
62d7923
[HUDI-3551] Add the Oracle Cloud Infrastructure (oci) Object Storage …
cartershanklin May 28, 2022
0a72458
[HUDI-3551] Fix testStorageSchemes for oci storage (#5711)
xushiyan May 28, 2022
7e86884
[HUDI-4086] Use CustomizedThreadFactory in async compaction and clust…
scxwhite May 29, 2022
329da34
[HUDI-4163] Catch general exception instead of IOException while fetc…
danny0405 May 30, 2022
918c4f4
[HUDI-4149] Drop-Table fails when underlying table directory is broke…
jinxing64 May 30, 2022
795a99b
[HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTab…
kumudkumartirupati May 31, 2022
0d069b5
[HUDI-4174] Add hive conf dir option for flink sink (#5725)
danny0405 Jun 1, 2022
dfcd6d9
[HUDI-4011] Add hudi-aws-bundle (#5674)
codope Jun 1, 2022
7276d0e
[HUDI-3670] free temp views in sql transformers (#5080)
qjqqyy Jun 1, 2022
7f8630c
[HUDI-4167] Remove the timeline refresh with initializing hoodie tabl…
danny0405 Jun 2, 2022
51602a3
[HUDI-4179] Cluster with sort columns invalid (#5739)
KnightChess Jun 2, 2022
69f28b6
#1
Apr 19, 2022
bdde6ae
#1
Apr 19, 2022
4caedfc
#improve the flink sink operator name to better identify tables to w…
May 23, 2022
297f936
#improve the flink sink operator name to better identify tables to w…
Jun 3, 2022
d8043bf
#improvement for flink write operator name to identify…
Jun 3, 2022
@@ -114,7 +114,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
return dataStream
.transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
.transform("bucket_bulk_insert" + ":" + conf.getString(FlinkOptions.TABLE_NAME), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE)
@@ -137,7 +137,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
// sort by partition keys
dataStream = dataStream
.transform("partition_key_sorter",
.transform("partition_key_sorter" + ":" + conf.getString(FlinkOptions.TABLE_NAME),
TypeInformation.of(RowData.class),
Contributor

Maybe suffixing only the write ops with the table name is enough, e.g.:

  • bucket_bulk_insert
  • hoodie_bulk_insert_write
  • hoodie_append_write
  • bucket_write
  • stream_write

And we can extract a common util method here, like writeOpIdentifier(String, Configuration), for generating the operator name with the table suffix.
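
A minimal sketch of such a helper, assuming the writeOpIdentifier name suggested above and the existing FlinkOptions.TABLE_NAME option; this illustrates the suggestion, not the merged implementation:

```java
import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;

public class Pipelines {

  /**
   * Builds an operator name suffixed with the table name, e.g.
   * "bucket_write:t1", so write operators of different tables can be
   * told apart in the Flink web UI.
   */
  public static String writeOpIdentifier(String operatorName, Configuration conf) {
    return operatorName + ":" + conf.getString(FlinkOptions.TABLE_NAME);
  }
}
```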

Contributor Author

Right, I will try to amend it with your suggestion.

Contributor Author
@yanenze Jun 3, 2022

I have created a new PR: #5744
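
For illustration, the bucket_write branch further down in this diff could then call the suggested helper instead of repeating the concatenation at every call site; a hypothetical sketch, not the code from #5744:

```java
// Hypothetical rewrite of the bucket_write branch with the suggested helper.
// The uid stays table-suffixed, so state-to-operator mapping across restarts
// is unaffected by the display-name change.
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
    .transform(writeOpIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
    .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
```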

sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
@@ -146,7 +146,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
}
}
return dataStream
.transform("hoodie_bulk_insert_write",
.transform("hoodie_bulk_insert_write" + ":" + conf.getString(FlinkOptions.TABLE_NAME),
TypeInformation.of(Object.class),
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
@@ -190,7 +190,7 @@ public static DataStreamSink<Object> append(
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);

return dataStream
.transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
.transform("hoodie_append_write" + ":" + conf.getString(FlinkOptions.TABLE_NAME), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE)
@@ -250,7 +250,7 @@ private static DataStream<HoodieRecord> streamBootstrap(
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
dataStream1 = dataStream1
.transform(
"index_bootstrap",
"index_bootstrap" + ":" + conf.getString(FlinkOptions.TABLE_NAME),
TypeInformation.of(HoodieRecord.class),
new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
@@ -277,7 +277,7 @@ private static DataStream<HoodieRecord> boundedBootstrap(

return rowDataToHoodieRecord(conf, rowType, dataStream)
.transform(
"batch_index_bootstrap",
"batch_index_bootstrap" + ":" + conf.getString(FlinkOptions.TABLE_NAME),
TypeInformation.of(HoodieRecord.class),
new BatchBootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
@@ -322,7 +322,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
.transform("bucket_write" + ":" + conf.getString(FlinkOptions.TABLE_NAME), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
@@ -331,7 +331,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
"bucket_assigner" + ":" + conf.getString(FlinkOptions.TABLE_NAME),
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
@@ -365,7 +365,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
* @return the compaction pipeline
*/
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
-    return dataStream.transform("compact_plan_generate",
+    return dataStream.transform("compact_plan_generate" + ":" + conf.getString(FlinkOptions.TABLE_NAME),
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton