diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconSchemaVersionTableManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconSchemaVersionTableManager.java index d7c3c65f2c15..e01d52b89cd0 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconSchemaVersionTableManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconSchemaVersionTableManager.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import javax.sql.DataSource; +import java.sql.Connection; import java.sql.SQLException; import static org.jooq.impl.DSL.name; @@ -38,7 +39,7 @@ public class ReconSchemaVersionTableManager { private static final Logger LOG = LoggerFactory.getLogger(ReconSchemaVersionTableManager.class); public static final String RECON_SCHEMA_VERSION_TABLE_NAME = "RECON_SCHEMA_VERSION"; - private final DSLContext dslContext; + private DSLContext dslContext; private final DataSource dataSource; @Inject @@ -71,30 +72,26 @@ public int getCurrentSchemaVersion() throws SQLException { * * @param newVersion The new version to set. */ - public void updateSchemaVersion(int newVersion) throws SQLException { - try { - boolean recordExists = dslContext.fetchExists(dslContext.selectOne() - .from(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))); + public void updateSchemaVersion(int newVersion, Connection conn) { + dslContext = DSL.using(conn); + boolean recordExists = dslContext.fetchExists(dslContext.selectOne() + .from(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))); - if (recordExists) { - // Update the existing schema version record - dslContext.update(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)) - .set(DSL.field(name("version_number")), newVersion) - .set(DSL.field(name("applied_on")), DSL.currentTimestamp()) - .execute(); - LOG.info("Updated schema version to '{}'.", newVersion); - } else { - // Insert a new schema version record - dslContext.insertInto(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)) - .columns(DSL.field(name("version_number")), - DSL.field(name("applied_on"))) - .values(newVersion, DSL.currentTimestamp()) - .execute(); - LOG.info("Inserted new schema version '{}'.", newVersion); - } - } catch (Exception e) { - LOG.error("Failed to update schema version to '{}'.", newVersion, e); - throw new SQLException("Unable to update schema version in the table.", e); + if (recordExists) { + // Update the existing schema version record + dslContext.update(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)) + .set(DSL.field(name("version_number")), newVersion) + .set(DSL.field(name("applied_on")), DSL.currentTimestamp()) + .execute(); + LOG.info("Updated schema version to '{}'.", newVersion); + } else { + // Insert a new schema version record + dslContext.insertInto(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)) + .columns(DSL.field(name("version_number")), + DSL.field(name("applied_on"))) + .values(newVersion, DSL.currentTimestamp()) + .execute(); + LOG.info("Inserted new schema version '{}'.", newVersion); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutVersionManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutVersionManager.java index e9f7fc9650d6..a595b6a0c107 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutVersionManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/ReconLayoutVersionManager.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; import java.sql.SQLException; import java.util.Arrays; import java.util.List; @@ -83,27 +84,42 @@ public void finalizeLayoutFeatures(ReconStorageContainerManagerFacade scmFacade) // Get features that need finalization, sorted by version List featuresToFinalize = getRegisteredFeatures(); - for (ReconLayoutFeature feature : featuresToFinalize) { - try { - // Fetch only the FINALIZE action for the feature - Optional action = feature.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE); - if (action.isPresent()) { - // Execute the upgrade action & update the schema version in the DB - action.get().execute(scmFacade); - updateSchemaVersion(feature.getVersion()); - LOG.info("Feature versioned {} finalized successfully.", feature.getVersion()); + try (Connection connection = scmFacade.getDataSource().getConnection()) { + connection.setAutoCommit(false); // Turn off auto-commit for transactional control + + for (ReconLayoutFeature feature : featuresToFinalize) { + try { + // Fetch only the FINALIZE action for the feature + Optional action = feature.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE); + if (action.isPresent()) { + // Update the schema version in the database + updateSchemaVersion(feature.getVersion(), connection); + + // Execute the upgrade action + action.get().execute(scmFacade); + + // Commit the transaction only if both operations succeed + connection.commit(); + LOG.info("Feature versioned {} finalized successfully.", feature.getVersion()); + } + } catch (Exception e) { + // Rollback any pending changes for the current feature due to failure + connection.rollback(); + currentMLV = determineMLV(); // Rollback the MLV to the original value + LOG.error("Failed to finalize feature {}. Rolling back changes.", feature.getVersion(), e); + throw e; } - } catch (Exception e) { - // Log the error to both logs and ReconContext - LOG.error("Failed to finalize feature {}: {}", feature.getVersion(), e.getMessage()); - reconContext.updateErrors(ReconContext.ErrorCode.UPGRADE_FAILURE); - reconContext.updateHealthStatus(new AtomicBoolean(false)); - // Stop further upgrades as an error occurred - throw new RuntimeException("Recon failed to finalize layout feature. Startup halted."); } + } catch (Exception e) { + // Log the error to both logs and ReconContext + LOG.error("Failed to finalize layout features: {}", e.getMessage()); + reconContext.updateErrors(ReconContext.ErrorCode.UPGRADE_FAILURE); + reconContext.updateHealthStatus(new AtomicBoolean(false)); + throw new RuntimeException("Recon failed to finalize layout features. Startup halted.", e); } } + /** * Returns a list of ReconLayoutFeature objects that are registered for finalization. */ @@ -123,10 +139,13 @@ protected List getRegisteredFeatures() { /** * Updates the Metadata Layout Version (MLV) in the database after finalizing a feature. + * This method uses the provided connection to ensure transactional consistency. + * * @param newVersion The new Metadata Layout Version (MLV) to set. + * @param connection The database connection to use for the update operation. */ - private void updateSchemaVersion(int newVersion) throws SQLException { - schemaVersionTableManager.updateSchemaVersion(newVersion); + private void updateSchemaVersion(int newVersion, Connection connection) { + schemaVersionTableManager.updateSchemaVersion(newVersion, connection); this.currentMLV = newVersion; LOG.info("MLV updated to: " + newVersion); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/upgrade/TestReconLayoutVersionManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/upgrade/TestReconLayoutVersionManager.java index e1a949b6d15f..a22c737691d1 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/upgrade/TestReconLayoutVersionManager.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/upgrade/TestReconLayoutVersionManager.java @@ -28,21 +28,26 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; +import javax.sql.DataSource; +import java.sql.Connection; import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.anyInt; /** @@ -54,6 +59,9 @@ public class TestReconLayoutVersionManager { private ReconLayoutVersionManager layoutVersionManager; private MockedStatic mockedEnum; private MockedStatic mockedEnumUpgradeActionType; + private ReconStorageContainerManagerFacade scmFacadeMock; + private DataSource mockDataSource; + private Connection mockConnection; @BeforeEach public void setUp() throws SQLException { @@ -80,6 +88,18 @@ public void setUp() throws SQLException { mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature1, feature2}); layoutVersionManager = new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class)); + + // Common mocks for all tests + scmFacadeMock = mock(ReconStorageContainerManagerFacade.class); + mockDataSource = mock(DataSource.class); + mockConnection = mock(Connection.class); + + when(scmFacadeMock.getDataSource()).thenReturn(mockDataSource); + when(mockDataSource.getConnection()).thenReturn(mockConnection); + + doNothing().when(mockConnection).setAutoCommit(false); + doNothing().when(mockConnection).commit(); + doNothing().when(mockConnection).rollback(); } @AfterEach @@ -103,17 +123,19 @@ public void testInitializationWithMockedValues() { } /** - * Tests the finalization of layout features and ensure that the updateSchemaVersion for + * Tests the finalization of layout features and ensures that the updateSchemaVersion for * the schemaVersionTable is triggered for each feature version. */ @Test public void testFinalizeLayoutFeaturesWithMockedValues() throws SQLException { - layoutVersionManager.finalizeLayoutFeatures(mock( - ReconStorageContainerManagerFacade.class)); + // Execute the method under test + layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); // Verify that schema versions are updated for our custom features - verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1); - verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2); + verify(schemaVersionTableManager, times(1)) + .updateSchemaVersion(1, mockConnection); + verify(schemaVersionTableManager, times(1)) + .updateSchemaVersion(2, mockConnection); } /** @@ -138,10 +160,14 @@ public void testGetRegisteredFeaturesWithMockedValues() { */ @Test public void testNoLayoutFeatures() throws SQLException { + // Ensure no layout features are present mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{}); - layoutVersionManager.finalizeLayoutFeatures(mock( - ReconStorageContainerManagerFacade.class)); - verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt()); + + // Execute the method under test + layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); + + // Verify that no schema version updates were attempted + verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), any(Connection.class)); } /** @@ -158,9 +184,6 @@ public void testUpgradeActionFailure() throws Exception { when(feature1.getVersion()).thenReturn(1); ReconUpgradeAction action1 = mock(ReconUpgradeAction.class); - // Create a consistent mock instance for the SCM facade - ReconStorageContainerManagerFacade scmFacadeMock = mock(ReconStorageContainerManagerFacade.class); - // Simulate an exception being thrown during the upgrade action execution doThrow(new RuntimeException("Upgrade failed")).when(action1).execute(scmFacadeMock); when(feature1.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE)) @@ -176,8 +199,49 @@ public void testUpgradeActionFailure() throws Exception { // Exception is expected, so it's fine to catch and ignore it here } - // Verify that schema version update was never called due to the exception - verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt()); + // Verify that metadata layout version MLV was not updated as the transaction was rolled back + assertEquals(0, layoutVersionManager.getCurrentMLV()); + + // Verify that a rollback was triggered + verify(mockConnection, times(1)).rollback(); + } + + /** + * Tests the scenario where the schema version update fails. Ensures that if the schema + * version update fails, the transaction is rolled back and the metadata layout version + * is not updated. + */ + @Test + public void testUpdateSchemaFailure() throws Exception { + // Reset existing mocks and set up new features for this specific test + mockedEnum.reset(); + + // Mock ReconLayoutFeature instances + ReconLayoutFeature feature1 = mock(ReconLayoutFeature.class); + when(feature1.getVersion()).thenReturn(1); + ReconUpgradeAction action1 = mock(ReconUpgradeAction.class); + + // Simulate an exception being thrown during the schema version update + doThrow(new RuntimeException("Schema update failed")).when(schemaVersionTableManager). + updateSchemaVersion(1, mockConnection); + when(feature1.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE)) + .thenReturn(Optional.of(action1)); + + // Mock the static values method to return the custom feature + mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature1}); + + // Execute the layout feature finalization + try { + layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); + } catch (Exception e) { + // Exception is expected, so it's fine to catch and ignore it here + } + + // Verify that metadata layout version MLV was not updated as the transaction was rolled back + assertEquals(0, layoutVersionManager.getCurrentMLV()); + + // Verify that the upgrade action was not committed and a rollback was triggered + verify(mockConnection, times(1)).rollback(); } /** @@ -211,9 +275,6 @@ public void testUpgradeActionExecutionOrder() throws Exception { // Mock the static values method to return custom features in a jumbled order mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature2, feature3, feature1}); - // Create a consistent mock instance for SCM facade - ReconStorageContainerManagerFacade scmFacadeMock = mock(ReconStorageContainerManagerFacade.class); - // Execute the layout feature finalization layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); @@ -230,12 +291,16 @@ public void testUpgradeActionExecutionOrder() throws Exception { */ @Test public void testNoUpgradeActionsNeeded() throws SQLException { - when(schemaVersionTableManager.getCurrentSchemaVersion()).thenReturn(2); - layoutVersionManager = new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class)); - layoutVersionManager.finalizeLayoutFeatures(mock( - ReconStorageContainerManagerFacade.class)); + // Mock the current schema version to the maximum layout version + when(schemaVersionTableManager.getCurrentSchemaVersion()).thenReturn(0); + + mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{}); + + // Execute the method under test + layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); - verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt()); + // Verify that no schema version updates were attempted + verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), eq(mockConnection)); } /** @@ -246,10 +311,10 @@ public void testNoUpgradeActionsNeeded() throws SQLException { */ @Test public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws Exception { - // Step 1: Finalize the first two features. + // Step 1: Mock the schema version manager when(schemaVersionTableManager.getCurrentSchemaVersion()).thenReturn(0); - // Mock the first two features. + // Mock the first two features ReconLayoutFeature feature1 = mock(ReconLayoutFeature.class); when(feature1.getVersion()).thenReturn(1); ReconUpgradeAction action1 = mock(ReconUpgradeAction.class); @@ -264,15 +329,14 @@ public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature1, feature2}); - ReconStorageContainerManagerFacade scmFacadeMock = mock(ReconStorageContainerManagerFacade.class); // Finalize the first two features. layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); - // Verify that the schema versions for the first two features were updated. - verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1); - verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2); + // Verify that the schema versions for the first two features were updated + verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1, mockConnection); + verify(schemaVersionTableManager, times(1)).updateSchemaVersion(2, mockConnection); - // Step 2: Introduce a new feature (Feature 3). + // Step 2: Introduce a new feature (Feature 3) ReconLayoutFeature feature3 = mock(ReconLayoutFeature.class); when(feature3.getVersion()).thenReturn(3); ReconUpgradeAction action3 = mock(ReconUpgradeAction.class); @@ -287,8 +351,8 @@ public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws // Finalize again, but only feature 3 should be finalized. layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock); - // Verify that the schema version for feature 3 was updated. - verify(schemaVersionTableManager, times(1)).updateSchemaVersion(3); + // Verify that the schema version for feature 3 was updated + verify(schemaVersionTableManager, times(1)).updateSchemaVersion(3, mockConnection); // Verify that action1 and action2 were not executed again. verify(action1, times(1)).execute(scmFacadeMock);