diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java index e44de3a94c..3689c9278a 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java @@ -142,6 +142,56 @@ public List executeSelect( return results; } + /** + * Executes SELECT Query using an existing connection (for use within transactions). Returns the + * results after applying a transformer. + * + * @param query : Query to execute + * @param converterInstance : An instance of the type being selected, used to convert to a + * business entity like PolarisBaseEntity + * @param connection : The connection to use for the query + * @return The list of results yielded by the query + * @param : Business entity class + * @throws SQLException : Exception during the query execution. + */ + public List executeSelect( + @Nonnull QueryGenerator.PreparedQuery query, + @Nonnull Converter converterInstance, + @Nonnull Connection connection) + throws SQLException { + logQuery(query); + return executeSelectWithConnection(query, converterInstance, connection); + } + + /** + * Helper method to execute a SELECT query with a given connection and collect results. + * + * @param query : Query to execute + * @param converterInstance : Converter to transform ResultSet rows + * @param connection : Connection to use (must not be closed by this method) + * @return List of converted results + * @param : Business entity class + * @throws SQLException : Exception during query execution + */ + private List executeSelectWithConnection( + @Nonnull QueryGenerator.PreparedQuery query, + @Nonnull Converter converterInstance, + @Nonnull Connection connection) + throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(query.sql())) { + List params = query.parameters(); + for (int i = 0; i < params.size(); i++) { + statement.setObject(i + 1, params.get(i)); + } + try (ResultSet resultSet = statement.executeQuery()) { + ResultSetIterator iterator = new ResultSetIterator<>(resultSet, converterInstance); + ArrayList results = new ArrayList<>(); + iterator.toStream().forEach(results::add); + return results; + } + } + } + /** * Executes SELECT Query and takes a consumer over the results. For callers that want more * sophisticated control over how query results are handled. @@ -186,16 +236,11 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE return withRetries( () -> { logQuery(preparedQuery); - try (Connection connection = borrowConnection(); - PreparedStatement statement = connection.prepareStatement(preparedQuery.sql())) { - List params = preparedQuery.parameters(); - for (int i = 0; i < params.size(); i++) { - statement.setObject(i + 1, params.get(i)); - } + try (Connection connection = borrowConnection()) { boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); try { - return statement.executeUpdate(); + return executeUpdateWithConnection(preparedQuery, connection); } finally { connection.setAutoCommit(autoCommit); } @@ -203,6 +248,25 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE }); } + /** + * Helper method to execute an UPDATE/INSERT query with a given connection. + * + * @param preparedQuery : Query to execute + * @param connection : Connection to use (must not be closed by this method) + * @return Number of rows modified + * @throws SQLException : Exception during query execution + */ + private int executeUpdateWithConnection( + QueryGenerator.PreparedQuery preparedQuery, Connection connection) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(preparedQuery.sql())) { + List params = preparedQuery.parameters(); + for (int i = 0; i < params.size(); i++) { + statement.setObject(i + 1, params.get(i)); + } + return statement.executeUpdate(); + } + } + /** * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL queries have the same * parameterized form. @@ -291,16 +355,18 @@ public void runWithinTransaction(TransactionCallback callback) throws SQLExcepti }); } + /** + * Executes UPDATE/INSERT query using an existing connection (for use within transactions). + * + * @param connection : Connection to use + * @param preparedQuery : Query to execute + * @return Number of rows modified + * @throws SQLException : Exception during query execution + */ public Integer execute(Connection connection, QueryGenerator.PreparedQuery preparedQuery) throws SQLException { logQuery(preparedQuery); - try (PreparedStatement statement = connection.prepareStatement(preparedQuery.sql())) { - List params = preparedQuery.parameters(); - for (int i = 0; i < params.size(); i++) { - statement.setObject(i + 1, params.get(i)); - } - return statement.executeUpdate(); - } + return executeUpdateWithConnection(preparedQuery, connection); } private boolean isRetryable(SQLException e) { diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 9401df2dd0..3b352f2079 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -119,13 +119,10 @@ public void writeEntity( boolean nameOrParentChanged, PolarisBaseEntity originalEntity) { try { - persistEntity( - callCtx, - entity, - originalEntity, - null, - (connection, preparedQuery) -> { - return datasourceOperations.executeUpdate(preparedQuery); + datasourceOperations.runWithinTransaction( + connection -> { + persistEntity(callCtx, entity, originalEntity, connection); + return true; }); } catch (SQLException e) { throw new RuntimeException("Error persisting entity", e); @@ -148,15 +145,18 @@ public void writeEntities( // return it. PolarisBaseEntity entityFound = lookupEntity( - callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode()); + callCtx, + entity.getCatalogId(), + entity.getId(), + entity.getTypeCode(), + connection); if (entityFound != null && originalEntity == null) { // probably the client retried, simply return it // TODO: Check correctness of returning entityFound vs entity here. It may have // already been updated after the creation. continue; } - persistEntity( - callCtx, entity, originalEntity, connection, datasourceOperations::execute); + persistEntity(callCtx, entity, originalEntity, connection); } return true; }); @@ -172,15 +172,29 @@ private void persistEntity( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity, PolarisBaseEntity originalEntity, - Connection connection, - QueryAction queryAction) + @Nonnull Connection connection) throws SQLException { ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion); if (originalEntity == null) { + // Check if entity already exists before attempting INSERT to avoid constraint violations + // that would abort the transaction in PostgreSQL + PolarisBaseEntity existingEntity = + lookupEntityByName( + callCtx, + entity.getCatalogId(), + entity.getParentId(), + entity.getTypeCode(), + entity.getName(), + connection); + + if (existingEntity != null) { + throw new EntityAlreadyExistsException(existingEntity); + } + try { List values = modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(); - queryAction.apply( + datasourceOperations.execute( connection, QueryGenerator.generateInsertQuery( ModelEntity.getAllColumnNames(schemaVersion), @@ -189,21 +203,10 @@ private void persistEntity( realmId)); } catch (SQLException e) { if (datasourceOperations.isConstraintViolation(e)) { - PolarisBaseEntity existingEntity = - lookupEntityByName( - callCtx, - entity.getCatalogId(), - entity.getParentId(), - entity.getTypeCode(), - entity.getName()); - // This happens in two scenarios: - // 1. PRIMARY KEY violated - // 2. UNIQUE CONSTRAINT on (realm_id, catalog_id, parent_id, type_code, name) violated - // With SERIALIZABLE isolation, the conflicting entity may _not_ be visible and - // existingEntity can be null, which would cause an NPE in - // EntityAlreadyExistsException.message(). - throw new EntityAlreadyExistsException( - existingEntity != null ? existingEntity : entity, e); + // Constraint violation despite our check above - this can happen due to race conditions + // where another transaction inserted the same entity between our SELECT and INSERT. + // We cannot lookup again here because PostgreSQL aborts the transaction after any error. + throw new EntityAlreadyExistsException(entity, e); } throw new RuntimeException( String.format("Failed to write entity due to %s", e.getMessage()), e); @@ -223,7 +226,7 @@ private void persistEntity( List values = modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(); int rowsUpdated = - queryAction.apply( + datasourceOperations.execute( connection, QueryGenerator.generateUpdateQuery( ModelEntity.getAllColumnNames(schemaVersion), @@ -405,11 +408,30 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) { @Override public PolarisBaseEntity lookupEntity( @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, int typeCode) { + PreparedQuery query = buildLookupEntityQuery(catalogId, entityId, typeCode); + return getPolarisBaseEntity(query); + } + + /** + * Lookup entity by ID using an existing connection for transaction consistency. + * + * @param connection connection to use for the query (non-null) + */ + private PolarisBaseEntity lookupEntity( + @Nonnull PolarisCallContext callCtx, + long catalogId, + long entityId, + int typeCode, + @Nonnull Connection connection) { + PreparedQuery query = buildLookupEntityQuery(catalogId, entityId, typeCode); + return getPolarisBaseEntity(query, connection); + } + + private PreparedQuery buildLookupEntityQuery(long catalogId, long entityId, int typeCode) { Map params = Map.of("catalog_id", catalogId, "id", entityId, "type_code", typeCode, "realm_id", realmId); - return getPolarisBaseEntity( - QueryGenerator.generateSelectQuery( - ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params)); + return QueryGenerator.generateSelectQuery( + ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params); } @Override @@ -419,6 +441,28 @@ public PolarisBaseEntity lookupEntityByName( long parentId, int typeCode, @Nonnull String name) { + PreparedQuery query = buildLookupEntityByNameQuery(catalogId, parentId, typeCode, name); + return getPolarisBaseEntity(query); + } + + /** + * Lookup entity by name using an existing connection for transaction consistency. + * + * @param connection connection to use for the query (non-null) + */ + private PolarisBaseEntity lookupEntityByName( + @Nonnull PolarisCallContext callCtx, + long catalogId, + long parentId, + int typeCode, + @Nonnull String name, + @Nonnull Connection connection) { + PreparedQuery query = buildLookupEntityByNameQuery(catalogId, parentId, typeCode, name); + return getPolarisBaseEntity(query, connection); + } + + private PreparedQuery buildLookupEntityByNameQuery( + long catalogId, long parentId, int typeCode, String name) { Map params = Map.of( "catalog_id", @@ -431,31 +475,63 @@ public PolarisBaseEntity lookupEntityByName( name, "realm_id", realmId); - return getPolarisBaseEntity( - QueryGenerator.generateSelectQuery( - ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params)); + return QueryGenerator.generateSelectQuery( + ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params); } @Nullable private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery query) { try { var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)); - if (results.isEmpty()) { - return null; - } else if (results.size() > 1) { - throw new IllegalStateException( - String.format( - "More than one(%s) entities were found for a given type code : %s", - results.size(), results.getFirst().getTypeCode())); - } else { - return results.getFirst(); - } + return extractSingleEntity(results); + } catch (SQLException e) { + throw new RuntimeException( + String.format("Failed to retrieve polaris entity due to %s", e.getMessage()), e); + } + } + + /** + * Execute entity lookup using an existing connection for transaction consistency. + * + * @param query the prepared query to execute + * @param connection connection to use for the query (non-null) + * @return the entity if found, null otherwise + */ + @Nullable + private PolarisBaseEntity getPolarisBaseEntity( + QueryGenerator.PreparedQuery query, @Nonnull Connection connection) { + try { + var results = + datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion), connection); + return extractSingleEntity(results); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entity due to %s", e.getMessage()), e); } } + /** + * Extract a single entity from query results, validating that exactly zero or one entity is + * returned. + * + * @param results the list of entities returned from the query + * @return the single entity if found, null if no results + * @throws IllegalStateException if more than one entity is found + */ + @Nullable + private PolarisBaseEntity extractSingleEntity(List results) { + if (results.isEmpty()) { + return null; + } else if (results.size() > 1) { + throw new IllegalStateException( + String.format( + "More than one(%s) entities were found for a given type code : %s", + results.size(), results.getFirst().getTypeCode())); + } else { + return results.getFirst(); + } + } + @Nonnull @Override public List lookupEntities( @@ -1050,7 +1126,7 @@ private boolean handleInheritablePolicy( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisPolicyMappingRecord record, @Nonnull PreparedQuery insertQuery, - Connection connection) + @Nonnull Connection connection) throws SQLException { List existingRecords = loadPoliciesOnTargetByType( @@ -1272,9 +1348,4 @@ PolarisStorageIntegration loadPolarisStorageIntegration( BaseMetaStoreManager.extractStorageConfiguration(diagnostics, entity); return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig); } - - @FunctionalInterface - private interface QueryAction { - Integer apply(Connection connection, QueryGenerator.PreparedQuery query) throws SQLException; - } } diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java index 6063ad3d87..a1b01bd41a 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java @@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -32,6 +33,7 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; @@ -291,4 +293,35 @@ void testDefaultConfigurationValues() throws SQLException { assertThrows(SQLException.class, () -> datasourceOperations.withRetries(mockOperation)); verify(mockOperation, times(1)).execute(); } + + /** + * Test that when a Connection is provided to executeSelect, it reuses that connection instead of + * creating a new one. This is critical for transaction consistency - queries within a transaction + * must use the same connection to see uncommitted changes and maintain isolation. + */ + @Test + void testExecuteSelect_reusesConnection() throws Exception { + // Setup query + QueryGenerator.PreparedQuery query = + new QueryGenerator.PreparedQuery("SELECT * FROM entities WHERE id = ?", List.of(1L)); + + // Setup connection to return a prepared statement + when(mockConnection.prepareStatement(query.sql())).thenReturn(mockPreparedStatement); + + // Setup empty result set + ResultSet mockResultSet = mock(ResultSet.class); + when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(false); // Empty result + + // Execute: Call executeSelect WITH a connection + datasourceOperations.executeSelect(query, new ModelEntity(1), mockConnection); + + // CRITICAL VERIFICATION: DataSource.getConnection() should be called ONCE i.e. during INIT + // because we're reusing the provided connection, no more calls to get connection should + // be called. + verify(mockDataSource, times(1)).getConnection(); + + // Verify the provided connection was used to prepare the statement + verify(mockConnection, times(1)).prepareStatement(query.sql()); + } }