Conversation

@rmetzger (Contributor)

This quickfix should probably only go into the 0.7 branch for the 0.7.1 release since the new Akka-based JM/TMs have this issue fixed.

Contributor

I would strongly vote to remove that. There is no need to kill a TaskManager just because it lost its heartbeat. The TaskManager may very well reconnect later and be available again.

Contributor Author

I agree. I'll implement it using a separate method just for YARN.

@StephanEwen (Contributor)

I am not sure that the stop() function should go into the TaskManager interface. Until now, the TaskManager was not assumed to run in its own process.

I think putting this into the TaskManagerRunner would make more sense.
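To illustrate the separation I mean, here is a minimal sketch with hypothetical names (`TaskManagerLike`, `stopAndExit`) that do not match Flink's actual classes: the TaskManager itself stays process-agnostic, and only the runner that owns the process is allowed to terminate it.

```java
// Hypothetical sketch; names are illustrative, not Flink's real API.
interface TaskManagerLike {
    void shutdown(); // release slots, network connections, memory
}

final class TaskManagerRunner {
    private final TaskManagerLike taskManager;

    TaskManagerRunner(TaskManagerLike taskManager) {
        this.taskManager = taskManager;
    }

    // The YARN-only path: shut the TaskManager down, then exit the
    // hosting process. Only the runner assumes it owns that process.
    void stopAndExit() {
        taskManager.shutdown();
        System.exit(0);
    }
}
```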

@rmetzger rmetzger force-pushed the flink1154 branch 2 times, most recently from 9a78e78 to 58788a8 on November 20, 2014 17:24
@rmetzger (Contributor Author)

I've updated the pull request so that the TM killing happens in a separate call hierarchy.

I also added code to the YARN client to make it work with the Google Storage file system wrapper. Only the Flink YARN client works with GCloud storage, not the runtime (see FLINK-1266).
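For context, the wrapper plugs into Hadoop's FileSystem abstraction, which is how the YARN client resolves gs:// paths. A hedged sketch of that wiring follows; the bucket and project id are placeholders, and the connector class is the one shipped with Google's gcs-connector:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GcsAccessSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map the gs:// scheme to Google's Hadoop FileSystem wrapper.
        conf.set("fs.gs.impl",
                "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
        conf.set("fs.gs.project.id", "my-gcp-project"); // placeholder
        // The YARN client resolves paths like this one through the wrapper.
        Path jar = new Path("gs://my-bucket/flink-dist.jar"); // placeholder
        FileSystem fs = jar.getFileSystem(conf);
        System.out.println("Resolved file system: " + fs.getClass().getName());
    }
}
```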

@rmetzger rmetzger closed this Nov 21, 2014
zhijiangW pushed a commit to zhijiangW/flink that referenced this pull request Jul 23, 2019
tharvey5 pushed a commit to tharvey5/flink that referenced this pull request Jun 25, 2025
…ng support (apache#208)

Currently, Apache Flink does not support storage partition joins, which
can lead to unnecessary data shuffles in batch mode. This PR implements
a basic version of that support.

This pull request introduces a new optimizer configuration option
`table.optimizer.storage-partition-join-enabled` and query planner
changes that detect when both sides of a join are partitioned by the
join keys with compatible partitionings, allowing the planner to apply
a storage partition join strategy. This avoids unnecessary shuffles by
leveraging the source's partitioning.
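As an illustration, enabling the option on a batch `TableEnvironment` might look like the following; the option key is the one introduced by this PR, while the rest is standard Table API (a sketch, not taken from the PR's tests):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableStoragePartitionJoin {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // Off by default; the planner only considers the storage partition
        // join strategy when this flag is set.
        tEnv.getConfig().set(
                "table.optimizer.storage-partition-join-enabled", "true");
    }
}
```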

Key changes include:
- Addition of the `SupportsPartitioning` interface for table sources to
expose partitioning information (see the sketch after this list).
- Implementation of `KeyGroupedPartitioning` to represent partitioning
schemes.
- Integration of partitioning awareness in the batch physical sort-merge
join rule to conditionally use the storage partition join when enabled
and applicable.
- [for Testing] Serialization and deserialization utilities for
partitioning metadata.
- [for Testing] Extension of the test values table factory to support
partitioning.
- Comprehensive unit and integration tests verifying the new join
strategy and its configuration.
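As referenced in the first bullet, here is a hedged sketch of what the `SupportsPartitioning` ability could look like; the actual signatures in the PR may differ, and `Partitioning` stands in for the `KeyGroupedPartitioning` implementation:

```java
import java.util.List;
import java.util.Optional;

// Stand-in for the PR's partitioning representation (KeyGroupedPartitioning).
interface Partitioning {
    List<String> partitionKeys(); // e.g. the columns the source is bucketed by
}

// Ability interface a table source could implement to expose its
// partitioning to the planner. Signatures here are illustrative.
interface SupportsPartitioning {
    // Empty if the source cannot report a stable partitioning.
    Optional<Partitioning> outputPartitioning();
}
```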

This enhancement is currently applicable only in batch mode and requires
the source tables to be partitioned by the join keys.

- Added a test util for serialization and deserialization of partitioning
metadata, so a test table can be created with a `KeyGroupedPartitioning`.
- Added integration tests (`TestStoragePartitionJoin`) that verify the
optimizer plan changes when the storage partition join is enabled or
disabled.
- Verified that existing tests pass and that the new join strategy is
correctly applied only when the configuration is enabled and
partitioning is compatible.
- Manual verification of execution plans to confirm the absence of
unnecessary shuffles when storage partition join is enabled.
- Verified that the unit test results in the table-planner module are the
same as before the change:
```
[ERROR] Tests run: 8671, Failures: 4, Errors: 0, Skipped: 1
```
It was verified that before the change the number of failures was also 4, i.e. the 4 failures are pre-existing.

---------

Co-authored-by: Jeyhun Karimov <[email protected]>