
Flink: fix the deadlock of waiting for extra checkpoints in BoundedTestSource with TestFlinkIcebergSink.testTwoSinksInDisjointedDAG #3112

@stevenzwu

Description

@kbendick and @nastra found that this test causes the build to get stuck in an infinite loop, waiting for the required checkpoint to complete:

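      synchronized (ctx.getCheckpointLock()) {
        // Block until enough checkpoints have completed, or until the source is cancelled.
        while (running && numCheckpointsComplete.get() < checkpointToAwait) {
          // wait(1) releases the checkpoint lock while waiting, then re-checks the condition.
          ctx.getCheckpointLock().wait(1);
        }
      }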

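The problem is that TestFlinkIcebergSink.testTwoSinksInDisjointedDAG has two disjoint pipelines. Waiting for the extra +2 checkpoints breaks in this situation, where part of the DAG has already finished: without FLIP-147, Flink stops triggering checkpoints once some tasks have finished, so numCheckpointsComplete never reaches checkpointToAwait and the loop above never exits.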
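
To make the hang easier to reason about, here is a minimal, Flink-free model of the wait loop. Everything in it is hypothetical (CheckpointWaitDemo is not Iceberg or Flink code): a background thread stands in for the checkpoint coordinator and simply stops completing checkpoints after a fixed count, which is assumed to approximate "part of the DAG has finished". A deadline is added only so the demo terminates; the real loop has none.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Flink-free model of the wait loop in BoundedTestSource (not Iceberg code).
// A background thread stands in for the checkpoint coordinator: it completes at most
// `checkpointsBeforeStop` checkpoints and then stops, like a job whose other pipeline finished.
final class CheckpointWaitDemo {
  private final Object checkpointLock = new Object();
  private final AtomicInteger numCheckpointsComplete = new AtomicInteger();
  private volatile boolean running = true;  // single-use: a timed-out call leaves this false

  /** Returns true if checkpointToAwait was reached before the deadline, false otherwise. */
  boolean awaitCheckpoint(int checkpointToAwait, int checkpointsBeforeStop, long deadlineMillis) {
    Thread coordinator = new Thread(() -> {
      for (int i = 0; i < checkpointsBeforeStop && running; i++) {
        sleepQuietly(5);
        synchronized (checkpointLock) {  // completing a checkpoint needs the lock
          numCheckpointsComplete.incrementAndGet();
        }
      }
    });
    coordinator.start();
    long deadline = System.nanoTime() + deadlineMillis * 1_000_000L;
    try {
      synchronized (checkpointLock) {
        // Same condition as the original loop; the deadline exists only so the demo terminates.
        while (running && numCheckpointsComplete.get() < checkpointToAwait) {
          if (System.nanoTime() - deadline > 0) {
            running = false;  // the real test has no deadline and keeps spinning here
            break;
          }
          checkpointLock.wait(1);  // releases the lock so the coordinator can make progress
        }
      }
      coordinator.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      running = false;
    }
    return numCheckpointsComplete.get() >= checkpointToAwait;
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    // Checkpoints keep completing: waiting for +2 succeeds.
    System.out.println(new CheckpointWaitDemo().awaitCheckpoint(2, 5, 1_000));  // true
    // Checkpoints stop after one: waiting for +2 can never succeed.
    System.out.println(new CheckpointWaitDemo().awaitCheckpoint(2, 1, 200));    // false
  }
}
```

The second call is the interesting one: the condition is correct, it just depends on checkpoints that are never going to be triggered.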

        // Let's say checkpointToAwait = numCheckpointsComplete.get() + delta. The value of delta should not
        // affect the final table records, because we only need to make sure that there will be exactly
        // elementsPerCheckpoint.size() checkpoints, each emitting one records buffer from elementsPerCheckpoint.
        // Even if the checkpoints that emit results are not contiguous, the correctness of the data should not
        // be affected in the end. Setting delta to 2 deliberately introduces non-contiguous checkpoints
        // that emit the records buffers from elementsPerCheckpoint.
        checkpointToAwait = numCheckpointsComplete.get() + 2;
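
The claim in that comment (delta changes *which* checkpoints carry data, not *what* is committed overall) can be checked with a small single-threaded model. DeltaScheduleDemo is hypothetical and simplified: it assumes each completed checkpoint commits everything emitted since the previous one, and it ignores concurrency and the real Flink/Iceberg committer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, single-threaded model of BoundedTestSource's emission schedule (not Iceberg code).
// Each buffer is emitted, then the source waits for `delta` more checkpoints.
// Every completed checkpoint "commits" the records emitted since the previous checkpoint.
final class DeltaScheduleDemo {

  /** Maps checkpoint id -> records committed by that checkpoint (empty commits included). */
  static Map<Integer, List<String>> simulate(List<List<String>> elementsPerCheckpoint, int delta) {
    Map<Integer, List<String>> commits = new TreeMap<>();
    List<String> pending = new ArrayList<>();
    int numCheckpointsComplete = 0;
    for (List<String> buffer : elementsPerCheckpoint) {
      pending.addAll(buffer);  // emit one records buffer
      int checkpointToAwait = numCheckpointsComplete + delta;
      while (numCheckpointsComplete < checkpointToAwait) {
        numCheckpointsComplete++;  // the next checkpoint completes and commits what is pending
        commits.put(numCheckpointsComplete, new ArrayList<>(pending));
        pending.clear();
      }
    }
    return commits;
  }

  /** Concatenates all committed records in checkpoint order. */
  static List<String> allCommitted(Map<Integer, List<String>> commits) {
    List<String> out = new ArrayList<>();
    commits.values().forEach(out::addAll);
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> input = List.of(List.of("a", "b"), List.of("c"), List.of("d", "e"));
    System.out.println(simulate(input, 1));  // {1=[a, b], 2=[c], 3=[d, e]}
    System.out.println(simulate(input, 2));  // {1=[a, b], 2=[], 3=[c], 4=[], 5=[d, e], 6=[]}
  }
}
```

With delta = 2 the data lands in checkpoints 1, 3 and 5 (non-contiguous), yet the concatenated records are identical to the delta = 1 run. The cost is that the source now depends on twice as many checkpoints completing.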

While FLIP-147 (support checkpoints after tasks finished) would make this test work again, in the meantime we can add a boolean flag to BoundedTestSource that disables the +2 behavior. What do you think? @kbendick @openinx
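
A rough sketch of what such a flag could look like, with the Flink SourceFunction parts stripped out so it compiles on its own. The class name BoundedTestSourceSketch and the flag name waitForExtraCheckpoint are hypothetical, and so is the assumption that "disabled" means waiting for one checkpoint per buffer (+1) rather than not waiting at all. The existing constructor keeps the +2 behavior so other tests are unaffected.

```java
import java.util.List;

// Hypothetical sketch of the proposed flag (not the actual BoundedTestSource).
// The real class implements Flink's SourceFunction; those parts are omitted here.
final class BoundedTestSourceSketch<T> {
  private final List<List<T>> elementsPerCheckpoint;
  // Hypothetical flag: when false, wait for one more checkpoint per buffer instead of two.
  private final boolean waitForExtraCheckpoint;

  BoundedTestSourceSketch(List<List<T>> elementsPerCheckpoint) {
    this(elementsPerCheckpoint, true);  // keep the current +2 behavior for existing tests
  }

  BoundedTestSourceSketch(List<List<T>> elementsPerCheckpoint, boolean waitForExtraCheckpoint) {
    this.elementsPerCheckpoint = elementsPerCheckpoint;
    this.waitForExtraCheckpoint = waitForExtraCheckpoint;
  }

  /** The value the run loop would assign to checkpointToAwait after emitting a buffer. */
  int checkpointToAwait(int numCheckpointsComplete) {
    return numCheckpointsComplete + (waitForExtraCheckpoint ? 2 : 1);
  }

  /** Checkpoints the source depends on before finishing, assuming they complete one at a time. */
  int checkpointsNeeded() {
    int complete = 0;
    for (int i = 0; i < elementsPerCheckpoint.size(); i++) {
      complete = checkpointToAwait(complete);
    }
    return complete;
  }

  public static void main(String[] args) {
    List<List<String>> input = List.of(List.of("a"), List.of("b"), List.of("c"));
    System.out.println(new BoundedTestSourceSketch<>(input).checkpointsNeeded());         // 6
    System.out.println(new BoundedTestSourceSketch<>(input, false).checkpointsNeeded());  // 3
  }
}
```

This only shows how many checkpoints the source would depend on; whether one checkpoint per buffer is enough for the disjoint DAG still depends on both pipelines staying alive long enough to complete them.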
