Closed

Changes from all commits (109 commits)
9a6b24b
[FLUSS-2262][lake] Improved Stability for Iceberg Log Table Compactio…
rionmonster Dec 27, 2025
8159dee
[FLUSS-2262][lake][server] Address lake table tiering stability durin…
rionmonster Jan 10, 2026
d901546
[FLUSS-2262][lake][server] Ensure pending tiering tables are removed …
rionmonster Jan 10, 2026
4447287
[lake/iceberg] Removal and clean-up for previous approaches
rionmonster Jan 28, 2026
b7a93c1
[lake/iceberg] Continued clean-up from previous iterations
rionmonster Jan 28, 2026
d980235
[common] switch to at.yawk.lz4:lz4-java due to CVE-2025-12183 (#2136)
sd4324530 Dec 28, 2025
ac06ca2
[lake] Support ADD COLUMN AT LAST for dataLake enabled tables (#2189)
buvb Dec 28, 2025
36e2a6f
[lake/paimon] Support NestedRow types for tiering paimon (#2260)
XuQianJin-Stars Dec 28, 2025
7cc346c
[docs] Use the "docker compose ps" to show the running containers (#2…
zuston Dec 28, 2025
cc18333
[server] Introduce new rebalance API (#1380)
swuferhong Dec 28, 2025
1d3b907
[Docs] Document the TypedAPI #2230 (#2261)
Prajwal-banakar Dec 28, 2025
aad81f3
[flink] Add RuntimeContext adapter for Flink 2.x compatibility (#2241)
sd4324530 Dec 29, 2025
224d5ff
[server] Support AddServerTag and RemoveServerTag (#1400)
swuferhong Dec 29, 2025
d886234
[client] Change SchemaNotExistException as retriable exception. (#2193)
loserwang1024 Dec 29, 2025
26eb8fb
[hotfix] Use CatalogTableAdapter rather than CatalogTable.newBuilder …
loserwang1024 Dec 29, 2025
c1a9bc6
[flink] Flink Source need to check tableId on recovery in case that t…
loserwang1024 Dec 29, 2025
dc90653
[common] Fix IndexOutOfBoundsException in ArrowArrayWriter when eleme…
Dec 12, 2025
2d0d9b0
[common] Improve ArrowArrayWriter performance by replacing hardcoded h…
platinumhamburg Dec 31, 2025
1ed2c58
[lake] Record a file path storing log offsets in lake snapshot proper…
luoyuxia Dec 31, 2025
716fbb7
[kv] Implement basic Aggregate Merge Engine (#2255)
platinumhamburg Dec 31, 2025
8115d13
[hotfix]: Build expected results with partitions value, not hard code…
SML0127 Jan 4, 2026
daec805
[docs] update datatypes & paimon datatype mapping document (#2283)
Jackeyzhe Jan 4, 2026
09c1cf4
[spark] support spark batch write (#2277)
YannByron Jan 4, 2026
5b864e4
[common] Introduce MAP type for ARROW, COMPACTED and INDEXED formats …
XuQianJin-Stars Jan 5, 2026
e1e22e2
[hotfix] Fix compile problem of InternalRow#getMap() interface in Spa…
wuchong Jan 5, 2026
1782c6d
[hotfix] Fix spotless checkstyle problems
wuchong Jan 5, 2026
44a8c86
[lake/iceberg] Support tier array type for iceberg (#2266)
XuQianJin-Stars Jan 5, 2026
4096672
[hotfix] Fix compilation error: implement missing getMap method in Ic…
XuQianJin-Stars Jan 5, 2026
a099126
[kv] Support to report rocksdb metrics (#2282)
platinumhamburg Jan 5, 2026
5965ce8
[docs] Move 'Merge Engine' to the top-level of 'Table Design' (#2300)
Aditya41150 Jan 5, 2026
a3195d6
[spark] Add unit tests for spark SHOW TABLES command (#2304)
Yohahaha Jan 5, 2026
9d44aa3
[docs] Document COMPACTED table format (#2264)
Priyamanjare54 Jan 5, 2026
1b8f7e4
[server] Not permitted to enable datalake for tables created prior to…
LiebingYu Jan 7, 2026
badde92
[lake/paimon] Support map types for tiering paimon (#2308)
XuQianJin-Stars Jan 7, 2026
894daa9
[server] Fix ConcurrentModificationException when access getTable con…
LiebingYu Jan 7, 2026
cf653c8
[server] Servers with PERMANENT_OFFLINE tag are no longer assigned re…
LiebingYu Jan 7, 2026
293664c
[lake] fix potential dirty commit on re-create table (#2316)
zuston Jan 7, 2026
18f7bd1
[hotfix] use ltz_millis as paimon system column for timestamp dataty…
luoyuxia Jan 8, 2026
5a14061
[hotfix] Fix union read can't restore issue (#2321)
luoyuxia Jan 8, 2026
6472010
[server] Support generate and execute rebalance plan (#1452)
swuferhong Jan 8, 2026
a36654f
[common] Add field_id for Nested Row. (#2322)
loserwang1024 Jan 8, 2026
85f0966
[flink] Refactor CALL procedure "sys.xxx_cluster_config" to support m…
sd4324530 Jan 8, 2026
e62373f
[hotfix] Fix the rebalance status cannot reach complete when the reba…
swuferhong Jan 8, 2026
7b5caf2
[docs] Add MAP type to data types and Paimon integration documentatio…
XuQianJin-Stars Jan 9, 2026
12ff308
[test] Add BucketingFunctionTest, this allows rust client side to als…
leekeiabstraction Jan 9, 2026
4f93ffa
[lake/iceberg] Support nested row types for Iceberg tiering (#2278)
SML0127 Jan 9, 2026
544cb4e
[hotfix] Add authentication for rebalance and controlled shutdown. (#…
LiebingYu Jan 9, 2026
b0a93fb
[docs]: updated data type mapping in iceberg (#2336)
SML0127 Jan 9, 2026
7600e8d
[NOTICE] Update copyright NOTICE year to 2026 (#2343)
wuchong Jan 11, 2026
e88e97e
[spark] Fix spark desc command output with partition info (#2313)
Yohahaha Jan 11, 2026
ef962be
[kv] include hidden potential internal flush time in KV flush latency…
zuston Jan 11, 2026
adbbd3d
[Task] Enhance tests for cluster rebalancing (#2315) (#2337)
Prajwal-banakar Jan 12, 2026
e4cb73c
[docs] Add alter table feature link in getting-started.md (#2348)
swuferhong Jan 12, 2026
1128946
[hotfix] Add authentication to the admin APIs that currently lack it …
swuferhong Jan 12, 2026
f45eabe
[doc] fix typo of 'table.datalake.enabled' in options.md (#2366)
zhaomin1423 Jan 14, 2026
44a1e16
[lake] Re-initial tiering splits for tables if reader failover happen…
beryllw Jan 15, 2026
6c1a155
[paimon] Add IT case for union read with evolved schema for Paimon (#…
ZuebeyirEser Jan 15, 2026
e3b094d
Rebalance procedure docs (#2355)
swuferhong Jan 15, 2026
6e15f53
[Flink] Support Partial Updates to the Flink Sink (#2042)
polyzos Jan 15, 2026
e3facdb
Change the ListRebalanceProcessProcedure result from String[] to row[…
sd4324530 Jan 16, 2026
d0a54b1
[spark] Add Spark show/add/drop partition support (#2314)
Yohahaha Jan 17, 2026
38ca1dd
[lake/lance] Refactor LanceArrowWriter to reuse fluss-common ArrowWri…
XuQianJin-Stars Jan 17, 2026
6e38036
[kv] Add basic implementation of AUTO_INCREMENT column (#2161)
beryllw Jan 6, 2026
2d86eed
[kv] Improve the implementation of AUTO_INCREMENT column
wuchong Jan 17, 2026
576c612
[server] Prevent remote log TTL expiration until tiering completes (#…
LiebingYu Jan 17, 2026
ed6040c
[spark] Add spark streaming write support (#2357)
Yohahaha Jan 18, 2026
513902c
Add RocksDB block cache configuration options for index and filter bl…
platinumhamburg Jan 18, 2026
9fc28b9
[test] Fix unstable test RebalanceITCase#testRebalanceForLogTable (#2…
swuferhong Jan 18, 2026
3075a41
[flink] Support PARTITION_DYNAMIC sink dynamic shuffle based on parti…
loserwang1024 Jan 15, 2026
d1464fd
[flink] Optimize sink statistics calculation to run only when necessary
wuchong Jan 17, 2026
c8d6f21
[flink] Fix the distribution mode support for FlussSink for DataStrea…
wuchong Jan 17, 2026
ed0f671
[tests] Cleanup ZK state in TableChangeWatcherTest to not let current…
vamossagar12 Jan 19, 2026
53f905a
[hotfix] Fix union read fail when from timestamp and projection enabl…
luoyuxia Jan 19, 2026
860e908
[docs] Add docs for rebalance (#2382)
swuferhong Jan 19, 2026
cff2a43
[lake] Support alter table.datalake.freshness (#2365)
zhaomin1423 Jan 20, 2026
e7309db
[flink] Flink integration with Aggregation Merge Engine (#2307)
platinumhamburg Jan 20, 2026
4a4670e
[kv] Add roaring bitmap aggregate function for aggregation merge engi…
platinumhamburg Jan 20, 2026
f4fdeaf
[client] move OffsetsInitializer to fluss-client (#2424)
YannByron Jan 21, 2026
071c572
[hotfix] Catalog related lake options should overwrite table lake rel…
luoyuxia Jan 21, 2026
f8e31dc
[test] Support manually trigger and wait for KV snapshots for FlussCl…
wuchong Jan 21, 2026
7035e24
[spark] support batch read from fluss cluster (#2377)
YannByron Jan 21, 2026
98e587f
[website] Update Fluss messaging (#2428)
polyzos Jan 22, 2026
540587a
[minor][flink] Remove unnecessary java notes (#2444)
leonardBang Jan 23, 2026
640485c
[docs] Add client feature support matrix documentation (#2445)
leekeiabstraction Jan 23, 2026
b10d912
[lake] report pending records even though no tiering is finished (#2453)
luoyuxia Jan 23, 2026
171875f
[hotfix][helm] Update image repository to use 'apache/fluss' instead …
morazow Jan 23, 2026
58bb4de
[coordinator] Refactor SchemaUpdate to delegate schema changes to Sch…
Prajwal-banakar Jan 23, 2026
fda1227
[flink] changelog read support for pk table without pushdown optimiza…
MehulBatra Jan 23, 2026
1a6e236
[lake] Make FlussLakeTiering pluggable to customize tiering job const…
luoyuxia Jan 24, 2026
eab70bb
[server] Fix FirstRowMergeEngine hang caused by empty log entries in …
loserwang1024 Jan 24, 2026
0909a08
[server] Update tablet server metadata cache when TableRegistration c…
LiebingYu Jan 24, 2026
d383c9f
[test] Fix failed testPkCompactedPollFromLatestNoRecords. (#2465)
SML0127 Jan 24, 2026
9a64fb5
[hotfix] Remove unstable "latest" scan startup mode test in Changelog…
wuchong Jan 25, 2026
5ab4f85
[flink] changelog read support for log table without pushdown optimiz…
MehulBatra Jan 25, 2026
f8eab26
[helm] Remove `tablet-server.id` from coordinator configuration optio…
morazow Jan 25, 2026
8a9a1e4
[test] Fix flaky AdjustIsrITCase#testIsrShrinkAndExpand by using acks…
ZuebeyirEser Jan 25, 2026
c711a5d
[doc/website] virtual tables documentation - $changelog for PK/Log Ta…
MehulBatra Jan 25, 2026
a9829e7
[test] Fix unstable test ReplicaTest.testRestore (#2469)
wuchong Jan 26, 2026
95feb64
[spark] Add Spark SQL ALTER table support for set/remove table proper…
Yohahaha Jan 26, 2026
09da97f
[hotfix] update messaging on github page (#2478)
polyzos Jan 26, 2026
a94c343
[build] Upgrade GitHub Actions for Node 24 compatibility (#2349)
sd4324530 Jan 26, 2026
5fd72ed
[lake] Tiering service support commit by time (#2185)
luoyuxia Jan 27, 2026
3f00b00
[kv] Add Producer Offset Snapshot for Exactly-Once semantics (#2434)
platinumhamburg Jan 27, 2026
10091bd
[hotfix] Fix unstable tiering source enumerator test (#2490)
luoyuxia Jan 27, 2026
2ad450f
[test] Add additional assertion to tests for TimestampLtz/Ntz to part…
leekeiabstraction Jan 27, 2026
90f5892
[fs/azure] Support Azure Blob Storage (#1941)
gkatzioura Jan 27, 2026
5c236d2
[flink] Fix Flink connector failure caused by unknown table options (…
LiebingYu Jan 28, 2026
95f4dd9
[spark] Add sparksql lake table catalog DDL suite for paimon (#2438)
Yohahaha Jan 28, 2026
51f516b
[s3] fix the size of fluss-fs-s3 is too big (#2419)
sd4324530 Jan 28, 2026
8 changes: 4 additions & 4 deletions .github/workflows/ci-template.yaml
@@ -40,9 +40,9 @@ jobs:
name: "${{ matrix.module }}"
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v6
- name: Set up JDK
uses: actions/setup-java@v4
uses: actions/setup-java@v5
with:
java-version: ${{ inputs.java-version }}
distribution: 'temurin'
@@ -66,13 +66,13 @@ jobs:
ARTIFACTS_OSS_STS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_STS_ENDPOINT }}
ARTIFACTS_OSS_ROLE_ARN: ${{ secrets.ARTIFACTS_OSS_ROLE_ARN }}
- name: Upload build logs
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v6
if: ${{ failure() }}
with:
name: logs-test-${{ matrix.module }}-${{ github.run_number}}#${{ github.run_attempt }}
path: ${{ runner.temp }}/fluss-logs/*
- name: Upload JaCoCo coverage report
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v6
if: ${{ success() && github.ref == 'refs/heads/main' }}
with:
name: jacoco-report-${{ matrix.module }}-${{ github.run_number}}#${{ github.run_attempt }}
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -40,9 +40,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v6
- name: Set up JDK 8
uses: actions/setup-java@v4
uses: actions/setup-java@v5
with:
java-version: '8'
distribution: 'temurin'
6 changes: 3 additions & 3 deletions .github/workflows/docs-check.yaml
@@ -37,14 +37,14 @@ jobs:
run:
working-directory: ./website
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Generate versioned docs
run: ./build_versioned_docs.sh
- uses: actions/setup-node@v4
- uses: actions/setup-node@v6
with:
node-version: 20
node-version: 24
- name: Install dependencies
run: npm install
- name: Test build website
4 changes: 2 additions & 2 deletions .github/workflows/license-check.yml
@@ -32,10 +32,10 @@ jobs:
MVN_VALIDATION_DIR: "/tmp/fluss-validation-deployment"

steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6

- name: Set JDK
uses: actions/setup-java@v4
uses: actions/setup-java@v5
with:
java-version: 11
distribution: 'temurin'
3 changes: 3 additions & 0 deletions .gitignore
@@ -22,6 +22,9 @@ dependency-reduced-pom.xml
### VS Code ###
.vscode/

### claude code ###
.claude/

### Mac OS ###
.DS_Store

2 changes: 1 addition & 1 deletion NOTICE
@@ -1,5 +1,5 @@
Apache Fluss (incubating)
Copyright 2025 The Apache Software Foundation
Copyright 2025-2026 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
14 changes: 7 additions & 7 deletions README.md
@@ -38,7 +38,7 @@

## What is Apache Fluss (Incubating)?

Apache Fluss (Incubating) is a streaming storage built for real-time analytics which can serve as the real-time data layer for Lakehouse architectures.
Apache Fluss (Incubating) is a streaming storage built for real-time analytics & AI which can serve as the real-time data layer for Lakehouse architectures.

It bridges the gap between **data streaming** and **data Lakehouse** by enabling low-latency, high-throughput data ingestion and processing while seamlessly integrating with popular compute engines like **Apache Flink**, while
Apache Spark, and StarRocks are coming soon.
@@ -47,12 +47,12 @@ Apache Spark, and StarRocks are coming soon.

## Features

- **Sub-Second Latency**: Low-latency streaming reads/writes optimized for real-time applications with Apache Flink.
- **Columnar Stream**: 10x improvement in streaming read performance with efficient pushdown projections.
- **Streaming & Lakehouse Unification**: Unified data streaming and Lakehouse with low latencies for powerful analytics.
- **Real-Time Updates**: Cost-efficient partial updates for large-scale data without expensive join operations.
- **Changelog Generation**: Complete changelogs for streaming processors, streamlining analytics workflows.
- **Lookup Queries**: Ultra-high QPS for primary key lookups, enabling efficient dimension table serving.
- **Sub-Second Data Freshness**: Continuous ingestion and immediate availability of data enable low-latency analytics and real-time decision-making at scale.
- **Streaming & Lakehouse Unification**: Streaming-native storage with low-latency access on top of the lakehouse, using tables as a single abstraction to unify real-time and historical data across engines.
- **Columnar Streaming**: Based on Apache Arrow it allows database primitives on data streams and techniques like column pruning and predicate pushdown. This ensures engines read only the data they need, minimizing I/O and network costs.
- **Compute–Storage Separation**: Stream processors focus on pure computation while Fluss manages state and storage, with features like deduplication, partial updates, delta joins, and aggregation merge engines.
- **ML & AI–Ready Storage**: A unified storage layer supporting row-based, columnar, vector, and multi-modal data, enabling real-time feature stores and a centralized data repository for ML and AI systems.
- **Changelogs & Decision Tracking**: Built-in changelog generation provides an append-only history of state and decision evolution, enabling auditing, reproducibility, and deep system observability.

## Building

174 changes: 174 additions & 0 deletions fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -22,9 +22,13 @@
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -35,10 +35,15 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.NoRebalanceInProgressException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
import org.apache.fluss.exception.ServerTagNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -58,8 +67,12 @@
import org.apache.fluss.security.acl.AclBinding;
import org.apache.fluss.security.acl.AclBindingFilter;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
@@ -492,4 +505,165 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);

/**
* Add a server tag to the specified tabletServers; each tabletServer can have only one serverTag.
*
<p>If adding the tag fails for any of the tabletServers, none of the tags will take effect.
*
<p>If a tabletServer already has a serverTag and it is the same as the existing one, this
operation will be ignored.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists for any one of
* the tabletServers, and the server tag is different from the existing one.
* </ul>
*
* @param tabletServers the tabletServers we want to add server tags.
* @param serverTag the server tag to be added.
*/
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Remove server tag from the specified tabletServers.
*
<p>If removing the tag fails for any of the tabletServers, none of the tags will be removed.
*
<p>No exception will be thrown if a server currently has no server tag.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagNotExistException} If the server tag does not exist for any one of the
* tabletServers.
* </ul>
*
* @param tabletServers the tabletServers we want to remove server tags.
*/
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);
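For orientation, a minimal usage sketch of the two server-tag calls above (illustrative only and not part of this diff; it assumes an Admin handle obtained elsewhere, arbitrary server ids, and the PERMANENT_OFFLINE tag referenced in the commit list):

    // Sketch: tag two tablet servers before maintenance, then clear the tag afterwards.
    // `admin` is assumed to be an org.apache.fluss.client.admin.Admin instance.
    void tagServersForMaintenance(Admin admin) throws Exception {
        List<Integer> servers = Arrays.asList(3, 4);
        // All-or-nothing: if tagging any of the servers fails, no tag takes effect.
        admin.addServerTag(servers, ServerTag.PERMANENT_OFFLINE).get();
        // ... drain replicas / perform maintenance ...
        admin.removeServerTag(servers, ServerTag.PERMANENT_OFFLINE).get();
    }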

/**
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
* bucket load.
*
<p>In more detail, Fluss collects the cluster's load information and optimizes bucket placement
to perform load balancing according to the user-defined {@code priorityGoals}.
*
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
* rebalance task exists, Fluss will return the uncompleted rebalance task's progress.
*
* <p>If you want to cancel the rebalance task, you can use {@link #cancelRebalance(String)}
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
<li>{@link RebalanceFailureException} If the rebalance failed, e.g. because an in-progress
execution already exists.
* </ul>
*
* @param priorityGoals the goals to be optimized.
* @return the rebalance id. If there is no rebalance task in progress, it will trigger a new
* rebalance task and return the rebalance id.
*/
CompletableFuture<String> rebalance(List<GoalType> priorityGoals);

/**
* List the rebalance progress.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress for
* the input rebalanceId.
* </ul>
*
@param rebalanceId the rebalance id to list progress for; if null, the progress of the
currently in-progress rebalance task is listed.
@return the rebalance progress.
*/
CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(
@Nullable String rebalanceId);

/**
* Cancel the rebalance task.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
<li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress or
the rebalance id does not exist.
* </ul>
*
@param rebalanceId the rebalance id to cancel; if null, the existing in-progress rebalance
task is cancelled. If the rebalanceId does not exist on the server, {@link
NoRebalanceInProgressException} will be thrown.
*/
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
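Taken together, a rough client-side sketch of the rebalance lifecycle (illustrative only and not part of this diff; the Admin handle and the list of GoalType goals are assumed, since concrete goal constants are not shown here):

    // Sketch: trigger a rebalance, check its progress, and optionally cancel it.
    void runRebalance(Admin admin, List<GoalType> goals) throws Exception {
        // Triggers a rebalance optimized for the given goals; the id identifies the task.
        String rebalanceId = admin.rebalance(goals).get();
        Optional<RebalanceProgress> progress = admin.listRebalanceProgress(rebalanceId).get();
        progress.ifPresent(p -> System.out.println("Rebalance " + rebalanceId + ": " + p));
        // To abort, pass the id (or null for the single in-progress task):
        // admin.cancelRebalance(rebalanceId).get();
    }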

// ==================================================================================
// Producer Offset Management APIs (for Exactly-Once Semantics)
// ==================================================================================

/**
* Register producer offset snapshot.
*
* <p>This method provides atomic "check and register" semantics:
*
* <ul>
* <li>If snapshot does not exist: create new snapshot and return {@link
* RegisterResult#CREATED}
* <li>If snapshot already exists: do NOT overwrite and return {@link
* RegisterResult#ALREADY_EXISTS}
* </ul>
*
* <p>The atomicity is guaranteed by the server implementation. This enables the caller to
* determine whether undo recovery is needed based on the return value.
*
* <p>The snapshot will be automatically cleaned up after the configured TTL expires.
*
* <p>This API is typically used by Flink Operator Coordinator at job startup to register the
* initial offset snapshot before any data is written.
*
* @param producerId the ID of the producer (typically Flink job ID)
* @param offsets map of TableBucket to offset for all tables
* @return a CompletableFuture containing the registration result indicating whether the
* snapshot was newly created or already existed
* @since 0.9
*/
CompletableFuture<RegisterResult> registerProducerOffsets(
String producerId, Map<TableBucket, Long> offsets);

/**
* Get producer offset snapshot.
*
* <p>This method retrieves the registered offset snapshot for a producer. Returns null if no
* snapshot exists for the given producer ID.
*
* <p>This API is typically used by Flink Operator Coordinator at job startup to check if a
* previous snapshot exists (indicating a failover before first checkpoint).
*
* @param producerId the ID of the producer
* @return a CompletableFuture containing the producer offsets, or null if not found
* @since 0.9
*/
CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId);

/**
* Delete producer offset snapshot.
*
* <p>This method deletes the registered offset snapshot for a producer. This is typically
* called after the first checkpoint completes successfully, as the checkpoint state will be
* used for recovery instead of the initial snapshot.
*
* @param producerId the ID of the producer
* @return a CompletableFuture that completes when deletion succeeds
* @since 0.9
*/
CompletableFuture<Void> deleteProducerOffsets(String producerId);
}
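As a closing illustration of the producer-offset APIs added above, a hedged bootstrap sketch (not part of this diff; only the three Admin calls and the RegisterResult values come from the javadoc, while the control flow, job-id handling, and recovery step are assumptions):

    // Sketch: exactly-once bootstrap for a producer such as a Flink job.
    void bootstrapProducerOffsets(Admin admin, String jobId, Map<TableBucket, Long> offsets)
            throws Exception {
        RegisterResult result = admin.registerProducerOffsets(jobId, offsets).get();
        if (result == RegisterResult.ALREADY_EXISTS) {
            // A snapshot from a previous attempt exists: the job failed over before its
            // first checkpoint, so fetch the stored offsets and roll writes back to them.
            ProducerOffsetsResult previous = admin.getProducerOffsets(jobId).get();
            // ... application-specific undo recovery based on `previous` ...
        }
        // After the first successful checkpoint the snapshot is no longer needed:
        // admin.deleteProducerOffsets(jobId).get();
    }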