Introduce idempotency records schema and Postgres-backed IdempotencyStore #3205
dennishuo merged 6 commits into apache:main from
Conversation
| public Optional<IdempotencyRecord> getExisting() { return existing; } | ||
| } | ||
|
|
||
| ReserveResult reserve( |
Please add javadoc. This is a key interface, so describing API contracts is important, especially concurrency and consistency expectations.
Javadoc added. Thanks!
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.polaris.idempotency; |
Since the obvious intention in this interface is to provide persistent storage for idempotency records, I suppose putting the new SPI interface into org.apache.polaris.core.persistence might be preferable.
Thanks for the suggestion! I have moved new SPI interface to org.apache.polaris.core.persistence
|
@huaxingao : Thanks for starting the implementation for idempotent retries in Polaris! Since this is a major new core feature, please mention this PR on the |
|
Thanks @dimas-b for the comments! I’ll send out a discussion email to the dev mailing list shortly. |
dennishuo
left a comment
Looks like a good start! Could you please also re-link your proposal document and the mailing list thread in the PR description for easy navigation and historical posterity?
Two main aspects of the design I'd like to understand better which affect the design decisions in this PR:
- "Reconciliation flow" - how we distinguish between different IN_PROGRESS error states: whether the request-handling crashed before doing any mutations, crashed in the middle of attempting mutations, or crashed after successfully mutating but before idempotency-key finalization
- How we intend to actually use
response_summary to ensure reconstructible responses for duplicate requests. Based on your comments about "minimal response" I'm assuming it won't simply be the full response body (good - presumably because LoadTableResponses might be very large and not fit in the database), but I guess we'll need a well-defined convention for how different kinds of idempotent requests map to "minimal" response reconstructors. I'm guessing we'd just store a filename pointing to the metadata JSON file, so the response to updateTable can reconstruct the TableMetadata in the response?
I guess the details of those design elements aren't anything that needs to be done on this PR specifically, but we can make sure those are clear in the design document and link to it.
| * @param before cutoff instant; records expiring before this time may be removed | ||
| * @return number of records that were purged | ||
| */ | ||
| int purgeExpired(Instant before); |
Do we have any precedent yet for cross-realm mutations in any of the existing persistence functionality? I see how this could be appealing to purge across all realms in the typical case where the single Postgres database serves all realms, but IIRC it was still open-ended for the persistence factory to be allowed to route to different Postgres databases or even fully different instances for different realms, in which case we'd expect the async job that purges these things to be able to selectively purge for different realms independently.
I don’t think we really have any precedent yet for cross‑realm mutations in the persistence APIs – everything else is either scoped by PolarisCallContext or takes an explicit realmId. Given that, I agree it’s safer and more flexible to keep idempotency purging realm‑scoped instead of “all realms in this physical DB.”
I’m going to change the SPI to:
int purgeExpired(String realmId, Instant before);
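To illustrate the realm-scoped contract, here is a minimal sketch of how an async maintenance job might drive the purge realm by realm. The `RealmScopedPurger` interface name and the driver class are hypothetical; only the `purgeExpired(String, Instant)` shape mirrors the SPI signature proposed above.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical stand-in for the SPI method discussed above.
interface RealmScopedPurger {
  int purgeExpired(String realmId, Instant before);
}

// Sketch of a purge driver: calling purge once per realm keeps the loop
// compatible with setups where realms route to different physical databases.
class IdempotencyPurgeDriver {
  static int purgeAll(List<String> realmIds, RealmScopedPurger store, Instant cutoff) {
    int total = 0;
    for (String realmId : realmIds) {
      // Each call is independently scoped, so a per-realm datasource can be used.
      total += store.purgeExpired(realmId, cutoff);
    }
    return total;
  }
}
```

This matches the realm-iterator structure suggested in the review: the driver enumerates realms and purges each independently, rather than assuming one global cross-realm delete.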
| ); | ||
|
|
||
| -- Helpful indexes | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at); |
Same as my comment in IdempotencyStore, keeping realm_id as a prefix to this might better reflect the control flow we'd need in the purge driver to support more sophisticated realm routing use cases (e.g. can't assume sophisticated use cases are satisfied with a global cross-realm purge call, but rather might need to be structured as having some realm iterator interface and calling purge individually on each realm).
That said, I guess the realm-splitting design is also something we need to consider for all other async maintenance types of operations, and it's possible I'm forgetting other precedent on cross-realm maintenance. If so, happy to align with that precedent.
Good point, I will change to
CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires
ON idempotency_records (realm_id, expires_at);
| -- Finalization/replay | ||
| http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx | ||
| error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed | ||
| response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string) |
These field descriptors should also be copied as javadoc comments into the accessors in IdempotencyRecord, since that's where the persistence-agnostic source of truth description of Polaris types/interfaces resides.
Good point, I’ll copy those field descriptions into Javadoc on the corresponding getters in IdempotencyRecord
| * @param idempotencyKey application-provided idempotency key | ||
| * @param executorId identifier of the executor that owns the reservation | ||
| * @param now timestamp representing the current time | ||
| * @return {@code true} if the heartbeat was updated, {@code false} otherwise |
My cursory reading of the query used in the postgres impl indicates there might be distinct causes of failing to update the heartbeat -- specifically:
- If executorId mismatches another in-progress one
- If already finalized with an HTTP status
- If the idempotencyKey doesn't already exist
We should probably include these details in the javadoc if these are indeed the intended scenarios. Is it intended for returning false to be a code error case (as in, callers should've already guaranteed none of these situations applies due to calling reserve before), or is it an expected occurrence? If it's only expected in cases of unexpected code bugs, an exception is probably better than a boolean return value.
Good catch, you’re right: updateHeartbeat can return false in exactly the three cases you described.
The intent is for these to be normal outcomes, not “exception” cases. From the caller’s point of view, updateHeartbeat is basically “refresh the heartbeat if I still own this in‑progress reservation.” Returning true means “yes, you still own it and we updated the row,” and returning false means “there is no matching in‑progress row you own anymore, so stop heartbeating and treat this reservation as lost.”
Actual failures (SQL errors, connectivity issues, etc.) will still surface as exceptions from the implementation. I’ll update the Javadoc to spell out the three false scenarios and clarify this contract.
It seems the three cases are different in terms of whether they should be expected under normal behavior. (1) and (3) in particular seem like code bug cases and (2) seems like the most common "normal" case.
Is the caller supposed to call load after getting a false here to figure out what happened? Maybe the return value should be an IdempotencyRecord already to allow for atomic fetch on error? Or even some kind of HeartbeatResult type of object that could hold more info about the failure.
Thanks for the thoughtful suggestions here! I’ve updated the design to make this clearer.
Instead of returning a bare boolean, updateHeartbeat now returns a small HeartbeatResult enum:
UPDATED, FINALIZED, NOT_FOUND, and LOST_OWNERSHIP.
- If the UPDATE matches a row, we return UPDATED.
- If it doesn't, we do a single load(realmId, key) to distinguish the other cases:
  - no row → NOT_FOUND
  - row with http_status != null → FINALIZED
  - row still IN_PROGRESS but owned by a different executor → LOST_OWNERSHIP.
This way callers don’t have to guess from false or always issue a separate load.
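That classification can be sketched as a small pure function. The boolean/Integer parameters below are hypothetical stand-ins for fields of the loaded row; only the enum constants mirror the proposed SPI.

```java
// Mirrors the HeartbeatResult contract described above.
enum HeartbeatResult { UPDATED, FINALIZED, NOT_FOUND, LOST_OWNERSHIP }

class HeartbeatClassifier {
  // Sketch of the post-UPDATE classification: parameters are illustrative
  // stand-ins for "did the UPDATE match", and for fields of the loaded row.
  static HeartbeatResult classify(
      boolean updateMatchedRow, boolean rowExists, Integer httpStatus, boolean sameExecutor) {
    if (updateMatchedRow) return HeartbeatResult.UPDATED;
    if (!rowExists) return HeartbeatResult.NOT_FOUND;          // no row for the key
    if (httpStatus != null) return HeartbeatResult.FINALIZED;  // already finalized
    if (!sameExecutor) return HeartbeatResult.LOST_OWNERSHIP;  // owned by someone else
    // Row exists, in progress, and we own it, yet the UPDATE matched nothing:
    // a race between the UPDATE and the load; treat ownership as lost and let
    // the caller re-reserve rather than keep heartbeating blindly.
    return HeartbeatResult.LOST_OWNERSHIP;
  }
}
```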
|
|
||
| -- Helpful indexes | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at); | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_active ON idempotency_records (http_status, heartbeat_at); |
What's the expected access pattern for this index? Do we expect this one to participate in the updateHeartbeat call? We probably want this to be realm_id prefixed if we really intend to use it outside of a single realm_id, idempotency_key lookup. Like if we're going to add some kind of listRecentIdempotencyKeys call based on heartbeat_at. But then it's not clear whether we really want http_status as the prefix first. Maybe best to add the index in the same PR that adds a use case for it, so I'd just remove this index for now unless I misunderstood the existing query patterns.
If we're just thinking of serving the additional WHERE clauses in update/finalize where we already have realm_id, idempotency_key as predicates we probably don't actually want a whole explicit index on this.
Good question. Today the only concrete patterns we have are:
- single‑row lookups by (realm_id, idempotency_key) for load / updateHeartbeat / finalize, which are already handled by the PK, and
- the purge query, which now uses realm_id + expires_at and has its own index.
We don’t yet have an API that actually scans by heartbeat_at. I will remove this for now.
|
@dennishuo Thanks for your comment! I have updated the PR description to include the links to the proposal and the mailing list discussion thread.
At a high level, we don't try to infer everything from the idempotency row alone. We use it together with the catalog state: heartbeats/expirations tell us when a row looks stuck, and the catalog tells us whether the underlying mutation actually happened (never ran, ambiguous, or completed).
How we detect rows that need reconciliation:
Case 1 – crashed before doing any durable mutations
Case 2 – crash mid‑mutation (truly ambiguous)
Case 3 – crashed after successfully mutating but before idempotency‑key finalization
So the “never started vs mid‑flight vs finished but not finalized” distinction really comes from the per‑operation reconciliation logic against the catalog; heartbeat_at / executor_id / expires_at only decide when a row is suspicious enough to hand over to that reconciler.
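The decision described above might reduce to something like the following sketch. The `CatalogState` names and action strings are purely illustrative; the real check is operation-specific and not part of this PR.

```java
// Hedged sketch of the reconciliation decision for a stale IN_PROGRESS row.
// CatalogState and the returned action names are illustrative, not real APIs.
enum CatalogState { MUTATION_NEVER_RAN, MUTATION_AMBIGUOUS, MUTATION_COMPLETED }

class ReconcilerSketch {
  static String decide(CatalogState state) {
    switch (state) {
      case MUTATION_NEVER_RAN:   // Case 1: safe to release the key and retry
        return "release-and-retry";
      case MUTATION_COMPLETED:   // Case 3: backfill the finalization fields
        return "finalize-record";
      default:                   // Case 2: needs operation-specific disambiguation
        return "operation-specific-reconcile";
    }
  }
}
```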
For response_summary, the intent is exactly what you’re describing: it’s not meant to store the full HTTP response body, but a small, operation‑specific “replay token” that lets us reconstruct an equivalent response for duplicates.
When replaying a duplicate, the reconstructor can follow that pointer back to the metastore / object storage, load the canonical metadata, and build the same logical response that the original call would have produced. I agree this needs to be spelled out per operation_type, so in the design doc I plan to add a small subsection for each idempotent operation (e.g., commit-table, drop-table, etc.) that defines the shape of its response_summary and how its reconstructor turns that back into an HTTP response. |
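As a purely illustrative example of that "replay token" idea, a commit-table summary might be nothing more than a JSON pointer to the canonical metadata file. The field names below are hypothetical, not a committed convention.

```java
// Illustrative only: a minimal response_summary for a commit-table operation,
// storing a pointer to canonical metadata rather than the full response body.
class ReplayTokens {
  static String commitTableSummary(String metadataLocation) {
    return "{\"operation\":\"commit-table\","
        + "\"metadata-location\":\"" + metadataLocation + "\"}";
  }
}
```

On replay, a reconstructor would follow the pointer back to object storage, load the metadata, and rebuild the logical response, as described above.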
dennishuo
left a comment
Added some questions and potential follow-ups, but I'm okay with merging and iterating on it from here as an initial pass
| } | ||
|
|
||
| // No rows updated: determine why by loading the current record, if any. | ||
| Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey); |
The benefit of returning HeartbeatResult directly from the persistence impl, instead of making the caller call load afterwards, is that the conflicting state can be read "atomically" with whatever caused the update to fail.
We probably don't have to over-optimize for those types of edge cases initially though so no action needed. Just leaving this here for posterity.
Optionally, you could add a // TODO to do some kind of atomic or transactional read with the update.
With Postgres, it might be possible to just use RETURNING to accomplish this, though I'm not a Postgres expert so it's just an idea :)
Good idea! For now I kept the simple UPDATE + load pattern, but I added a TODO noting a potential future optimization using UPDATE ... RETURNING or a transactional read so we can make this fully atomic if needed later.
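For reference, one shape that TODO could take in Postgres is folding the UPDATE and the diagnostic read into a single statement via a data-modifying CTE. This is an untested sketch, not the PR's code; the table name and placeholder ordering are illustrative. Note that a plain `UPDATE ... RETURNING` only returns rows the UPDATE matched, which is why the CTE form pairs it with a SELECT of the current row.

```java
// Untested sketch: combine the heartbeat UPDATE with a diagnostic read of the
// current row in one Postgres statement, using UPDATE ... RETURNING in a CTE.
class HeartbeatSqlSketch {
  static final String ATOMIC_HEARTBEAT =
      "WITH upd AS ("
          + " UPDATE idempotency_records SET heartbeat_at = ?, updated_at = ?"
          + " WHERE realm_id = ? AND idempotency_key = ? AND executor_id = ?"
          + " AND http_status IS NULL"
          + " RETURNING 1)"
          // rows_updated > 0 means the heartbeat succeeded; otherwise the
          // selected http_status / executor_id explain why it did not.
          + " SELECT r.http_status, r.executor_id,"
          + " (SELECT count(*) FROM upd) AS rows_updated"
          + " FROM idempotency_records r"
          + " WHERE r.realm_id = ? AND r.idempotency_key = ?";
}
```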
| * <p>This allows callers to distinguish between different "no update" scenarios instead of | ||
| * overloading them into a boolean. | ||
| */ | ||
| enum HeartbeatResult { |
The most future-proof approach would be to return a struct that could include such an enum in addition to potentially an inline conflicting IdempotencyRecord, error string, etc.
But I guess we'll know better when we have the callsites so I'm okay with iterating on it once we see the entire thing working end-to-end. We should still make sure to minimize needing to change the SPI too significantly in the future though.
Makes sense, thanks. For now I’ll keep it as just the HeartbeatResult enum so the SPI stays small, and once we have real call sites we can see if it makes sense to grow this into a richer struct (e.g. result + conflicting IdempotencyRecord).
| * An in-progress idempotency record exists for the key, but it is owned by a different | ||
| * executor. The caller should stop heartbeating as it no longer owns the reservation. | ||
| */ | ||
| LOST_OWNERSHIP |
I may have glossed over this case -- I assumed a mismatched executorId is normally an unexpected collision on an idempotencyKey. How are executors supposed to force leasing of the idempotency key? Will we actually support different executors/clients "correctly" sharing one idempotency key? And if so, why does it differ from a single executor sending (maybe accidental) async retries?
Good question. I agree that a mismatched executorId shouldn’t be part of the normal path.
In the intended model, a single executor reserves the key and keeps using the same executorId for its retries, so other executors should see DUPLICATE at reserve time and never call updateHeartbeat for that key. In that sense, LOST_OWNERSHIP is mostly a defensive / diagnostic state.
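That ownership model can be sketched with an in-memory stand-in for the reserve step. The names here are hypothetical; only the "first reserver wins, later ones see DUPLICATE" behavior reflects the intended contract.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative outcome of a reserve attempt; names are hypothetical.
enum ReserveOutcome { RESERVED, DUPLICATE }

// Sketch: the first executor reserves the key and keeps its executorId for
// retries; any other executor sees DUPLICATE at reserve time and therefore
// never calls updateHeartbeat for that key.
class ReservationSketch {
  private final Map<String, String> ownerByKey = new HashMap<>();

  ReserveOutcome reserve(String idempotencyKey, String executorId) {
    // putIfAbsent returns null only for the first (winning) executor.
    return ownerByKey.putIfAbsent(idempotencyKey, executorId) == null
        ? ReserveOutcome.RESERVED
        : ReserveOutcome.DUPLICATE;
  }
}
```

Under this model, LOST_OWNERSHIP during heartbeating is indeed defensive: it should only appear if the reservation was reclaimed (e.g. after expiry) by another executor.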
|
@dennishuo Thanks a lot for reviewing and merging the PR! Really appreciate your help! I think I also need to change the schema for H2. I will submit a follow-up PR |
singhpk234
left a comment
Thank you for the change @huaxingao !
I have the following feedback on the change; it would be really nice if we can address it or fix forward.
- we didn't define models for the table being introduced like we do for existing tables; is that intentional?
- the implementation is glued to Postgres only rather than being relational-DB agnostic; most of our existing code is not opinionated on DB-specific details, and this becomes problematic when extending #3352
- we should not modify the v3 schema; I believe once we freeze v3 we should only add things that don't fail bootstrap
- would we want to use separate datasource pools from the one used for managing entities? Also, does the idempotency table need to be part of the same DB? I am a bit concerned about the perf implications this brings in general; I would recommend some benchmark runs against Polaris to see how the perf is now
|
|
||
| -- Idempotency records (key-only idempotency; durable replay) | ||
| CREATE TABLE IF NOT EXISTS idempotency_records ( | ||
| realm_id TEXT NOT NULL, | ||
| idempotency_key TEXT NOT NULL, | ||
| operation_type TEXT NOT NULL, | ||
| resource_id TEXT NOT NULL, | ||
|
|
||
| -- Finalization/replay | ||
| http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx | ||
| error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed | ||
| response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string) | ||
| response_headers TEXT, -- small whitelisted headers to replay (JSON string) | ||
| finalized_at TIMESTAMP, -- when http_status was written | ||
|
|
||
| -- Liveness/ops | ||
| created_at TIMESTAMP NOT NULL, | ||
| updated_at TIMESTAMP NOT NULL, | ||
| heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS | ||
| executor_id TEXT, -- owner pod/worker id | ||
| expires_at TIMESTAMP, | ||
|
|
||
| PRIMARY KEY (realm_id, idempotency_key) | ||
| ); | ||
|
|
||
| -- Helpful indexes | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at); |
V3 is already released; we would need v4 now.
|
|
||
| -- Helpful indexes | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at); | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_active ON idempotency_records (http_status, heartbeat_at); |
don't we need realm_id in these indexes?
| String realmId = rs.getString("realm_id"); | ||
| String idempotencyKey = rs.getString("idempotency_key"); | ||
| String operationType = rs.getString("operation_type"); | ||
| String resourceId = rs.getString("resource_id"); | ||
| Integer httpStatus = (Integer) rs.getObject("http_status"); | ||
| String errorSubtype = rs.getString("error_subtype"); | ||
| String responseSummary = rs.getString("response_summary"); | ||
| String responseHeaders = rs.getString("response_headers"); | ||
| Instant createdAt = rs.getTimestamp("created_at").toInstant(); | ||
| Instant updatedAt = rs.getTimestamp("updated_at").toInstant(); | ||
| Timestamp fts = rs.getTimestamp("finalized_at"); | ||
| Instant finalizedAt = fts == null ? null : fts.toInstant(); | ||
| Timestamp hb = rs.getTimestamp("heartbeat_at"); | ||
| Instant heartbeatAt = hb == null ? null : hb.toInstant(); | ||
| String executorId = rs.getString("executor_id"); | ||
| Instant expiresAt = rs.getTimestamp("expires_at").toInstant(); |
can we define a model for this? We have converter functions for this.
| String sql = | ||
| "UPDATE " | ||
| + TABLE | ||
| + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," | ||
| + " finalized_at = ?, updated_at = ?" | ||
| + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; |
I wonder if we can leverage the query generator for this?
| public final class PostgresIdempotencyStore implements IdempotencyStore { | ||
| private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); | ||
|
|
||
| private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; |
can we use QueryGenerator method to infer table name from model instead of hardcoding it ?
| } | ||
| } | ||
|
|
||
| private static IdempotencyRecord convert(ResultSet rs) { |
can't IdempotencyRecord be a model, like the existing models, so we just use the Converter abstraction?
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** Postgres implementation of IdempotencyStore. */ | ||
| public final class PostgresIdempotencyStore implements IdempotencyStore { |
why does this have to be Postgres-specific?
| ); | ||
|
|
||
| -- Helpful indexes | ||
| CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires |
This will cause already-bootstrapped V3 deployments to fail; we should keep this in v4.
| public PostgresIdempotencyStore( | ||
| @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) | ||
| throws SQLException { | ||
| this.ops = new DatasourceOperations(dataSource, cfg); |
I am not sure if reusing the same datasource where entities are stored is a good idea. Can we move idempotency to a different datasource? Do we have estimates on how write performance regresses with this additional DB call?
| import org.testcontainers.junit.jupiter.Testcontainers; | ||
|
|
||
| @Testcontainers | ||
| public class PostgresIdempotencyStoreIT { |
This should be database-agnostic, and the profile can be database-specific.
|
@singhpk234 Thanks for your comments! I have actually just created a PR to change to the V4 schema and also add H2 schema-v4. I will have one more follow-up PR to address your other questions. |
Introduce idempotency records schema and Postgres-backed IdempotencyStore (apache#3205). This PR adds the persistence foundation for REST idempotency in Polaris: defines an idempotency_records table (Postgres) with key, binding, liveness, and finalization fields; introduces a storage-agnostic IdempotencyRecord model and IdempotencyStore SPI in polaris-core; implements PostgresIdempotencyStore in polaris-relational-jdbc.
This PR adds the persistence foundation for REST idempotency in Polaris:
Defines an idempotency_records table (Postgres) with key, binding, liveness, and finalization fields.
Introduces a storage-agnostic IdempotencyRecord model and IdempotencyStore SPI in polaris-core.
Implements PostgresIdempotencyStore in polaris-relational-jdbc.
proposal
discussion thread
Checklist
CHANGELOG.md (if needed)
site/content/in-dev/unreleased (if needed)