From 8a2ba83fcd121f596cf6a60dc4c89cbcb4caa872 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 27 Jan 2026 15:36:44 -0800 Subject: [PATCH 1/8] relational-jdbc: add DB-agnostic idempotency store + model --- .../relational/jdbc/DatasourceOperations.java | 7 + .../relational/jdbc/QueryGenerator.java | 9 + .../idempotency/PostgresIdempotencyStore.java | 255 ------------------ .../RelationalJdbcIdempotencyStore.java | 210 +++++++++++++++ .../jdbc/models/ModelIdempotencyRecord.java | 211 +++++++++++++++ ...tionalJdbcIdempotencyStorePostgresIT.java} | 8 +- .../entity}/IdempotencyRecord.java | 25 +- .../core/persistence/IdempotencyStore.java | 2 +- 8 files changed, 462 insertions(+), 265 deletions(-) delete mode 100644 persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java create mode 100644 persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java create mode 100644 persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java rename persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/{PostgresIdempotencyStoreIT.java => RelationalJdbcIdempotencyStorePostgresIT.java} (96%) rename polaris-core/src/main/java/org/apache/polaris/{idempotency => core/entity}/IdempotencyRecord.java (85%) diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java index e44de3a94c..3fa17b2434 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java @@ -83,6 +83,13 @@ DatabaseType getDatabaseType() { return databaseType; } + /** + * Returns the detected database type for this datasource. + */ + public DatabaseType databaseType() { + return databaseType; + } + /** * Execute SQL script and close the associated input stream * diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index 485956ed85..f690b5ef79 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -50,6 +50,15 @@ public record PreparedBatchQuery(String sql, List> parametersList) /** A container for the query fragment SQL string and the ordered parameter values. */ record QueryFragment(String sql, List parameters) {} + /** + * Returns the fully-qualified table name used by relational-jdbc queries. + * + * @param tableName Target table name. + */ + public static String fullyQualifiedTableName(@Nonnull String tableName) { + return getFullyQualifiedTableName(tableName); + } + /** * Generates a SELECT query with projection and filtering. * diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java deleted file mode 100644 index ff826e19e8..0000000000 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.polaris.persistence.relational.jdbc.idempotency; - -import jakarta.annotation.Nonnull; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import javax.sql.DataSource; -import org.apache.polaris.core.persistence.IdempotencyStore; -import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; -import org.apache.polaris.idempotency.IdempotencyRecord; -import org.apache.polaris.persistence.relational.jdbc.DatabaseType; -import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; -import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; -import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; -import org.apache.polaris.persistence.relational.jdbc.models.Converter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Postgres implementation of IdempotencyStore. */ -public final class PostgresIdempotencyStore implements IdempotencyStore { - private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); - - private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; - - private final DatasourceOperations ops; - - public PostgresIdempotencyStore( - @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) - throws SQLException { - this.ops = new DatasourceOperations(dataSource, cfg); - } - - @Override - public ReserveResult reserve( - String realmId, - String idempotencyKey, - String operationType, - String normalizedResourceId, - Instant expiresAt, - String executorId, - Instant now) { - String sql = - "INSERT INTO " - + TABLE - + " (realm_id, idempotency_key, operation_type, resource_id," - + " http_status, error_subtype, response_summary, response_headers, finalized_at," - + " created_at, updated_at, heartbeat_at, executor_id, expires_at)" - + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)" - + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING"; - List params = - List.of( - realmId, - idempotencyKey, - operationType, - normalizedResourceId, - Timestamp.from(now), - Timestamp.from(now), - Timestamp.from(now), - executorId, - Timestamp.from(expiresAt)); - try { - int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, params)); - if (updated == 1) { - return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); - } else { - // Load existing to return to caller - return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); - } - } catch (SQLException e) { - throw new RuntimeException("Failed to reserve idempotency key", e); - } - } - - @Override - public Optional load(String realmId, String idempotencyKey) { - String sql = - "SELECT realm_id, idempotency_key, operation_type, resource_id, http_status, error_subtype," - + " response_summary, response_headers, created_at, updated_at, finalized_at, heartbeat_at," - + " executor_id, expires_at" - + " FROM " - + TABLE - + " WHERE realm_id = ? AND idempotency_key = ?"; - try { - final IdempotencyRecord[] holder = new IdempotencyRecord[1]; - ops.executeSelectOverStream( - new QueryGenerator.PreparedQuery(sql, List.of(realmId, idempotencyKey)), - new Converter() { - @Override - public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { - return convert(rs); - } - - @Override - public Map toMap(DatabaseType databaseType) { - return Map.of(); - } - }, - stream -> stream.findFirst().ifPresent(r -> holder[0] = r)); - return Optional.ofNullable(holder[0]); - } catch (SQLException e) { - throw new RuntimeException("Failed to load idempotency record", e); - } - } - - @Override - public HeartbeatResult updateHeartbeat( - String realmId, String idempotencyKey, String executorId, Instant now) { - String sql = - "UPDATE " - + TABLE - + " SET heartbeat_at = ?, updated_at = ?" - + " WHERE realm_id = ? AND idempotency_key = ?" - + " AND http_status IS NULL" - + " AND (executor_id IS NULL OR executor_id = ?)"; - try { - int rows = - ops.executeUpdate( - new QueryGenerator.PreparedQuery( - sql, - List.of( - Timestamp.from(now), - Timestamp.from(now), - realmId, - idempotencyKey, - executorId))); - if (rows > 0) { - return HeartbeatResult.UPDATED; - } - - // No rows updated: determine why by loading the current record, if any. - // TODO: consider using a single atomic read/write (for example, PostgreSQL - // UPDATE ... RETURNING) to avoid this follow-up lookup and make the - // conflicting state observable in the same operation. - Optional existing = load(realmId, idempotencyKey); - if (existing.isEmpty()) { - return HeartbeatResult.NOT_FOUND; - } - - IdempotencyRecord record = existing.get(); - if (record.getHttpStatus() != null) { - return HeartbeatResult.FINALIZED; - } - - // Record is still IN_PROGRESS but owned by a different executor. - return HeartbeatResult.LOST_OWNERSHIP; - } catch (SQLException e) { - throw new RuntimeException("Failed to update heartbeat", e); - } - } - - @Override - public boolean finalizeRecord( - String realmId, - String idempotencyKey, - Integer httpStatus, - String errorSubtype, - String responseSummary, - String responseHeaders, - Instant finalizedAt) { - String sql = - "UPDATE " - + TABLE - + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," - + " finalized_at = ?, updated_at = ?" - + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; - try { - int rows = - ops.executeUpdate( - new QueryGenerator.PreparedQuery( - sql, - Arrays.asList( - httpStatus, - errorSubtype, - responseSummary, - responseHeaders, - Timestamp.from(finalizedAt), - Timestamp.from(finalizedAt), - realmId, - idempotencyKey))); - return rows > 0; - } catch (SQLException e) { - throw new RuntimeException("Failed to finalize idempotency record", e); - } - } - - @Override - public int purgeExpired(String realmId, Instant before) { - String sql = "DELETE FROM " + TABLE + " WHERE realm_id = ? AND expires_at < ?"; - try { - return ops.executeUpdate( - new QueryGenerator.PreparedQuery(sql, List.of(realmId, Timestamp.from(before)))); - } catch (SQLException e) { - throw new RuntimeException("Failed to purge expired idempotency records", e); - } - } - - private static IdempotencyRecord convert(ResultSet rs) { - try { - String realmId = rs.getString("realm_id"); - String idempotencyKey = rs.getString("idempotency_key"); - String operationType = rs.getString("operation_type"); - String resourceId = rs.getString("resource_id"); - Integer httpStatus = (Integer) rs.getObject("http_status"); - String errorSubtype = rs.getString("error_subtype"); - String responseSummary = rs.getString("response_summary"); - String responseHeaders = rs.getString("response_headers"); - Instant createdAt = rs.getTimestamp("created_at").toInstant(); - Instant updatedAt = rs.getTimestamp("updated_at").toInstant(); - Timestamp fts = rs.getTimestamp("finalized_at"); - Instant finalizedAt = fts == null ? null : fts.toInstant(); - Timestamp hb = rs.getTimestamp("heartbeat_at"); - Instant heartbeatAt = hb == null ? null : hb.toInstant(); - String executorId = rs.getString("executor_id"); - Instant expiresAt = rs.getTimestamp("expires_at").toInstant(); - return new IdempotencyRecord( - realmId, - idempotencyKey, - operationType, - resourceId, - httpStatus, - errorSubtype, - responseSummary, - responseHeaders, - createdAt, - updatedAt, - finalizedAt, - heartbeatAt, - executorId, - expiresAt); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } -} diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java new file mode 100644 index 0000000000..03f36a194a --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Arrays; +import javax.sql.DataSource; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.ImmutableModelIdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord; + +/** {@link IdempotencyStore} backed by the relational-jdbc schema. */ +public class RelationalJdbcIdempotencyStore implements IdempotencyStore { + + private final DatasourceOperations datasourceOperations; + + public RelationalJdbcIdempotencyStore( + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) throws SQLException { + this.datasourceOperations = new DatasourceOperations(dataSource, cfg); + } + + @Override + public ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now) { + try { + ModelIdempotencyRecord model = + ImmutableModelIdempotencyRecord.builder() + .realmId(realmId) + .idempotencyKey(idempotencyKey) + .operationType(operationType) + .resourceId(normalizedResourceId) + .createdAt(now) + .updatedAt(now) + .heartbeatAt(now) + .executorId(executorId) + .expiresAt(expiresAt) + .build(); + + List values = model.toMap(datasourceOperations.databaseType()).values().stream().toList(); + QueryGenerator.PreparedQuery insert = + QueryGenerator.generateInsertQuery( + ModelIdempotencyRecord.ALL_COLUMNS, ModelIdempotencyRecord.TABLE_NAME, values, realmId); + datasourceOperations.executeUpdate(insert); + return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); + } catch (SQLException e) { + if (datasourceOperations.isConstraintViolation(e)) { + return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); + } + throw new RuntimeException("Failed to reserve idempotency key", e); + } + } + + @Override + public Optional load(String realmId, String idempotencyKey) { + try { + QueryGenerator.PreparedQuery query = + QueryGenerator.generateSelectQuery( + ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.TABLE_NAME, + Map.of( + ModelIdempotencyRecord.REALM_ID, + realmId, + ModelIdempotencyRecord.IDEMPOTENCY_KEY, + idempotencyKey)); + List results = + datasourceOperations.executeSelect(query, ModelIdempotencyRecord.CONVERTER); + if (results.isEmpty()) { + return Optional.empty(); + } + if (results.size() > 1) { + throw new IllegalStateException( + "More than one idempotency record found for realm/key: " + realmId + "/" + idempotencyKey); + } + return Optional.of(results.getFirst()); + } catch (SQLException e) { + throw new RuntimeException("Failed to load idempotency record", e); + } + } + + @Override + public HeartbeatResult updateHeartbeat( + String realmId, String idempotencyKey, String executorId, Instant now) { + Optional existing = load(realmId, idempotencyKey); + if (existing.isEmpty()) { + return HeartbeatResult.NOT_FOUND; + } + + IdempotencyRecord record = existing.get(); + if (record.getHttpStatus() != null) { + return HeartbeatResult.FINALIZED; + } + if (record.getExecutorId() == null || !record.getExecutorId().equals(executorId)) { + return HeartbeatResult.LOST_OWNERSHIP; + } + + String sql = + "UPDATE " + + QueryGenerator.fullyQualifiedTableName(ModelIdempotencyRecord.TABLE_NAME) + + " SET heartbeat_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL AND executor_id = ?"; + QueryGenerator.PreparedQuery update = + new QueryGenerator.PreparedQuery( + sql, + List.of( + Timestamp.from(now), + Timestamp.from(now), + realmId, + idempotencyKey, + executorId)); + + try { + int updated = datasourceOperations.executeUpdate(update); + if (updated > 0) { + return HeartbeatResult.UPDATED; + } + } catch (SQLException e) { + throw new RuntimeException("Failed to update idempotency heartbeat", e); + } + + // Raced with finalize/ownership loss; re-check to return a meaningful result. + Optional after = load(realmId, idempotencyKey); + if (after.isEmpty()) { + return HeartbeatResult.NOT_FOUND; + } + if (after.get().getHttpStatus() != null) { + return HeartbeatResult.FINALIZED; + } + return HeartbeatResult.LOST_OWNERSHIP; + } + + @Override + public boolean finalizeRecord( + String realmId, + String idempotencyKey, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant finalizedAt) { + String sql = + "UPDATE " + + QueryGenerator.fullyQualifiedTableName(ModelIdempotencyRecord.TABLE_NAME) + + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," + + " finalized_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; + QueryGenerator.PreparedQuery update = + new QueryGenerator.PreparedQuery( + sql, + Arrays.asList( + httpStatus, + errorSubtype, + responseSummary, + responseHeaders, + Timestamp.from(finalizedAt), + Timestamp.from(finalizedAt), + realmId, + idempotencyKey)); + + try { + return datasourceOperations.executeUpdate(update) > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to finalize idempotency record", e); + } + } + + @Override + public int purgeExpired(String realmId, Instant before) { + try { + String sql = + "DELETE FROM " + + QueryGenerator.fullyQualifiedTableName(ModelIdempotencyRecord.TABLE_NAME) + + " WHERE realm_id = ? AND expires_at IS NOT NULL AND expires_at < ?"; + QueryGenerator.PreparedQuery delete = + new QueryGenerator.PreparedQuery(sql, List.of(realmId, Timestamp.from(before))); + return datasourceOperations.executeUpdate(delete); + } catch (SQLException e) { + throw new RuntimeException("Failed to purge expired idempotency records", e); + } + } +} diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java new file mode 100644 index 0000000000..a6a9e90d5f --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.relational.jdbc.models; + +import jakarta.annotation.Nullable; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; + +/** + * JDBC model for {@link IdempotencyRecord} mirroring the {@code idempotency_records} table. + * + *

This follows the same pattern as {@link ModelEvent}, separating the storage representation + * from the core domain model while still providing {@link Converter} helpers. + */ +@PolarisImmutable +public interface ModelIdempotencyRecord extends Converter { + + String TABLE_NAME = "idempotency_records"; + + String REALM_ID = "realm_id"; + String IDEMPOTENCY_KEY = "idempotency_key"; + String OPERATION_TYPE = "operation_type"; + String RESOURCE_ID = "resource_id"; + + String HTTP_STATUS = "http_status"; + String ERROR_SUBTYPE = "error_subtype"; + String RESPONSE_SUMMARY = "response_summary"; + String RESPONSE_HEADERS = "response_headers"; + String FINALIZED_AT = "finalized_at"; + + String CREATED_AT = "created_at"; + String UPDATED_AT = "updated_at"; + String HEARTBEAT_AT = "heartbeat_at"; + String EXECUTOR_ID = "executor_id"; + String EXPIRES_AT = "expires_at"; + + List ALL_COLUMNS = + List.of( + IDEMPOTENCY_KEY, + OPERATION_TYPE, + RESOURCE_ID, + HTTP_STATUS, + ERROR_SUBTYPE, + RESPONSE_SUMMARY, + RESPONSE_HEADERS, + FINALIZED_AT, + CREATED_AT, + UPDATED_AT, + HEARTBEAT_AT, + EXECUTOR_ID, + EXPIRES_AT); + + /** + * Columns to select when reading idempotency records. + * + *

{@code realm_id} is intentionally not part of {@link #ALL_COLUMNS} because {@link + * org.apache.polaris.persistence.relational.jdbc.QueryGenerator#generateInsertQuery(List, String, + * List, String)} appends it automatically for all relational-jdbc tables. + */ + List SELECT_COLUMNS = + List.of( + REALM_ID, + IDEMPOTENCY_KEY, + OPERATION_TYPE, + RESOURCE_ID, + HTTP_STATUS, + ERROR_SUBTYPE, + RESPONSE_SUMMARY, + RESPONSE_HEADERS, + FINALIZED_AT, + CREATED_AT, + UPDATED_AT, + HEARTBEAT_AT, + EXECUTOR_ID, + EXPIRES_AT); + + /** + * Dummy instance to be used as a Converter when calling {@link #fromResultSet(ResultSet)}. + * + *

FIXME: fromResultSet() is a factory method and should be static or moved to a factory class. + */ + ModelIdempotencyRecord CONVERTER = + ImmutableModelIdempotencyRecord.builder() + .realmId("") + .idempotencyKey("") + .operationType("") + .resourceId("") + .createdAt(Instant.EPOCH) + .updatedAt(Instant.EPOCH) + .expiresAt(Instant.EPOCH) + .build(); + + String getRealmId(); + + String getIdempotencyKey(); + + String getOperationType(); + + String getResourceId(); + + @Nullable + Integer getHttpStatus(); + + @Nullable + String getErrorSubtype(); + + @Nullable + String getResponseSummary(); + + @Nullable + String getResponseHeaders(); + + @Nullable + Instant getFinalizedAt(); + + Instant getCreatedAt(); + + Instant getUpdatedAt(); + + @Nullable + Instant getHeartbeatAt(); + + @Nullable + String getExecutorId(); + + Instant getExpiresAt(); + + @Override + default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + String realmId = rs.getString(REALM_ID); + String idempotencyKey = rs.getString(IDEMPOTENCY_KEY); + String operationType = rs.getString(OPERATION_TYPE); + String resourceId = rs.getString(RESOURCE_ID); + + Integer httpStatus = (Integer) rs.getObject(HTTP_STATUS); + String errorSubtype = rs.getString(ERROR_SUBTYPE); + String responseSummary = rs.getString(RESPONSE_SUMMARY); + String responseHeaders = rs.getString(RESPONSE_HEADERS); + + Instant createdAt = rs.getTimestamp(CREATED_AT).toInstant(); + Instant updatedAt = rs.getTimestamp(UPDATED_AT).toInstant(); + + Timestamp finalizedTs = rs.getTimestamp(FINALIZED_AT); + Instant finalizedAt = finalizedTs == null ? null : finalizedTs.toInstant(); + + Timestamp heartbeatTs = rs.getTimestamp(HEARTBEAT_AT); + Instant heartbeatAt = heartbeatTs == null ? null : heartbeatTs.toInstant(); + + String executorId = rs.getString(EXECUTOR_ID); + Instant expiresAt = rs.getTimestamp(EXPIRES_AT).toInstant(); + + return new IdempotencyRecord( + realmId, + idempotencyKey, + operationType, + resourceId, + httpStatus, + errorSubtype, + responseSummary, + responseHeaders, + createdAt, + updatedAt, + finalizedAt, + heartbeatAt, + executorId, + expiresAt); + } + + @Override + default Map toMap(DatabaseType databaseType) { + Map map = new LinkedHashMap<>(); + map.put(IDEMPOTENCY_KEY, getIdempotencyKey()); + map.put(OPERATION_TYPE, getOperationType()); + map.put(RESOURCE_ID, getResourceId()); + map.put(HTTP_STATUS, getHttpStatus()); + map.put(ERROR_SUBTYPE, getErrorSubtype()); + map.put(RESPONSE_SUMMARY, getResponseSummary()); + map.put(RESPONSE_HEADERS, getResponseHeaders()); + map.put(FINALIZED_AT, getFinalizedAt() == null ? null : Timestamp.from(getFinalizedAt())); + map.put(CREATED_AT, Timestamp.from(getCreatedAt())); + map.put(UPDATED_AT, Timestamp.from(getUpdatedAt())); + map.put(HEARTBEAT_AT, getHeartbeatAt() == null ? null : Timestamp.from(getHeartbeatAt())); + map.put(EXECUTOR_ID, getExecutorId()); + map.put(EXPIRES_AT, Timestamp.from(getExpiresAt())); + return map; + } +} diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java similarity index 96% rename from persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java rename to persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index 3d92ceba73..7af6659d47 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -23,9 +23,9 @@ import java.time.Instant; import java.util.Optional; import javax.sql.DataSource; +import org.apache.polaris.core.entity.IdempotencyRecord; import org.apache.polaris.core.persistence.IdempotencyStore; import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; -import org.apache.polaris.idempotency.IdempotencyRecord; import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; import org.junit.jupiter.api.AfterAll; @@ -37,14 +37,14 @@ import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers -public class PostgresIdempotencyStoreIT { +public class RelationalJdbcIdempotencyStorePostgresIT { @Container private static final PostgreSQLContainer POSTGRES = new PostgreSQLContainer<>("postgres:17.5-alpine"); private static DataSource dataSource; - private static PostgresIdempotencyStore store; + private static RelationalJdbcIdempotencyStore store; @BeforeAll static void setup() throws Exception { @@ -84,7 +84,7 @@ public Optional initialDelayInMs() { ops.executeScript(is); } - store = new PostgresIdempotencyStore(dataSource, cfg); + store = new RelationalJdbcIdempotencyStore(dataSource, cfg); } @AfterAll diff --git a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java similarity index 85% rename from polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java rename to polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java index e4cae7ef84..cc012cb8dd 100644 --- a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java @@ -14,11 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.polaris.idempotency; +package org.apache.polaris.core.entity; import java.time.Instant; +/** + * Immutable snapshot of an idempotency reservation and its finalization status. + * + *

This is the persistence-agnostic representation used by higher layers; storage backends map + * their concrete schemas into this type. + */ public final class IdempotencyRecord { + private final String realmId; private final String idempotencyKey; private final String operationType; @@ -79,6 +86,14 @@ public String getOperationType() { return operationType; } + /** + * Normalized identifier of the resource affected by the operation. + * + *

This should be derived from the request (for example, a canonicalized path like + * {@code "tables/ns.tbl"}), not from a generated internal entity id. This ensures the binding is + * available even when an operation fails before creating any entities, and allows the HTTP layer + * to detect idempotency-key reuse across different resources. + */ public String getNormalizedResourceId() { return normalizedResourceId; } @@ -96,8 +111,8 @@ public Integer getHttpStatus() { /** * Optional error subtype or code that provides additional detail when the operation failed. * - *

Examples include {@code already_exists}, {@code namespace_not_empty}, or {@code - * idempotency_replay_failed}. + *

Examples include {@code already_exists}, {@code namespace_not_empty}, or + * {@code idempotency_replay_failed}. */ public String getErrorSubtype() { return errorSubtype; @@ -117,8 +132,8 @@ public String getResponseSummary() { /** * Serialized representation of a small, whitelisted set of HTTP response headers. * - *

Stored as a JSON string so that the HTTP layer can replay key headers (such as {@code - * Content-Type}) when serving a duplicate idempotent request. + *

Stored as a JSON string so that the HTTP layer can replay key headers (such as + * {@code Content-Type}) when serving a duplicate idempotent request. */ public String getResponseHeaders() { return responseHeaders; diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java index cf3e9d98d5..f68d7097fe 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java @@ -18,7 +18,7 @@ import java.time.Instant; import java.util.Optional; -import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.core.entity.IdempotencyRecord; /** * Abstraction for persisting and querying idempotency records. From 236cb0f69587c65d5626b2fccbbb98fc0f8136ac Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 27 Jan 2026 19:33:21 -0800 Subject: [PATCH 2/8] spotless apply --- .../relational/jdbc/DatasourceOperations.java | 4 +-- .../RelationalJdbcIdempotencyStore.java | 25 +++++++++++-------- .../core/entity/IdempotencyRecord.java | 12 ++++----- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java index 3fa17b2434..6fed5b7e45 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java @@ -83,9 +83,7 @@ DatabaseType getDatabaseType() { return databaseType; } - /** - * Returns the detected database type for this datasource. - */ + /** Returns the detected database type for this datasource. */ public DatabaseType databaseType() { return databaseType; } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java index 03f36a194a..1d86f5641f 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java @@ -20,10 +20,10 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Arrays; import javax.sql.DataSource; import org.apache.polaris.core.entity.IdempotencyRecord; import org.apache.polaris.core.persistence.IdempotencyStore; @@ -39,7 +39,8 @@ public class RelationalJdbcIdempotencyStore implements IdempotencyStore { private final DatasourceOperations datasourceOperations; public RelationalJdbcIdempotencyStore( - @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) throws SQLException { + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) + throws SQLException { this.datasourceOperations = new DatasourceOperations(dataSource, cfg); } @@ -66,10 +67,14 @@ public ReserveResult reserve( .expiresAt(expiresAt) .build(); - List values = model.toMap(datasourceOperations.databaseType()).values().stream().toList(); + List values = + model.toMap(datasourceOperations.databaseType()).values().stream().toList(); QueryGenerator.PreparedQuery insert = QueryGenerator.generateInsertQuery( - ModelIdempotencyRecord.ALL_COLUMNS, ModelIdempotencyRecord.TABLE_NAME, values, realmId); + ModelIdempotencyRecord.ALL_COLUMNS, + ModelIdempotencyRecord.TABLE_NAME, + values, + realmId); datasourceOperations.executeUpdate(insert); return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); } catch (SQLException e) { @@ -99,7 +104,10 @@ public Optional load(String realmId, String idempotencyKey) { } if (results.size() > 1) { throw new IllegalStateException( - "More than one idempotency record found for realm/key: " + realmId + "/" + idempotencyKey); + "More than one idempotency record found for realm/key: " + + realmId + + "/" + + idempotencyKey); } return Optional.of(results.getFirst()); } catch (SQLException e) { @@ -131,12 +139,7 @@ public HeartbeatResult updateHeartbeat( QueryGenerator.PreparedQuery update = new QueryGenerator.PreparedQuery( sql, - List.of( - Timestamp.from(now), - Timestamp.from(now), - realmId, - idempotencyKey, - executorId)); + List.of(Timestamp.from(now), Timestamp.from(now), realmId, idempotencyKey, executorId)); try { int updated = datasourceOperations.executeUpdate(update); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java index cc012cb8dd..9583a15e8e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java @@ -89,8 +89,8 @@ public String getOperationType() { /** * Normalized identifier of the resource affected by the operation. * - *

This should be derived from the request (for example, a canonicalized path like - * {@code "tables/ns.tbl"}), not from a generated internal entity id. This ensures the binding is + *

This should be derived from the request (for example, a canonicalized path like {@code + * "tables/ns.tbl"}), not from a generated internal entity id. This ensures the binding is * available even when an operation fails before creating any entities, and allows the HTTP layer * to detect idempotency-key reuse across different resources. */ @@ -111,8 +111,8 @@ public Integer getHttpStatus() { /** * Optional error subtype or code that provides additional detail when the operation failed. * - *

Examples include {@code already_exists}, {@code namespace_not_empty}, or - * {@code idempotency_replay_failed}. + *

Examples include {@code already_exists}, {@code namespace_not_empty}, or {@code + * idempotency_replay_failed}. */ public String getErrorSubtype() { return errorSubtype; @@ -132,8 +132,8 @@ public String getResponseSummary() { /** * Serialized representation of a small, whitelisted set of HTTP response headers. * - *

Stored as a JSON string so that the HTTP layer can replay key headers (such as - * {@code Content-Type}) when serving a duplicate idempotent request. + *

Stored as a JSON string so that the HTTP layer can replay key headers (such as {@code + * Content-Type}) when serving a duplicate idempotent request. */ public String getResponseHeaders() { return responseHeaders; From d2ef040cb994ed54e60833faa81627f016fcbceb Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 31 Jan 2026 15:10:27 -0800 Subject: [PATCH 3/8] extend QueryGenerator for idempotency queries --- .../relational/jdbc/QueryGenerator.java | 130 +++++++++++++++++- .../RelationalJdbcIdempotencyStore.java | 84 ++++++----- 2 files changed, 176 insertions(+), 38 deletions(-) diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index f690b5ef79..d5c3fc5ce7 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -201,6 +201,65 @@ public static PreparedQuery generateUpdateQuery( return new PreparedQuery(sql, bindingParams); } + /** + * Builds an UPDATE query that updates only the specified columns and supports richer WHERE + * predicates (equality, greater-than, less-than, IS NULL, IS NOT NULL). + * + *

Callers should prefer passing an ordered map (e.g. {@link java.util.LinkedHashMap}) for the + * set clause so generated SQL and parameter order are stable. + * + * @param tableColumns List of valid table columns. + * @param tableName Target table. + * @param setClause Column-value pairs to update. + * @param whereEquals Column-value pairs used in WHERE equality filtering. + * @param whereGreater Column-value pairs used in WHERE greater-than filtering. + * @param whereLess Column-value pairs used in WHERE less-than filtering. + * @param whereIsNull Columns that must be NULL. + * @param whereIsNotNull Columns that must be NOT NULL. + * @return UPDATE query with parameter bindings. + */ + public static PreparedQuery generateUpdateQuery( + @Nonnull List tableColumns, + @Nonnull String tableName, + @Nonnull Map setClause, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater, + @Nonnull Map whereLess, + @Nonnull Set whereIsNull, + @Nonnull Set whereIsNotNull) { + if (setClause.isEmpty()) { + throw new IllegalArgumentException("Empty setClause"); + } + + Set columns = new HashSet<>(tableColumns); + validateColumns(columns, setClause.keySet()); + validateColumns(columns, whereEquals.keySet()); + validateColumns(columns, whereGreater.keySet()); + validateColumns(columns, whereLess.keySet()); + validateColumns(columns, whereIsNull); + validateColumns(columns, whereIsNotNull); + + QueryFragment where = + generateWhereClauseExtended( + columns, whereEquals, whereGreater, whereLess, whereIsNull, whereIsNotNull); + + List setParts = new ArrayList<>(); + List params = new ArrayList<>(); + for (Map.Entry entry : setClause.entrySet()) { + setParts.add(entry.getKey() + " = ?"); + params.add(entry.getValue()); + } + params.addAll(where.parameters()); + + String sql = + "UPDATE " + + getFullyQualifiedTableName(tableName) + + " SET " + + String.join(", ", setParts) + + where.sql(); + return new PreparedQuery(sql, params); + } + /** * Builds a DELETE query with the given conditions. * @@ -218,6 +277,32 @@ public static PreparedQuery generateDeleteQuery( "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters()); } + /** + * Builds a DELETE query that supports richer WHERE predicates (equality, greater-than, less-than, + * IS NULL, IS NOT NULL). + */ + public static PreparedQuery generateDeleteQuery( + @Nonnull List tableColumns, + @Nonnull String tableName, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater, + @Nonnull Map whereLess, + @Nonnull Set whereIsNull, + @Nonnull Set whereIsNotNull) { + Set columns = new HashSet<>(tableColumns); + validateColumns(columns, whereEquals.keySet()); + validateColumns(columns, whereGreater.keySet()); + validateColumns(columns, whereLess.keySet()); + validateColumns(columns, whereIsNull); + validateColumns(columns, whereIsNotNull); + + QueryFragment where = + generateWhereClauseExtended( + columns, whereEquals, whereGreater, whereLess, whereIsNull, whereIsNotNull); + return new PreparedQuery( + "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters()); + } + private static PreparedQuery generateSelectQuery( @Nonnull List columnNames, @Nonnull String tableName, @@ -240,22 +325,55 @@ static QueryFragment generateWhereClause( @Nonnull Set tableColumns, @Nonnull Map whereEquals, @Nonnull Map whereGreater) { + return generateWhereClauseExtended( + tableColumns, whereEquals, whereGreater, Map.of(), Set.of(), Set.of()); + } + + private static void validateColumns( + @Nonnull Set tableColumns, @Nonnull Set columns) { + for (String column : columns) { + if (!tableColumns.contains(column) && !column.equals("realm_id")) { + throw new IllegalArgumentException("Invalid query column: " + column); + } + } + } + + @VisibleForTesting + static QueryFragment generateWhereClauseExtended( + @Nonnull Set tableColumns, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater, + @Nonnull Map whereLess, + @Nonnull Set whereIsNull, + @Nonnull Set whereIsNotNull) { + // Preserve the original behavior of rejecting unknown columns. This is used by SELECT query + // generation too, not only by callers of the extended UPDATE/DELETE helpers. + validateColumns(tableColumns, whereEquals.keySet()); + validateColumns(tableColumns, whereGreater.keySet()); + validateColumns(tableColumns, whereLess.keySet()); + validateColumns(tableColumns, whereIsNull); + validateColumns(tableColumns, whereIsNotNull); + List conditions = new ArrayList<>(); List parameters = new ArrayList<>(); for (Map.Entry entry : whereEquals.entrySet()) { - if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { - throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); - } conditions.add(entry.getKey() + " = ?"); parameters.add(entry.getValue()); } for (Map.Entry entry : whereGreater.entrySet()) { - if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { - throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); - } conditions.add(entry.getKey() + " > ?"); parameters.add(entry.getValue()); } + for (Map.Entry entry : whereLess.entrySet()) { + conditions.add(entry.getKey() + " < ?"); + parameters.add(entry.getValue()); + } + for (String column : whereIsNull) { + conditions.add(column + " IS NULL"); + } + for (String column : whereIsNotNull) { + conditions.add(column + " IS NOT NULL"); + } String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); return new QueryFragment(clause, parameters); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java index 1d86f5641f..b865baccce 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java @@ -20,10 +20,12 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; -import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import javax.sql.DataSource; import org.apache.polaris.core.entity.IdempotencyRecord; import org.apache.polaris.core.persistence.IdempotencyStore; @@ -33,7 +35,6 @@ import org.apache.polaris.persistence.relational.jdbc.models.ImmutableModelIdempotencyRecord; import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord; -/** {@link IdempotencyStore} backed by the relational-jdbc schema. */ public class RelationalJdbcIdempotencyStore implements IdempotencyStore { private final DatasourceOperations datasourceOperations; @@ -131,15 +132,26 @@ public HeartbeatResult updateHeartbeat( return HeartbeatResult.LOST_OWNERSHIP; } - String sql = - "UPDATE " - + QueryGenerator.fullyQualifiedTableName(ModelIdempotencyRecord.TABLE_NAME) - + " SET heartbeat_at = ?, updated_at = ?" - + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL AND executor_id = ?"; QueryGenerator.PreparedQuery update = - new QueryGenerator.PreparedQuery( - sql, - List.of(Timestamp.from(now), Timestamp.from(now), realmId, idempotencyKey, executorId)); + QueryGenerator.generateUpdateQuery( + ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.TABLE_NAME, + Map.of( + ModelIdempotencyRecord.HEARTBEAT_AT, + Timestamp.from(now), + ModelIdempotencyRecord.UPDATED_AT, + Timestamp.from(now)), + Map.of( + ModelIdempotencyRecord.REALM_ID, + realmId, + ModelIdempotencyRecord.IDEMPOTENCY_KEY, + idempotencyKey, + ModelIdempotencyRecord.EXECUTOR_ID, + executorId), + Map.of(), + Map.of(), + Set.of(ModelIdempotencyRecord.HTTP_STATUS), + Set.of()); try { int updated = datasourceOperations.executeUpdate(update); @@ -170,24 +182,29 @@ public boolean finalizeRecord( String responseSummary, String responseHeaders, Instant finalizedAt) { - String sql = - "UPDATE " - + QueryGenerator.fullyQualifiedTableName(ModelIdempotencyRecord.TABLE_NAME) - + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," - + " finalized_at = ?, updated_at = ?" - + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; + // Use ordered/set maps so we can include nullable values (Map.of disallows nulls). + Map setClause = new LinkedHashMap<>(); + setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus); + setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype); + setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary); + setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders); + setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt)); + setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt)); + + Map whereEquals = new HashMap<>(); + whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId); + whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey); + QueryGenerator.PreparedQuery update = - new QueryGenerator.PreparedQuery( - sql, - Arrays.asList( - httpStatus, - errorSubtype, - responseSummary, - responseHeaders, - Timestamp.from(finalizedAt), - Timestamp.from(finalizedAt), - realmId, - idempotencyKey)); + QueryGenerator.generateUpdateQuery( + ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.TABLE_NAME, + setClause, + whereEquals, + Map.of(), + Map.of(), + Set.of(ModelIdempotencyRecord.HTTP_STATUS), + Set.of()); try { return datasourceOperations.executeUpdate(update) > 0; @@ -199,12 +216,15 @@ public boolean finalizeRecord( @Override public int purgeExpired(String realmId, Instant before) { try { - String sql = - "DELETE FROM " - + QueryGenerator.fullyQualifiedTableName(ModelIdempotencyRecord.TABLE_NAME) - + " WHERE realm_id = ? AND expires_at IS NOT NULL AND expires_at < ?"; QueryGenerator.PreparedQuery delete = - new QueryGenerator.PreparedQuery(sql, List.of(realmId, Timestamp.from(before))); + QueryGenerator.generateDeleteQuery( + ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.TABLE_NAME, + Map.of(ModelIdempotencyRecord.REALM_ID, realmId), + Map.of(), + Map.of(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(before)), + Set.of(), + Set.of(ModelIdempotencyRecord.EXPIRES_AT)); return datasourceOperations.executeUpdate(delete); } catch (SQLException e) { throw new RuntimeException("Failed to purge expired idempotency records", e); From fc2ac6175375dbdfac495c5a6ebf342edef0fce3 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 31 Jan 2026 16:20:38 -0800 Subject: [PATCH 4/8] add PG profile --- persistence/relational-jdbc/build.gradle.kts | 3 +++ .../RelationalJdbcIdempotencyStorePostgresIT.java | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/persistence/relational-jdbc/build.gradle.kts b/persistence/relational-jdbc/build.gradle.kts index c3e4253727..10cb2e8fe5 100644 --- a/persistence/relational-jdbc/build.gradle.kts +++ b/persistence/relational-jdbc/build.gradle.kts @@ -47,4 +47,7 @@ dependencies { testImplementation("org.testcontainers:testcontainers-junit-jupiter") testImplementation("org.testcontainers:testcontainers-postgresql") + + testImplementation(project(":polaris-container-spec-helper")) + testImplementation(project(":polaris-runtime-test-common")) } diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index 7af6659d47..fa56c7231e 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -17,6 +17,7 @@ package org.apache.polaris.persistence.relational.jdbc.idempotency; import static org.assertj.core.api.Assertions.assertThat; +import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper; import java.io.InputStream; import java.time.Duration; @@ -28,6 +29,7 @@ import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.test.commons.PostgresRelationalJdbcLifeCycleManagement; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,7 +43,13 @@ public class RelationalJdbcIdempotencyStorePostgresIT { @Container private static final PostgreSQLContainer POSTGRES = - new PostgreSQLContainer<>("postgres:17.5-alpine"); + new PostgreSQLContainer<>( + containerSpecHelper("postgres", PostgresRelationalJdbcLifeCycleManagement.class) + .dockerImageName(null) + .asCompatibleSubstituteFor("postgres")) + .withDatabaseName("polaris_db") + .withUsername("polaris") + .withPassword("polaris"); private static DataSource dataSource; private static RelationalJdbcIdempotencyStore store; From a3427fcdda1082e499b3a09d2d9f0545c66eb33b Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 31 Jan 2026 16:56:21 -0800 Subject: [PATCH 5/8] spotlessApply --- .../idempotency/RelationalJdbcIdempotencyStorePostgresIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index fa56c7231e..d0601c5622 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -16,8 +16,8 @@ */ package org.apache.polaris.persistence.relational.jdbc.idempotency; -import static org.assertj.core.api.Assertions.assertThat; import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper; +import static org.assertj.core.api.Assertions.assertThat; import java.io.InputStream; import java.time.Duration; From 323484b37bf1544029e7898fc20eb8590c538177 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 3 Feb 2026 15:00:47 -0800 Subject: [PATCH 6/8] address comments --- .../relational/jdbc/QueryGenerator.java | 9 -- .../RelationalJdbcIdempotencyStore.java | 11 +- .../jdbc/models/ModelIdempotencyRecord.java | 48 +++++-- .../relational/jdbc/QueryGeneratorTest.java | 120 ++++++++++++++++++ ...ationalJdbcIdempotencyStorePostgresIT.java | 9 +- .../IdempotencyPersistenceException.java | 37 ++++++ 6 files changed, 202 insertions(+), 32 deletions(-) create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index d5c3fc5ce7..eecae59a67 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -50,15 +50,6 @@ public record PreparedBatchQuery(String sql, List> parametersList) /** A container for the query fragment SQL string and the ordered parameter values. */ record QueryFragment(String sql, List parameters) {} - /** - * Returns the fully-qualified table name used by relational-jdbc queries. - * - * @param tableName Target table name. - */ - public static String fullyQualifiedTableName(@Nonnull String tableName) { - return getFullyQualifiedTableName(tableName); - } - /** * Generates a SELECT query with projection and filtering. * diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java index b865baccce..4f26b274a8 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java @@ -28,6 +28,7 @@ import java.util.Set; import javax.sql.DataSource; import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyPersistenceException; import org.apache.polaris.core.persistence.IdempotencyStore; import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; @@ -82,7 +83,7 @@ public ReserveResult reserve( if (datasourceOperations.isConstraintViolation(e)) { return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); } - throw new RuntimeException("Failed to reserve idempotency key", e); + throw new IdempotencyPersistenceException("Failed to reserve idempotency key", e); } } @@ -112,7 +113,7 @@ public Optional load(String realmId, String idempotencyKey) { } return Optional.of(results.getFirst()); } catch (SQLException e) { - throw new RuntimeException("Failed to load idempotency record", e); + throw new IdempotencyPersistenceException("Failed to load idempotency record", e); } } @@ -159,7 +160,7 @@ public HeartbeatResult updateHeartbeat( return HeartbeatResult.UPDATED; } } catch (SQLException e) { - throw new RuntimeException("Failed to update idempotency heartbeat", e); + throw new IdempotencyPersistenceException("Failed to update idempotency heartbeat", e); } // Raced with finalize/ownership loss; re-check to return a meaningful result. @@ -209,7 +210,7 @@ public boolean finalizeRecord( try { return datasourceOperations.executeUpdate(update) > 0; } catch (SQLException e) { - throw new RuntimeException("Failed to finalize idempotency record", e); + throw new IdempotencyPersistenceException("Failed to finalize idempotency record", e); } } @@ -227,7 +228,7 @@ public int purgeExpired(String realmId, Instant before) { Set.of(ModelIdempotencyRecord.EXPIRES_AT)); return datasourceOperations.executeUpdate(delete); } catch (SQLException e) { - throw new RuntimeException("Failed to purge expired idempotency records", e); + throw new IdempotencyPersistenceException("Failed to purge expired idempotency records", e); } } } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java index a6a9e90d5f..5bf567488b 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java @@ -41,21 +41,35 @@ public interface ModelIdempotencyRecord extends Converter { String TABLE_NAME = "idempotency_records"; + // Logical tenant / realm identifier. String REALM_ID = "realm_id"; + // Client-provided idempotency key. String IDEMPOTENCY_KEY = "idempotency_key"; + // Logical operation type (e.g. commit-table). String OPERATION_TYPE = "operation_type"; + // Normalized identifier of the affected resource. String RESOURCE_ID = "resource_id"; + // Final HTTP status code once the operation is completed (null while in-progress). String HTTP_STATUS = "http_status"; + // Optional error subtype for failures. String ERROR_SUBTYPE = "error_subtype"; + // Short serialized representation of the response body. String RESPONSE_SUMMARY = "response_summary"; + // Serialized representation of response headers. String RESPONSE_HEADERS = "response_headers"; + // Timestamp when the operation was finalized (null while in-progress). String FINALIZED_AT = "finalized_at"; + // Timestamp when the record was created. String CREATED_AT = "created_at"; + // Timestamp when the record was last updated. String UPDATED_AT = "updated_at"; + // Timestamp for the last heartbeat update (null if no heartbeat recorded). String HEARTBEAT_AT = "heartbeat_at"; + // Identifier of the executor that owns the in-progress record (null if not owned). String EXECUTOR_ID = "executor_id"; + // Expiration timestamp after which the record can be considered stale/purgeable. String EXPIRES_AT = "expires_at"; List ALL_COLUMNS = @@ -99,20 +113,25 @@ public interface ModelIdempotencyRecord extends Converter { EXPIRES_AT); /** - * Dummy instance to be used as a Converter when calling {@link #fromResultSet(ResultSet)}. + * Stateless {@link Converter} instance for {@link + * org.apache.polaris.persistence.relational.jdbc.DatasourceOperations#executeSelect}. * - *

FIXME: fromResultSet() is a factory method and should be static or moved to a factory class. + *

We only need {@link Converter#fromResultSet(ResultSet)} when selecting rows; {@link + * Converter#toMap(DatabaseType)} is not used in that code path. */ - ModelIdempotencyRecord CONVERTER = - ImmutableModelIdempotencyRecord.builder() - .realmId("") - .idempotencyKey("") - .operationType("") - .resourceId("") - .createdAt(Instant.EPOCH) - .updatedAt(Instant.EPOCH) - .expiresAt(Instant.EPOCH) - .build(); + Converter CONVERTER = + new Converter<>() { + @Override + public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return ModelIdempotencyRecord.fromRow(rs); + } + + @Override + public Map toMap(DatabaseType databaseType) { + throw new UnsupportedOperationException( + "ModelIdempotencyRecord.CONVERTER is only intended for result-set conversion"); + } + }; String getRealmId(); @@ -151,6 +170,11 @@ public interface ModelIdempotencyRecord extends Converter { @Override default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return fromRow(rs); + } + + /** Convert the current ResultSet row into an {@link IdempotencyRecord}. */ + static IdempotencyRecord fromRow(ResultSet rs) throws SQLException { String realmId = rs.getString(REALM_ID); String idempotencyKey = rs.getString(IDEMPOTENCY_KEY); String operationType = rs.getString(OPERATION_TYPE); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java index d31b077889..9e0a7e6e79 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java @@ -26,6 +26,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -220,6 +222,124 @@ void testGenerateWhereClause_emptyMap() { assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause, Map.of()).sql()); } + @Test + void testGenerateWhereClauseExtended_allPredicatesAndStableParameterOrder() { + Map whereEquals = new LinkedHashMap<>(); + whereEquals.put("a", "A"); + Map whereGreater = new LinkedHashMap<>(); + whereGreater.put("b", 2); + Map whereLess = new LinkedHashMap<>(); + whereLess.put("c", 3); + + Set whereIsNull = new LinkedHashSet<>(List.of("d")); + Set whereIsNotNull = new LinkedHashSet<>(List.of("e")); + + QueryGenerator.QueryFragment where = + QueryGenerator.generateWhereClauseExtended( + Set.of("a", "b", "c", "d", "e"), + whereEquals, + whereGreater, + whereLess, + whereIsNull, + whereIsNotNull); + + assertEquals(" WHERE a = ? AND b > ? AND c < ? AND d IS NULL AND e IS NOT NULL", where.sql()); + Assertions.assertThat(where.parameters()).containsExactly("A", 2, 3); + } + + @Test + void testGenerateUpdateQueryExtended_supportsNullSetValues() { + Map setClause = new LinkedHashMap<>(); + setClause.put("error_subtype", null); + setClause.put("http_status", 200); + + QueryGenerator.PreparedQuery q = + QueryGenerator.generateUpdateQuery( + List.of("error_subtype", "http_status", "realm_id", "idempotency_key", "executor_id"), + "idempotency_records", + setClause, + Map.of("realm_id", "r1", "idempotency_key", "k1"), + Map.of(), + Map.of("http_status", 500), + Set.of("executor_id"), + Set.of()); + + assertEquals( + "UPDATE POLARIS_SCHEMA.idempotency_records SET error_subtype = ?, http_status = ?" + + " WHERE realm_id = ? AND idempotency_key = ? AND http_status < ? AND executor_id IS NULL", + q.sql()); + Assertions.assertThat(q.parameters()).containsExactly(null, 200, "r1", "k1", 500); + } + + @Test + void testGenerateUpdateQueryExtended_rejectsEmptySetClause() { + assertThrows( + IllegalArgumentException.class, + () -> + QueryGenerator.generateUpdateQuery( + List.of("a"), + "t", + Map.of(), + Map.of("a", 1), + Map.of(), + Map.of(), + Set.of(), + Set.of())); + } + + @Test + void testGenerateDeleteQueryExtended_includesNullPredicatesAndLessThan() { + QueryGenerator.PreparedQuery q = + QueryGenerator.generateDeleteQuery( + List.of("realm_id", "expires_at", "finalized_at"), + "idempotency_records", + Map.of("realm_id", "r1"), + Map.of(), + Map.of("expires_at", 123), + Set.of("finalized_at"), + Set.of()); + + assertEquals( + "DELETE FROM POLARIS_SCHEMA.idempotency_records WHERE realm_id = ? AND expires_at < ? AND finalized_at IS NULL", + q.sql()); + Assertions.assertThat(q.parameters()).containsExactly("r1", 123); + } + + @Test + void testGenerateDeleteQueryExtended_allowsRealmIdEvenIfNotInTableColumns() { + QueryGenerator.PreparedQuery q = + QueryGenerator.generateDeleteQuery( + List.of("id"), + "some_table", + Map.of("realm_id", "r1"), + Map.of(), + Map.of(), + Set.of(), + Set.of()); + + assertEquals("DELETE FROM POLARIS_SCHEMA.some_table WHERE realm_id = ?", q.sql()); + Assertions.assertThat(q.parameters()).containsExactly("r1"); + } + + @Test + void testGenerateUpdateQueryExtended_rejectsInvalidColumns() { + Map setClause = new LinkedHashMap<>(); + setClause.put("not_a_real_column", 1); + + assertThrows( + IllegalArgumentException.class, + () -> + QueryGenerator.generateUpdateQuery( + List.of("a"), + "t", + setClause, + Map.of("a", 1), + Map.of(), + Map.of(), + Set.of(), + Set.of())); + } + @Test void testGenerateOverlapQuery() { assertEquals( diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index d0601c5622..be375c44d0 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -44,12 +44,9 @@ public class RelationalJdbcIdempotencyStorePostgresIT { @Container private static final PostgreSQLContainer POSTGRES = new PostgreSQLContainer<>( - containerSpecHelper("postgres", PostgresRelationalJdbcLifeCycleManagement.class) - .dockerImageName(null) - .asCompatibleSubstituteFor("postgres")) - .withDatabaseName("polaris_db") - .withUsername("polaris") - .withPassword("polaris"); + containerSpecHelper("postgres", PostgresRelationalJdbcLifeCycleManagement.class) + .dockerImageName(null) + .asCompatibleSubstituteFor("postgres")); private static DataSource dataSource; private static RelationalJdbcIdempotencyStore store; diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java new file mode 100644 index 0000000000..b22e2a7614 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence; + +import org.apache.polaris.core.exceptions.PolarisException; + +/** + * Raised when the {@link IdempotencyStore} fails due to an underlying persistence/storage error. + * + *

This is treated as an internal server error by {@code PolarisExceptionMapper}. + */ +public class IdempotencyPersistenceException extends PolarisException { + + public IdempotencyPersistenceException(String message) { + super(message); + } + + public IdempotencyPersistenceException(String message, Throwable cause) { + super(message, cause); + } +} From feceb776c9a323ff8469c7a18842c2e686fded32 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 6 Feb 2026 15:38:51 -0800 Subject: [PATCH 7/8] address comments --- .../relational/jdbc/DatasourceOperations.java | 5 -- .../relational/jdbc/QueryGenerator.java | 15 +---- .../RelationalJdbcIdempotencyStore.java | 58 +++++++++++------- .../jdbc/models/ModelIdempotencyRecord.java | 60 ++++--------------- ...ationalJdbcIdempotencyStorePostgresIT.java | 10 ++-- .../core/entity/IdempotencyRecord.java | 43 +++++++++++-- 6 files changed, 94 insertions(+), 97 deletions(-) diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java index 6fed5b7e45..e44de3a94c 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java @@ -83,11 +83,6 @@ DatabaseType getDatabaseType() { return databaseType; } - /** Returns the detected database type for this datasource. */ - public DatabaseType databaseType() { - return databaseType; - } - /** * Execute SQL script and close the associated input stream * diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index eecae59a67..a8720fda5b 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -197,7 +197,7 @@ public static PreparedQuery generateUpdateQuery( * predicates (equality, greater-than, less-than, IS NULL, IS NOT NULL). * *

Callers should prefer passing an ordered map (e.g. {@link java.util.LinkedHashMap}) for the - * set clause so generated SQL and parameter order are stable. + * set clause so generated SQL and parameter order are consistent. * * @param tableColumns List of valid table columns. * @param tableName Target table. @@ -224,11 +224,6 @@ public static PreparedQuery generateUpdateQuery( Set columns = new HashSet<>(tableColumns); validateColumns(columns, setClause.keySet()); - validateColumns(columns, whereEquals.keySet()); - validateColumns(columns, whereGreater.keySet()); - validateColumns(columns, whereLess.keySet()); - validateColumns(columns, whereIsNull); - validateColumns(columns, whereIsNotNull); QueryFragment where = generateWhereClauseExtended( @@ -281,12 +276,6 @@ public static PreparedQuery generateDeleteQuery( @Nonnull Set whereIsNull, @Nonnull Set whereIsNotNull) { Set columns = new HashSet<>(tableColumns); - validateColumns(columns, whereEquals.keySet()); - validateColumns(columns, whereGreater.keySet()); - validateColumns(columns, whereLess.keySet()); - validateColumns(columns, whereIsNull); - validateColumns(columns, whereIsNotNull); - QueryFragment where = generateWhereClauseExtended( columns, whereEquals, whereGreater, whereLess, whereIsNull, whereIsNotNull); @@ -337,8 +326,6 @@ static QueryFragment generateWhereClauseExtended( @Nonnull Map whereLess, @Nonnull Set whereIsNull, @Nonnull Set whereIsNotNull) { - // Preserve the original behavior of rejecting unknown columns. This is used by SELECT query - // generation too, not only by callers of the extended UPDATE/DELETE helpers. validateColumns(tableColumns, whereEquals.keySet()); validateColumns(tableColumns, whereGreater.keySet()); validateColumns(tableColumns, whereLess.keySet()); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java index 4f26b274a8..6f3143aef0 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java @@ -17,6 +17,7 @@ package org.apache.polaris.persistence.relational.jdbc.idempotency; import jakarta.annotation.Nonnull; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; @@ -33,7 +34,7 @@ import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; -import org.apache.polaris.persistence.relational.jdbc.models.ImmutableModelIdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord; public class RelationalJdbcIdempotencyStore implements IdempotencyStore { @@ -56,21 +57,23 @@ public ReserveResult reserve( String executorId, Instant now) { try { - ModelIdempotencyRecord model = - ImmutableModelIdempotencyRecord.builder() - .realmId(realmId) - .idempotencyKey(idempotencyKey) - .operationType(operationType) - .resourceId(normalizedResourceId) - .createdAt(now) - .updatedAt(now) - .heartbeatAt(now) - .executorId(executorId) - .expiresAt(expiresAt) - .build(); - - List values = - model.toMap(datasourceOperations.databaseType()).values().stream().toList(); + // Build insert values directly to avoid requiring an Immutables-generated model type. + Map insertMap = new LinkedHashMap<>(); + insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey); + insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType); + insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId); + insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null); + insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null); + insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null); + insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null); + insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null); + insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now)); + insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now)); + insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now)); + insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId); + insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt)); + + List values = insertMap.values().stream().toList(); QueryGenerator.PreparedQuery insert = QueryGenerator.generateInsertQuery( ModelIdempotencyRecord.ALL_COLUMNS, @@ -92,7 +95,7 @@ public Optional load(String realmId, String idempotencyKey) { try { QueryGenerator.PreparedQuery query = QueryGenerator.generateSelectQuery( - ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.ALL_COLUMNS, ModelIdempotencyRecord.TABLE_NAME, Map.of( ModelIdempotencyRecord.REALM_ID, @@ -100,7 +103,20 @@ public Optional load(String realmId, String idempotencyKey) { ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey)); List results = - datasourceOperations.executeSelect(query, ModelIdempotencyRecord.CONVERTER); + datasourceOperations.executeSelect( + query, + new Converter<>() { + @Override + public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return ModelIdempotencyRecord.fromRow(realmId, rs); + } + + @Override + public Map toMap( + org.apache.polaris.persistence.relational.jdbc.DatabaseType databaseType) { + throw new UnsupportedOperationException("Not used for SELECT conversion"); + } + }); if (results.isEmpty()) { return Optional.empty(); } @@ -135,7 +151,7 @@ public HeartbeatResult updateHeartbeat( QueryGenerator.PreparedQuery update = QueryGenerator.generateUpdateQuery( - ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.ALL_COLUMNS, ModelIdempotencyRecord.TABLE_NAME, Map.of( ModelIdempotencyRecord.HEARTBEAT_AT, @@ -198,7 +214,7 @@ public boolean finalizeRecord( QueryGenerator.PreparedQuery update = QueryGenerator.generateUpdateQuery( - ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.ALL_COLUMNS, ModelIdempotencyRecord.TABLE_NAME, setClause, whereEquals, @@ -219,7 +235,7 @@ public int purgeExpired(String realmId, Instant before) { try { QueryGenerator.PreparedQuery delete = QueryGenerator.generateDeleteQuery( - ModelIdempotencyRecord.SELECT_COLUMNS, + ModelIdempotencyRecord.ALL_COLUMNS, ModelIdempotencyRecord.TABLE_NAME, Map.of(ModelIdempotencyRecord.REALM_ID, realmId), Map.of(), diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java index 5bf567488b..74473cc246 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.polaris.core.entity.IdempotencyRecord; -import org.apache.polaris.immutables.PolarisImmutable; import org.apache.polaris.persistence.relational.jdbc.DatabaseType; /** @@ -35,8 +34,10 @@ * *

This follows the same pattern as {@link ModelEvent}, separating the storage representation * from the core domain model while still providing {@link Converter} helpers. + * + *

Note: {@code realm_id} is treated as an implicit column across relational-jdbc: callers can + * filter on it in WHERE clauses even if it is not included in the projection list. */ -@PolarisImmutable public interface ModelIdempotencyRecord extends Converter { String TABLE_NAME = "idempotency_records"; @@ -88,51 +89,6 @@ public interface ModelIdempotencyRecord extends Converter { EXECUTOR_ID, EXPIRES_AT); - /** - * Columns to select when reading idempotency records. - * - *

{@code realm_id} is intentionally not part of {@link #ALL_COLUMNS} because {@link - * org.apache.polaris.persistence.relational.jdbc.QueryGenerator#generateInsertQuery(List, String, - * List, String)} appends it automatically for all relational-jdbc tables. - */ - List SELECT_COLUMNS = - List.of( - REALM_ID, - IDEMPOTENCY_KEY, - OPERATION_TYPE, - RESOURCE_ID, - HTTP_STATUS, - ERROR_SUBTYPE, - RESPONSE_SUMMARY, - RESPONSE_HEADERS, - FINALIZED_AT, - CREATED_AT, - UPDATED_AT, - HEARTBEAT_AT, - EXECUTOR_ID, - EXPIRES_AT); - - /** - * Stateless {@link Converter} instance for {@link - * org.apache.polaris.persistence.relational.jdbc.DatasourceOperations#executeSelect}. - * - *

We only need {@link Converter#fromResultSet(ResultSet)} when selecting rows; {@link - * Converter#toMap(DatabaseType)} is not used in that code path. - */ - Converter CONVERTER = - new Converter<>() { - @Override - public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { - return ModelIdempotencyRecord.fromRow(rs); - } - - @Override - public Map toMap(DatabaseType databaseType) { - throw new UnsupportedOperationException( - "ModelIdempotencyRecord.CONVERTER is only intended for result-set conversion"); - } - }; - String getRealmId(); String getIdempotencyKey(); @@ -175,7 +131,15 @@ default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { /** Convert the current ResultSet row into an {@link IdempotencyRecord}. */ static IdempotencyRecord fromRow(ResultSet rs) throws SQLException { - String realmId = rs.getString(REALM_ID); + // Requires realm_id to be projected in the ResultSet. + return fromRow(rs.getString(REALM_ID), rs); + } + + /** + * Convert the current ResultSet row into an {@link IdempotencyRecord}, using {@code realmId} from + * call context (so callers can project only {@link #ALL_COLUMNS}). + */ + static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws SQLException { String idempotencyKey = rs.getString(IDEMPOTENCY_KEY); String operationType = rs.getString(OPERATION_TYPE); String resourceId = rs.getString(RESOURCE_ID); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index be375c44d0..7a19e133ea 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -102,7 +102,7 @@ void reserveSingleWinnerAndDuplicate() { String realm = "test-realm"; String key = "K1"; String op = "commit-table"; - String rid = "tables/ns.tbl"; + String rid = "catalogs/1/tables/ns.tbl"; Instant now = Instant.now(); Instant exp = now.plus(Duration.ofMinutes(5)); @@ -125,7 +125,7 @@ void heartbeatAndFinalize() { String realm = "test-realm"; String key = "K2"; String op = "commit-table"; - String rid = "tables/ns.tbl2"; + String rid = "catalogs/1/tables/ns.tbl2"; Instant now = Instant.now(); Instant exp = now.plus(Duration.ofMinutes(5)); @@ -167,7 +167,7 @@ void purgeExpired() { String realm = "test-realm"; String key = "K3"; String op = "drop-table"; - String rid = "tables/ns.tbl3"; + String rid = "catalogs/1/tables/ns.tbl3"; Instant now = Instant.now(); Instant expPast = now.minus(Duration.ofMinutes(1)); @@ -181,9 +181,9 @@ void duplicateReturnsExistingBindingForMismatch() { String realm = "test-realm"; String key = "K4"; String op1 = "commit-table"; - String rid1 = "tables/ns.tbl4"; + String rid1 = "catalogs/1/tables/ns.tbl4"; String op2 = "drop-table"; // different binding - String rid2 = "tables/ns.tbl4"; // same resource, different op + String rid2 = "catalogs/1/tables/ns.tbl4"; // same resource, different op Instant now = Instant.now(); Instant exp = now.plus(Duration.ofMinutes(5)); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java index 9583a15e8e..9def89b157 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java @@ -26,21 +26,53 @@ */ public final class IdempotencyRecord { + /** Logical tenant / realm identifier. */ private final String realmId; + + /** Client-provided idempotency key. */ private final String idempotencyKey; + + /** Logical operation type (e.g. {@code "commit-table"}). */ private final String operationType; + + /** + * Request-derived, fully-qualified identifier of the affected resource (see {@link + * #getNormalizedResourceId()}). + */ private final String normalizedResourceId; + /** HTTP status code returned to the client once finalized; {@code null} while in-progress. */ private final Integer httpStatus; + + /** Optional error subtype/code when the operation failed. */ private final String errorSubtype; + + /** Minimal serialized representation of the response body for replay. */ private final String responseSummary; + + /** Serialized representation of a small, whitelisted set of response headers for replay. */ private final String responseHeaders; + + /** Timestamp when the operation was finalized; {@code null} while in-progress. */ private final Instant finalizedAt; + /** Timestamp when the record was created. */ private final Instant createdAt; + + /** Timestamp when the record was last updated. */ private final Instant updatedAt; + + /** + * Timestamp of the most recent heartbeat while in-progress; {@code null} if never heartbeated. + */ private final Instant heartbeatAt; + + /** + * Identifier of the executor that owns the in-progress reservation; {@code null} if not owned. + */ private final String executorId; + + /** Timestamp after which the reservation is considered expired and eligible for purging. */ private final Instant expiresAt; public IdempotencyRecord( @@ -89,10 +121,13 @@ public String getOperationType() { /** * Normalized identifier of the resource affected by the operation. * - *

This should be derived from the request (for example, a canonicalized path like {@code - * "tables/ns.tbl"}), not from a generated internal entity id. This ensures the binding is - * available even when an operation fails before creating any entities, and allows the HTTP layer - * to detect idempotency-key reuse across different resources. + *

This should be derived from the request (for example, a canonicalized and fully-qualified + * identifier like {@code "catalogs//tables/ns.tbl"}), not from a generated internal + * entity id. + * + *

The identifier must be stable even on failure (before any entities are created) and must be + * scoped to avoid false conflicts (for example, include the catalog/warehouse identifier when + * applicable). */ public String getNormalizedResourceId() { return normalizedResourceId; From 6d20fa39c8faca5f6e548de6570e451dded13df1 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 6 Feb 2026 18:38:37 -0800 Subject: [PATCH 8/8] fix test failure --- .../relational/jdbc/QueryGeneratorTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java index 9e0a7e6e79..38a06f2b5f 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java @@ -253,14 +253,21 @@ void testGenerateUpdateQueryExtended_supportsNullSetValues() { setClause.put("error_subtype", null); setClause.put("http_status", 200); + // Use ordered maps so WHERE clause order is deterministic. + Map whereEquals = new LinkedHashMap<>(); + whereEquals.put("realm_id", "r1"); + whereEquals.put("idempotency_key", "k1"); + Map whereLess = new LinkedHashMap<>(); + whereLess.put("http_status", 500); + QueryGenerator.PreparedQuery q = QueryGenerator.generateUpdateQuery( List.of("error_subtype", "http_status", "realm_id", "idempotency_key", "executor_id"), "idempotency_records", setClause, - Map.of("realm_id", "r1", "idempotency_key", "k1"), + whereEquals, Map.of(), - Map.of("http_status", 500), + whereLess, Set.of("executor_id"), Set.of());