Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

import static org.jooq.impl.DSL.name;
Expand Down Expand Up @@ -71,30 +72,27 @@ public int getCurrentSchemaVersion() throws SQLException {
*
* @param newVersion The new version to set.
*/
public void updateSchemaVersion(int newVersion) throws SQLException {
try {
boolean recordExists = dslContext.fetchExists(dslContext.selectOne()
.from(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)));
public void updateSchemaVersion(int newVersion, Connection conn)
throws SQLException {
DSLContext dslContext = DSL.using(conn);
boolean recordExists = dslContext.fetchExists(dslContext.selectOne()
.from(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)));

if (recordExists) {
// Update the existing schema version record
dslContext.update(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.set(DSL.field(name("version_number")), newVersion)
.set(DSL.field(name("applied_on")), DSL.currentTimestamp())
.execute();
LOG.info("Updated schema version to '{}'.", newVersion);
} else {
// Insert a new schema version record
dslContext.insertInto(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.columns(DSL.field(name("version_number")),
DSL.field(name("applied_on")))
.values(newVersion, DSL.currentTimestamp())
.execute();
LOG.info("Inserted new schema version '{}'.", newVersion);
}
} catch (Exception e) {
LOG.error("Failed to update schema version to '{}'.", newVersion, e);
throw new SQLException("Unable to update schema version in the table.", e);
if (recordExists) {
// Update the existing schema version record
dslContext.update(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.set(DSL.field(name("version_number")), newVersion)
.set(DSL.field(name("applied_on")), DSL.currentTimestamp())
.execute();
LOG.info("Updated schema version to '{}'.", newVersion);
} else {
// Insert a new schema version record
dslContext.insertInto(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.columns(DSL.field(name("version_number")),
DSL.field(name("applied_on")))
.values(newVersion, DSL.currentTimestamp())
.execute();
LOG.info("Inserted new schema version '{}'.", newVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -88,10 +89,27 @@ public void finalizeLayoutFeatures(ReconStorageContainerManagerFacade scmFacade)
// Fetch only the FINALIZE action for the feature
Optional<ReconUpgradeAction> action = feature.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE);
if (action.isPresent()) {
// Execute the upgrade action & update the schema version in the DB
action.get().execute(scmFacade);
updateSchemaVersion(feature.getVersion());
LOG.info("Feature versioned {} finalized successfully.", feature.getVersion());
try (Connection connection = scmFacade.getDataSource()
.getConnection()) {
connection.setAutoCommit(
false); // Turn off auto-commit for transactional control

// Update the schema version in the database
updateSchemaVersion(feature.getVersion(), connection);

// Execute the upgrade action
action.get().execute(scmFacade);

// Commit the transaction only if both operations succeed
connection.commit();
LOG.info("Feature versioned {} finalized successfully.",
feature.getVersion());
} catch (Exception e) {
// Rollback any pending change`s due to failure
LOG.error("Failed to finalize feature {}. Rolling back changes.",
feature.getVersion(), e);
throw e; // Re-throw to ensure consistent error handling
}
}
} catch (Exception e) {
// Log the error to both logs and ReconContext
Expand All @@ -104,6 +122,7 @@ public void finalizeLayoutFeatures(ReconStorageContainerManagerFacade scmFacade)
}
}


/**
* Returns a list of ReconLayoutFeature objects that are registered for finalization.
*/
Expand All @@ -123,10 +142,13 @@ protected List<ReconLayoutFeature> getRegisteredFeatures() {

/**
* Updates the Metadata Layout Version (MLV) in the database after finalizing a feature.
* This method uses the provided connection to ensure transactional consistency.
*
* @param newVersion The new Metadata Layout Version (MLV) to set.
* @param connection The database connection to use for the update operation.
*/
private void updateSchemaVersion(int newVersion) throws SQLException {
schemaVersionTableManager.updateSchemaVersion(newVersion);
private void updateSchemaVersion(int newVersion, Connection connection) throws SQLException {
schemaVersionTableManager.updateSchemaVersion(newVersion, connection);
this.currentMLV = newVersion;
LOG.info("MLV updated to: " + newVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public void testFinalizeLayoutFeaturesWithMockedValues() throws SQLException {
ReconStorageContainerManagerFacade.class));

// Verify that schema versions are updated for our custom features
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1, null);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2, null);
}

/**
Expand Down Expand Up @@ -141,7 +141,7 @@ public void testNoLayoutFeatures() throws SQLException {
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{});
layoutVersionManager.finalizeLayoutFeatures(mock(
ReconStorageContainerManagerFacade.class));
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt());
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), null);
}

/**
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testUpgradeActionFailure() throws Exception {
}

// Verify that schema version update was never called due to the exception
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt());
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), null);
}

/**
Expand Down Expand Up @@ -235,7 +235,7 @@ public void testNoUpgradeActionsNeeded() throws SQLException {
layoutVersionManager.finalizeLayoutFeatures(mock(
ReconStorageContainerManagerFacade.class));

verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt());
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), null);
}

/**
Expand Down Expand Up @@ -269,8 +269,8 @@ public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);

// Verify that the schema versions for the first two features were updated.
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1, null);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2, null);

// Step 2: Introduce a new feature (Feature 3).
ReconLayoutFeature feature3 = mock(ReconLayoutFeature.class);
Expand All @@ -288,7 +288,7 @@ public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);

// Verify that the schema version for feature 3 was updated.
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(3);
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(3, null);

// Verify that action1 and action2 were not executed again.
verify(action1, times(1)).execute(scmFacadeMock);
Expand Down
Loading