diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index bf0cfc27e91e5..ce3a24e81207d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -376,20 +376,20 @@ public String getInFlightInstant() { public static class Provider implements OperatorCoordinator.Provider { private final OperatorID operatorId; private final Configuration conf; - private final int numTasks; - public Provider(OperatorID operatorId, Configuration conf, int numTasks) { + public Provider(OperatorID operatorId, Configuration conf) { this.operatorId = operatorId; this.conf = conf; - this.numTasks = numTasks; } + @Override public OperatorID getOperatorId() { return this.operatorId; } + @Override public OperatorCoordinator create(Context context) { - return new StreamWriteOperatorCoordinator(this.conf, this.numTasks); + return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism()); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java index 56267451fba09..a558ffdccdda4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java @@ -39,15 +39,12 @@ public class StreamWriteOperatorFactory private final StreamWriteOperator operator; private final Configuration conf; - private final int numTasks; public StreamWriteOperatorFactory( - Configuration conf, - int numTasks) { + Configuration conf) { super(new StreamWriteOperator<>(conf)); this.operator = (StreamWriteOperator) getOperator(); this.conf = conf; - this.numTasks = numTasks; } @Override @@ -65,7 +62,7 @@ public > T createStreamOperator(StreamOperatorP @Override public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { - return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks); + return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf); } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index 24b899496547e..27fd4f5d0e19f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception { Configuration conf = FlinkOptions.fromStreamerConfig(cfg); int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf, numWriteTask); + new StreamWriteOperatorFactory<>(conf); DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>( cfg.kafkaTopic, diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java index be8ec36cbe503..42973862d883a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java @@ -93,7 +93,7 @@ public void testWriteToHoodie() throws Exception { (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf, 4); + new StreamWriteOperatorFactory<>(conf); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, diff --git a/hudi-flink/src/test/resources/log4j-surefire.properties b/hudi-flink/src/test/resources/log4j-surefire.properties index 32af462093ae5..8dcd17f303f6b 100644 --- a/hudi-flink/src/test/resources/log4j-surefire.properties +++ b/hudi-flink/src/test/resources/log4j-surefire.properties @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ### -log4j.rootLogger=WARN, CONSOLE +log4j.rootLogger=INFO, CONSOLE log4j.logger.org.apache=INFO log4j.logger.org.apache.hudi=DEBUG log4j.logger.org.apache.hadoop.hbase=ERROR @@ -27,5 +27,5 @@ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true -log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMin=INFO log4j.appender.CONSOLE.filter.a.LevelMax=FATAL