[FLINK-34132][runtime] Correct the error message and doc that AdaptiveBatch only supports all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE.

This closes #24118.
JunRuiLee authored and zhuzhurk committed Feb 1, 2024
1 parent 2be1ea8 commit dd3e60a
Showing 3 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/deployment/elastic_scaling.md
@@ -242,7 +242,7 @@ Adaptive Batch Scheduler will call the above interface before scheduling the Source nodes

### Limitations
- **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. An exception is thrown when a streaming job is submitted.
-- **Only jobs whose data exchanges are all BLOCKING or HYBRID are supported**: Currently, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or ALL_EXCHANGES_HYBRID_SELECTIVE.
+- **Only jobs whose data exchanges are all BLOCKING or HYBRID are supported**: Currently, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or ALL_EXCHANGES_HYBRID_SELECTIVE. Note that jobs using the DataSet API do not recognize the shuffle modes above; their ExecutionMode must be set to BATCH_FORCED to force BLOCKING shuffle.
- **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)`, `StreamExecutionEnvironment#readTextFile(...)`, and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. When using Adaptive Batch Scheduler, users should read files with the new Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})).
- **Upstream output and downstream input volumes shown on the Web UI may differ**: When Adaptive Batch Scheduler infers parallelism automatically, the amount of data sent by the upstream operator over a broadcast edge may not equal the amount received by the downstream operator, which may confuse users when shown on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.

2 changes: 1 addition & 1 deletion docs/content/docs/deployment/elastic_scaling.md
@@ -254,7 +254,7 @@ Note that the dynamic source parallelism inference only decides the parallelism
### Limitations

- **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. An exception will be thrown if a streaming job is submitted.
-- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE`.
+- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE`. Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, the ExecutionMode needs to be BATCH_FORCED to force BLOCKING shuffle.
- **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)`, `StreamExecutionEnvironment#readTextFile(...)`, and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using the Adaptive Batch Scheduler.
- **Inconsistent broadcast results metrics on WebUI**: When the Adaptive Batch Scheduler automatically decides operator parallelism, the number of bytes/records sent by the upstream task for a broadcast result, as counted by metrics, is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
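
The shuffle-mode limitation in the list above amounts to a membership test on the configured value of `execution.batch-shuffle-mode`. The following is a minimal, standard-library-only sketch of that test. The class `ShuffleModeSupportSketch` and its local enum are hypothetical stand-ins that only mirror the names of Flink's `BatchShuffleMode` constants; they are not Flink code, and the DataSet / `BATCH_FORCED` case is deliberately not modeled.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration only; this class is not part of Flink.
class ShuffleModeSupportSketch {

    // Local mirror of the shuffle mode names (assumption: names match Flink's BatchShuffleMode).
    enum ShuffleMode {
        ALL_EXCHANGES_PIPELINED,
        ALL_EXCHANGES_BLOCKING,
        ALL_EXCHANGES_HYBRID_FULL,
        ALL_EXCHANGES_HYBRID_SELECTIVE
    }

    // The three modes the limitation lists as supported.
    static final Set<ShuffleMode> SUPPORTED =
            EnumSet.of(
                    ShuffleMode.ALL_EXCHANGES_BLOCKING,
                    ShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
                    ShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);

    // Returns true only if the given value names one of the supported modes.
    static boolean isSupported(String configuredMode) {
        try {
            return SUPPORTED.contains(ShuffleMode.valueOf(configuredMode.trim()));
        } catch (IllegalArgumentException e) {
            return false; // not a known mode name
        }
    }

    public static void main(String[] args) {
        System.out.println(isSupported("ALL_EXCHANGES_BLOCKING"));  // prints "true"
        System.out.println(isSupported("ALL_EXCHANGES_PIPELINED")); // prints "false"
    }
}
```

In a real job the value would come from the Flink configuration rather than a string literal; the sketch only shows which values pass.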

@@ -21,6 +21,7 @@

 import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.Configuration;
@@ -326,11 +327,14 @@ private void checkAllExchangesAreSupported(final JobGraph jobGraph) {
                         String.format(
                                 "At the moment, adaptive batch scheduler requires batch workloads "
                                         + "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. "
-                                        + "To do that, you need to configure '%s' to '%s' or '%s/%s'.",
+                                        + "To do that, you need to configure '%s' to '%s' or '%s/%s'. "
+                                        + "Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, "
+                                        + "the ExecutionMode needs to be %s to force BLOCKING shuffle",
                                 ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
                                 BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
                                 BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
-                                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE));
+                                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
+                                ExecutionMode.BATCH_FORCED));
             }
         }
     }
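
To show how the revised message reads once formatted, here is a simplified, standard-library-only sketch of the check. It is not the Flink implementation: `ExchangeCheckSketch`, its `EdgeType` enum, and the list-of-edges input are hypothetical, and the string literals passed to `String.format` assume that the Flink constants in the hunk above render as their plain names.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the check changed by this commit; not Flink's classes.
class ExchangeCheckSketch {

    // Stand-in for the exchange type of a job edge (assumption for illustration).
    enum EdgeType { BLOCKING, HYBRID_FULL, HYBRID_SELECTIVE, PIPELINED }

    static final Set<EdgeType> SUPPORTED =
            EnumSet.of(EdgeType.BLOCKING, EdgeType.HYBRID_FULL, EdgeType.HYBRID_SELECTIVE);

    // Builds the message with the same format string as the hunk above;
    // the five arguments are literals assumed to match the real constants' output.
    static String unsupportedEdgeMessage() {
        return String.format(
                "At the moment, adaptive batch scheduler requires batch workloads "
                        + "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. "
                        + "To do that, you need to configure '%s' to '%s' or '%s/%s'. "
                        + "Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, "
                        + "the ExecutionMode needs to be %s to force BLOCKING shuffle",
                "execution.batch-shuffle-mode",
                "ALL_EXCHANGES_BLOCKING",
                "ALL_EXCHANGES_HYBRID_FULL",
                "ALL_EXCHANGES_HYBRID_SELECTIVE",
                "BATCH_FORCED");
    }

    // Throws if any edge is not one of the supported types.
    static void checkAllExchangesAreSupported(List<EdgeType> edges) {
        for (EdgeType edge : edges) {
            if (!SUPPORTED.contains(edge)) {
                throw new IllegalStateException(unsupportedEdgeMessage());
            }
        }
    }

    public static void main(String[] args) {
        try {
            checkAllExchangesAreSupported(List.of(EdgeType.BLOCKING, EdgeType.PIPELINED));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch throws on the first unsupported edge, so a job with only BLOCKING or HYBRID edges passes silently.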