Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions docs/content.zh/docs/internals/task_lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ under the License.

# Task 生命周期

A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
As an example, an operator with a parallelism of *5* will have each of its instances executed by a separate task.

The `StreamTask` is the base for all different task sub-types in Flink's streaming engine. This document goes through
the different phases in the lifecycle of the `StreamTask` and describes the main methods representing each of these
phases.

A task in Flink is the basic unit of execution. It is the place where each parallel instance of an
operator is executed. As an example, an operator with a parallelism of *5* will have each of its
instances executed by a separate task.

The `StreamTask` is the base for all different task sub-types in Flink's streaming engine. This
document goes through the different phases in the lifecycle of the `StreamTask` and describes the
main methods representing each of these phases.

## Operator Lifecycle in a nutshell

Expand All @@ -50,19 +49,21 @@ the `AbstractUdfStreamOperator`, which is the basic class for all operators that
OPERATOR::initializeState
OPERATOR::open
UDF::open

// processing phase (called on every element/watermark)
OPERATOR::processElement
UDF::run
OPERATOR::processWatermark

// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState


// notify the operator about the end of processing records
OPERATOR::finish

// termination phase
OPERATOR::close
UDF::close
OPERATOR::dispose

In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
Expand All @@ -82,13 +83,14 @@ checkpoint which invokes (asynchronously) the `snapshotState()` method, which we
depending on its type one of the aforementioned methods is called. Note that the `processElement()` is also the place
where the UDF's logic is invoked, *e.g.* the `map()` method of your `MapFunction`.

Finally, in the case of a normal, fault-free termination of the operator (*e.g.* if the stream is finite and its end is
reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic (*e.g.*
close any connections or I/O streams opened during the operator's execution), and the `dispose()` is called after that
to free any resources held by the operator (*e.g.* native memory held by the operator's data).
Finally, in the case of a normal, fault-free termination of the operator (*e.g.* if the stream is
finite and its end is reached), the `finish()` method is called to perform any final bookkeeping
action required by the operator's logic (*e.g.* flush any buffered data, or emit data to mark end of
procesing), and the `close()` is called after that to free any resources held by the operator
(*e.g.* open network connections, io streams, or native memory held by the operator's data).

In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`
and skips any intermediate phases between the phase the operator was in when the failure happened and the `dispose()`.
In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `close()`
and skips any intermediate phases between the phase the operator was in when the failure happened and the `close()`.

**Checkpoints:** The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described
above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, *i.e.* after the
Expand All @@ -111,18 +113,18 @@ either manually, or due some other reason, *e.g.* an exception thrown during exe

The steps a task goes through when executed until completion without being interrupted are illustrated below:

TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
close-operators
dispose-operators
task-specific-cleanup
common-cleanup
TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
finish-operators
close-operators
task-specific-cleanup
common-cleanup

As shown above, after recovering the task configuration and initializing some important runtime parameters, the very
first step for the task is to retrieve its initial, task-wide state. This is done in the `setInitialState()`, and it is
Expand Down Expand Up @@ -166,10 +168,10 @@ methods are called.
In the case of running till completion, *i.e.* there is no more input data to process, after exiting from the `run()`
method, the task enters its shutdown process. Initially, the timer service stops registering any new timers (*e.g.* from
fired timers that are being executed), clears all not-yet-started timers, and awaits the completion of currently
executing timers. Then the `closeAllOperators()` tries to gracefully close the operators involved in the computation by
calling the `close()` method of each operator. Then, any buffered output data is flushed so that they can be processed
by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the
`dispose()` method of each one. When opening the different operators, we mentioned that the order is from the
executing timers. Then the `finishAllOperators()` notifies the operators involved in the computation by
calling the `finish()` method of each operator. Then, any buffered output data is flushed so that they can be processed
by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the
`close()` method of each one. When opening the different operators, we mentioned that the order is from the
last to the first. Closing happens in the opposite manner, from first to last.

{{< hint info >}}
Expand All @@ -196,7 +198,7 @@ completed.

In the previous sections we described the lifecycle of a task that runs till completion. In case the task is cancelled
at any point, then the normal execution is interrupted and the only operations performed from that point on are the timer
service shutdown, the task-specific cleanup, the disposal of the operators, and the general task cleanup, as described
service shutdown, the task-specific cleanup, the closing of the operators, and the general task cleanup, as described
above.

{{< top >}}
68 changes: 35 additions & 33 deletions docs/content/docs/internals/task_lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ under the License.

# Task Lifecycle

A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
As an example, an operator with a parallelism of *5* will have each of its instances executed by a separate task.

The `StreamTask` is the base for all different task sub-types in Flink's streaming engine. This document goes through
the different phases in the lifecycle of the `StreamTask` and describes the main methods representing each of these
phases.

A task in Flink is the basic unit of execution. It is the place where each parallel instance of an
operator is executed. As an example, an operator with a parallelism of *5* will have each of its
instances executed by a separate task.

The `StreamTask` is the base for all different task sub-types in Flink's streaming engine. This
document goes through the different phases in the lifecycle of the `StreamTask` and describes the
main methods representing each of these phases.

## Operator Lifecycle in a nutshell

Expand All @@ -50,19 +49,21 @@ the `AbstractUdfStreamOperator`, which is the basic class for all operators that
OPERATOR::initializeState
OPERATOR::open
UDF::open

// processing phase (called on every element/watermark)
OPERATOR::processElement
UDF::run
OPERATOR::processWatermark

// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState


// notify the operator about the end of processing records
OPERATOR::finish

// termination phase
OPERATOR::close
UDF::close
OPERATOR::dispose

In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
Expand All @@ -82,13 +83,14 @@ checkpoint which invokes (asynchronously) the `snapshotState()` method, which we
depending on its type one of the aforementioned methods is called. Note that the `processElement()` is also the place
where the UDF's logic is invoked, *e.g.* the `map()` method of your `MapFunction`.

Finally, in the case of a normal, fault-free termination of the operator (*e.g.* if the stream is finite and its end is
reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic (*e.g.*
close any connections or I/O streams opened during the operator's execution), and the `dispose()` is called after that
to free any resources held by the operator (*e.g.* native memory held by the operator's data).
Finally, in the case of a normal, fault-free termination of the operator (*e.g.* if the stream is
finite and its end is reached), the `finish()` method is called to perform any final bookkeeping
action required by the operator's logic (*e.g.* flush any buffered data, or emit data to mark end of
procesing), and the `close()` is called after that to free any resources held by the operator
(*e.g.* open network connections, io streams, or native memory held by the operator's data).

In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`
and skips any intermediate phases between the phase the operator was in when the failure happened and the `dispose()`.
In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `close()`
and skips any intermediate phases between the phase the operator was in when the failure happened and the `close()`.

**Checkpoints:** The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described
above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, *i.e.* after the
Expand All @@ -111,18 +113,18 @@ either manually, or due some other reason, *e.g.* an exception thrown during exe

The steps a task goes through when executed until completion without being interrupted are illustrated below:

TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
close-operators
dispose-operators
task-specific-cleanup
common-cleanup
TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
finish-operators
close-operators
task-specific-cleanup
common-cleanup

As shown above, after recovering the task configuration and initializing some important runtime parameters, the very
first step for the task is to retrieve its initial, task-wide state. This is done in the `setInitialState()`, and it is
Expand Down Expand Up @@ -166,10 +168,10 @@ methods are called.
In the case of running till completion, *i.e.* there is no more input data to process, after exiting from the `run()`
method, the task enters its shutdown process. Initially, the timer service stops registering any new timers (*e.g.* from
fired timers that are being executed), clears all not-yet-started timers, and awaits the completion of currently
executing timers. Then the `closeAllOperators()` tries to gracefully close the operators involved in the computation by
calling the `close()` method of each operator. Then, any buffered output data is flushed so that they can be processed
by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the
`dispose()` method of each one. When opening the different operators, we mentioned that the order is from the
executing timers. Then the `finishAllOperators()` notifies the operators involved in the computation by
calling the `finish()` method of each operator. Then, any buffered output data is flushed so that they can be processed
by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the
`close()` method of each one. When opening the different operators, we mentioned that the order is from the
last to the first. Closing happens in the opposite manner, from first to last.

{{< hint info >}}
Expand All @@ -196,7 +198,7 @@ completed.

In the previous sections we described the lifecycle of a task that runs till completion. In case the task is cancelled
at any point, then the normal execution is interrupted and the only operations performed from that point on are the timer
service shutdown, the task-specific cleanup, the disposal of the operators, and the general task cleanup, as described
service shutdown, the task-specific cleanup, the closing of the operators, and the general task cleanup, as described
above.

{{< top >}}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
mainOperator.processElement(streamRecord);
} else {
mainOperator.endInput();
mainOperator.finish();
controller.suspendDefaultAction();
mailboxProcessor.suspend();
}
Expand All @@ -117,7 +118,6 @@ protected void cancelTask() {}
@Override
protected void cleanup() throws Exception {
mainOperator.close();
mainOperator.dispose();
}

private static class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ public void open() throws Exception {
}

@Override
public void close() throws Exception {
operator.close();
public void finish() throws Exception {
operator.finish();
}

@Override
public void dispose() throws Exception {
operator.dispose();
public void close() throws Exception {
operator.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ public void open() throws Exception {
}

@Override
public void close() throws Exception {
ACTUAL_ORDER_TRACKING.add("close");
public void finish() throws Exception {
ACTUAL_ORDER_TRACKING.add("finish");
}

@Override
public void dispose() throws Exception {
ACTUAL_ORDER_TRACKING.add("dispose");
public void close() throws Exception {
ACTUAL_ORDER_TRACKING.add("close");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,16 @@ public void open() throws Exception {
}

@Override
public void close() throws Exception {
public void finish() throws Exception {
try {
invokeFinishBundle();
} finally {
super.close();

try {
cleanUpLeakingClasses(this.getClass().getClassLoader());
} catch (Throwable t) {
LOG.warn("Failed to clean up the leaking objects.", t);
}
super.finish();
}
}

@Override
public void dispose() throws Exception {
public void close() throws Exception {
try {
if (checkFinishBundleTimer != null) {
checkFinishBundleTimer.cancel(true);
Expand All @@ -165,7 +159,13 @@ public void dispose() throws Exception {
pythonFunctionRunner = null;
}
} finally {
super.dispose();
super.close();

try {
cleanUpLeakingClasses(this.getClass().getClassLoader());
} catch (Throwable t) {
LOG.warn("Failed to clean up the leaking objects.", t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark
}

@Override
public void close() throws Exception {
super.close();
public void finish() throws Exception {
super.finish();
watermarkGenerator.onPeriodicEmit(watermarkOutput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void open() throws Exception {
}

@Override
public void dispose() throws Exception {
super.dispose();
public void close() throws Exception {
super.close();
if (arrowSerializer != null) {
arrowSerializer.close();
arrowSerializer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public void endInput() throws Exception {
}

@Override
public void close() throws Exception {
public void finish() throws Exception {
invokeCurrentBatch();
super.close();
super.finish();
}

protected abstract void invokeCurrentBatch() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ public void endInput() throws Exception {
}

@Override
public void dispose() throws Exception {
super.dispose();
if (arrowSerializer != null) {
arrowSerializer.close();
arrowSerializer = null;
}
public void finish() throws Exception {
invokeCurrentBatch();
super.finish();
}

@Override
public void close() throws Exception {
invokeCurrentBatch();
super.close();
if (arrowSerializer != null) {
arrowSerializer.close();
arrowSerializer = null;
}
}

@Override
Expand Down
Loading