Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,20 +376,20 @@ public String getInFlightInstant() {
public static class Provider implements OperatorCoordinator.Provider {
private final OperatorID operatorId;
private final Configuration conf;
private final int numTasks;

public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
public Provider(OperatorID operatorId, Configuration conf) {
this.operatorId = operatorId;
this.conf = conf;
this.numTasks = numTasks;
}

@Override
public OperatorID getOperatorId() {
return this.operatorId;
}

@Override
public OperatorCoordinator create(Context context) {
return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ public class StreamWriteOperatorFactory<I>

private final StreamWriteOperator<I> operator;
private final Configuration conf;
private final int numTasks;

public StreamWriteOperatorFactory(
Configuration conf,
int numTasks) {
Configuration conf) {
super(new StreamWriteOperator<>(conf));
this.operator = (StreamWriteOperator<I>) getOperator();
this.conf = conf;
this.numTasks = numTasks;
}

@Override
Expand All @@ -65,7 +62,7 @@ public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorP

@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf, numWriteTask);
new StreamWriteOperatorFactory<>(conf);

DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testWriteToHoodie() throws Exception {
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf, 4);
new StreamWriteOperatorFactory<>(conf);

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
Expand Down
4 changes: 2 additions & 2 deletions hudi-flink/src/test/resources/log4j-surefire.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change the log level in this PR?

Copy link
Member Author

@lamberken lamberken Feb 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to change the log level. StreamWriteITCase#testWriteToHoodie only throws the following exception stacktrace if we keep the WARN level.

org.opentest4j.AssertionFailedError: 
Expected :true
Actual   :false
<Click to see difference>
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
	at org.apache.hudi.operator.utils.TestData.lambda$checkWrittenFullData$5(TestData.java:198)

Meanwhile, in the Flink project, the ArrayIndexOutOfBoundsException is logged at INFO level.

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L238

7919 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Trying to recover from a global failure.
java.lang.ArrayIndexOutOfBoundsException: 3
	at org.apache.hudi.operator.StreamWriteOperatorCoordinator.handleEventFromOperator(StreamWriteOperatorCoordinator.java:181)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:191)
	at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:952)
	at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:473)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, from my side it's better to change the log level in unit test cases; the INFO level lets the project output more helpful logs.

Copy link
Contributor

@yanghua yanghua Feb 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine, that sounds reasonable. I just wanted to know the reason for changing it.

log4j.rootLogger=INFO, CONSOLE
log4j.logger.org.apache=INFO
log4j.logger.org.apache.hudi=DEBUG
log4j.logger.org.apache.hadoop.hbase=ERROR
Expand All @@ -27,5 +27,5 @@ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMin=INFO
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL