Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,56 @@ public <T> List<T> executeSelect(
return results;
}

/**
 * Executes a SELECT query over a caller-supplied connection (for use within transactions) and
 * converts every returned row into a business entity.
 *
 * @param query : Query to execute
 * @param converterInstance : An instance of the type being selected, used to convert to a
 *     business entity like PolarisBaseEntity
 * @param connection : The connection to use for the query; owned by the caller
 * @return The list of results yielded by the query
 * @param <T> : Business entity class
 * @throws SQLException : Exception during the query execution.
 */
public <T> List<T> executeSelect(
    @Nonnull QueryGenerator.PreparedQuery query,
    @Nonnull Converter<T> converterInstance,
    @Nonnull Connection connection)
    throws SQLException {
  // Log before executing so the query is captured even when execution fails.
  logQuery(query);
  return executeSelectWithConnection(query, converterInstance, connection);
}

/**
* Helper method to execute a SELECT query with a given connection and collect results.
*
* @param query : Query to execute
* @param converterInstance : Converter to transform ResultSet rows
* @param connection : Connection to use (must not be closed by this method)
* @return List of converted results
* @param <T> : Business entity class
* @throws SQLException : Exception during query execution
*/
private <T> List<T> executeSelectWithConnection(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this method is called from only one place... why not inline?

@Nonnull QueryGenerator.PreparedQuery query,
@Nonnull Converter<T> converterInstance,
@Nonnull Connection connection)
throws SQLException {
try (PreparedStatement statement = connection.prepareStatement(query.sql())) {
List<Object> params = query.parameters();
for (int i = 0; i < params.size(); i++) {
statement.setObject(i + 1, params.get(i));
}
try (ResultSet resultSet = statement.executeQuery()) {
ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance);
ArrayList<T> results = new ArrayList<>();
iterator.toStream().forEach(results::add);
return results;
}
}
}

/**
* Executes SELECT Query and takes a consumer over the results. For callers that want more
* sophisticated control over how query results are handled.
Expand Down Expand Up @@ -186,23 +236,37 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE
return withRetries(
() -> {
logQuery(preparedQuery);
try (Connection connection = borrowConnection();
PreparedStatement statement = connection.prepareStatement(preparedQuery.sql())) {
List<Object> params = preparedQuery.parameters();
for (int i = 0; i < params.size(); i++) {
statement.setObject(i + 1, params.get(i));
}
try (Connection connection = borrowConnection()) {
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
try {
return statement.executeUpdate();
return executeUpdateWithConnection(preparedQuery, connection);
} finally {
connection.setAutoCommit(autoCommit);
}
}
});
}

/**
 * Helper that binds parameters and runs an INSERT/UPDATE on the supplied connection. The caller
 * owns the connection; it is never closed here.
 *
 * @param preparedQuery : Query to execute
 * @param connection : Connection to use (must not be closed by this method)
 * @return Number of rows modified
 * @throws SQLException : Exception during query execution
 */
private int executeUpdateWithConnection(
    QueryGenerator.PreparedQuery preparedQuery, Connection connection) throws SQLException {
  try (PreparedStatement statement = connection.prepareStatement(preparedQuery.sql())) {
    // JDBC bind positions are 1-based.
    int position = 1;
    for (Object parameter : preparedQuery.parameters()) {
      statement.setObject(position++, parameter);
    }
    return statement.executeUpdate();
  }
}

/**
* Executes the INSERT/UPDATE Queries in batches. Requires that all SQL queries have the same
* parameterized form.
Expand Down Expand Up @@ -291,16 +355,18 @@ public void runWithinTransaction(TransactionCallback callback) throws SQLExcepti
});
}

/**
 * Executes UPDATE/INSERT query using an existing connection (for use within transactions).
 *
 * <p>The caller owns the connection; this method does not close it or change its auto-commit
 * setting.
 *
 * @param connection : Connection to use
 * @param preparedQuery : Query to execute
 * @return Number of rows modified
 * @throws SQLException : Exception during query execution
 */
public Integer execute(
    @Nonnull Connection connection, @Nonnull QueryGenerator.PreparedQuery preparedQuery)
    throws SQLException {
  // @Nonnull added for consistency with the executeSelect(connection) overload.
  logQuery(preparedQuery);
  return executeUpdateWithConnection(preparedQuery, connection);
}

private boolean isRetryable(SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,10 @@ public void writeEntity(
boolean nameOrParentChanged,
PolarisBaseEntity originalEntity) {
try {
persistEntity(
callCtx,
entity,
originalEntity,
null,
(connection, preparedQuery) -> {
return datasourceOperations.executeUpdate(preparedQuery);
datasourceOperations.runWithinTransaction(
connection -> {
persistEntity(callCtx, entity, originalEntity, connection);
return true;
});
} catch (SQLException e) {
throw new RuntimeException("Error persisting entity", e);
Expand All @@ -148,15 +145,18 @@ public void writeEntities(
// return it.
PolarisBaseEntity entityFound =
lookupEntity(
callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode());
callCtx,
entity.getCatalogId(),
entity.getId(),
entity.getTypeCode(),
connection);
if (entityFound != null && originalEntity == null) {
// probably the client retried, simply return it
// TODO: Check correctness of returning entityFound vs entity here. It may have
// already been updated after the creation.
continue;
}
persistEntity(
callCtx, entity, originalEntity, connection, datasourceOperations::execute);
persistEntity(callCtx, entity, originalEntity, connection);
}
return true;
});
Expand All @@ -172,15 +172,29 @@ private void persistEntity(
@Nonnull PolarisCallContext callCtx,
@Nonnull PolarisBaseEntity entity,
PolarisBaseEntity originalEntity,
Connection connection,
QueryAction queryAction)
@Nonnull Connection connection)
throws SQLException {
ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion);
if (originalEntity == null) {
// Check if entity already exists before attempting INSERT to avoid constraint violations
// that would abort the transaction in PostgreSQL
PolarisBaseEntity existingEntity =
lookupEntityByName(
callCtx,
entity.getCatalogId(),
entity.getParentId(),
entity.getTypeCode(),
entity.getName(),
connection);

if (existingEntity != null) {
throw new EntityAlreadyExistsException(existingEntity);
}

try {
List<Object> values =
modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
queryAction.apply(
datasourceOperations.execute(
connection,
QueryGenerator.generateInsertQuery(
ModelEntity.getAllColumnNames(schemaVersion),
Expand All @@ -189,21 +203,10 @@ private void persistEntity(
realmId));
} catch (SQLException e) {
if (datasourceOperations.isConstraintViolation(e)) {
PolarisBaseEntity existingEntity =
lookupEntityByName(
callCtx,
entity.getCatalogId(),
entity.getParentId(),
entity.getTypeCode(),
entity.getName());
// This happens in two scenarios:
// 1. PRIMARY KEY violated
// 2. UNIQUE CONSTRAINT on (realm_id, catalog_id, parent_id, type_code, name) violated
// With SERIALIZABLE isolation, the conflicting entity may _not_ be visible and
// existingEntity can be null, which would cause an NPE in
// EntityAlreadyExistsException.message().
throw new EntityAlreadyExistsException(
existingEntity != null ? existingEntity : entity, e);
Comment on lines -192 to -206
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PG aborts the transaction when it encounters the error :

If a statement within a transaction raises an exception, the entire transaction is 
  aborted and all statements are rolled back. The transaction remains in an aborted state, 
  and no further SQL commands will be executed until a ROLLBACK command is issued.

hence a lookup after a write failure is not possible; the tests failed because of this.
I changed it to read before the write attempt — assuming we are working with SERIALIZABLE isolation, this works now. Thinking about it more, I see the original approach was intentional :'( — as the comment says, doing it that way meant the worst case was only that we could not fetch the existing entity, which would have been fine.

Now, doing the lookup before the write is going to cause some perf overhead (but only in certain cases), and wrapping writeEntity in a transaction would amplify that scenario further; I will need to run some benchmarks before we check this in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx for the research! That sounds like a good plan 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we could explicitly start a new Tx for lookupEntityByName only on failure... WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precisely — presently that's what we are doing in the current code: the lookup uses a separate connection from the one used in the transaction (in the writeEntities case) or a completely different connection (in writeEntity). I am also debating whether it is worth it; I will run a 50:50 R/W benchmark to see how much we deviate from the previous numbers recorded here: #1517 (comment)

// Constraint violation despite our check above - this can happen due to race conditions
// where another transaction inserted the same entity between our SELECT and INSERT.
// We cannot lookup again here because PostgreSQL aborts the transaction after any error.
throw new EntityAlreadyExistsException(entity, e);
}
throw new RuntimeException(
String.format("Failed to write entity due to %s", e.getMessage()), e);
Expand All @@ -223,7 +226,7 @@ private void persistEntity(
List<Object> values =
modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
int rowsUpdated =
queryAction.apply(
datasourceOperations.execute(
connection,
QueryGenerator.generateUpdateQuery(
ModelEntity.getAllColumnNames(schemaVersion),
Expand Down Expand Up @@ -405,11 +408,30 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) {
/** Looks up an entity by id; runs the query on a fresh connection. */
@Override
public PolarisBaseEntity lookupEntity(
    @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, int typeCode) {
  return getPolarisBaseEntity(buildLookupEntityQuery(catalogId, entityId, typeCode));
}

/**
 * Lookup entity by ID using an existing connection for transaction consistency.
 *
 * @param connection connection to use for the query (non-null)
 */
private PolarisBaseEntity lookupEntity(
    @Nonnull PolarisCallContext callCtx,
    long catalogId,
    long entityId,
    int typeCode,
    @Nonnull Connection connection) {
  return getPolarisBaseEntity(buildLookupEntityQuery(catalogId, entityId, typeCode), connection);
}

/** Builds the SELECT that pins a single entity row by (catalog_id, id, type_code, realm_id). */
private PreparedQuery buildLookupEntityQuery(long catalogId, long entityId, int typeCode) {
  Map<String, Object> filters =
      Map.of(
          "catalog_id", catalogId,
          "id", entityId,
          "type_code", typeCode,
          "realm_id", realmId);
  return QueryGenerator.generateSelectQuery(
      ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, filters);
}

@Override
Expand All @@ -419,6 +441,28 @@ public PolarisBaseEntity lookupEntityByName(
long parentId,
int typeCode,
@Nonnull String name) {
PreparedQuery query = buildLookupEntityByNameQuery(catalogId, parentId, typeCode, name);
return getPolarisBaseEntity(query);
}

/**
 * Lookup entity by name using an existing connection for transaction consistency.
 *
 * @param connection connection to use for the query (non-null)
 */
private PolarisBaseEntity lookupEntityByName(
    @Nonnull PolarisCallContext callCtx,
    long catalogId,
    long parentId,
    int typeCode,
    @Nonnull String name,
    @Nonnull Connection connection) {
  return getPolarisBaseEntity(
      buildLookupEntityByNameQuery(catalogId, parentId, typeCode, name), connection);
}

private PreparedQuery buildLookupEntityByNameQuery(
long catalogId, long parentId, int typeCode, String name) {
Map<String, Object> params =
Map.of(
"catalog_id",
Expand All @@ -431,31 +475,63 @@ public PolarisBaseEntity lookupEntityByName(
name,
"realm_id",
realmId);
return getPolarisBaseEntity(
QueryGenerator.generateSelectQuery(
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
return QueryGenerator.generateSelectQuery(
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params);
}

/**
 * Runs the lookup query on a fresh connection and returns at most one matching entity.
 *
 * @param query the prepared query to execute
 * @return the entity if found, null otherwise
 */
@Nullable
private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery query) {
  try {
    return extractSingleEntity(
        datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)));
  } catch (SQLException e) {
    throw new RuntimeException(
        String.format("Failed to retrieve polaris entity due to %s", e.getMessage()), e);
  }
}

/**
 * Execute entity lookup using an existing connection for transaction consistency.
 *
 * @param query the prepared query to execute
 * @param connection connection to use for the query (non-null)
 * @return the entity if found, null otherwise
 */
@Nullable
private PolarisBaseEntity getPolarisBaseEntity(
    QueryGenerator.PreparedQuery query, @Nonnull Connection connection) {
  try {
    List<PolarisBaseEntity> rows =
        datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion), connection);
    return extractSingleEntity(rows);
  } catch (SQLException e) {
    throw new RuntimeException(
        String.format("Failed to retrieve polaris entity due to %s", e.getMessage()), e);
  }
}

/**
 * Extract a single entity from query results, validating that exactly zero or one entity is
 * returned.
 *
 * @param results the list of entities returned from the query
 * @return the single entity if found, null if no results
 * @throws IllegalStateException if more than one entity is found
 */
@Nullable
private PolarisBaseEntity extractSingleEntity(List<PolarisBaseEntity> results) {
  if (results.size() > 1) {
    // Lookups are keyed tightly enough that multiple matches indicate corrupt state.
    throw new IllegalStateException(
        String.format(
            "More than one(%s) entities were found for a given type code : %s",
            results.size(), results.getFirst().getTypeCode()));
  }
  return results.isEmpty() ? null : results.getFirst();
}

@Nonnull
@Override
public List<PolarisBaseEntity> lookupEntities(
Expand Down Expand Up @@ -1050,7 +1126,7 @@ private boolean handleInheritablePolicy(
@Nonnull PolarisCallContext callCtx,
@Nonnull PolarisPolicyMappingRecord record,
@Nonnull PreparedQuery insertQuery,
Connection connection)
@Nonnull Connection connection)
throws SQLException {
List<PolarisPolicyMappingRecord> existingRecords =
loadPoliciesOnTargetByType(
Expand Down Expand Up @@ -1272,9 +1348,4 @@ PolarisStorageIntegration<T> loadPolarisStorageIntegration(
BaseMetaStoreManager.extractStorageConfiguration(diagnostics, entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
}

@FunctionalInterface
private interface QueryAction {
Integer apply(Connection connection, QueryGenerator.PreparedQuery query) throws SQLException;
}
}
Loading