Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sql.DataSource;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.cli.GenericCli;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -146,6 +147,19 @@ public Void call() throws Exception {
reconSchemaManager.createReconSchema();
LOG.debug("Recon schema creation done.");

LOG.info("Finalizing Layout Features.");
// Handle Recon Schema Versioning
ReconSchemaVersionTableManager versionTableManager =
injector.getInstance(ReconSchemaVersionTableManager.class);
DataSource dataSource = injector.getInstance(DataSource.class);

ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(versionTableManager, reconContext, dataSource);
// Run the upgrade framework to finalize layout features if needed
layoutVersionManager.finalizeLayoutFeatures();

LOG.info("Recon schema versioning completed.");

this.reconSafeModeMgr = injector.getInstance(ReconSafeModeManager.class);
this.reconSafeModeMgr.setInSafeMode(true);
httpServer = injector.getInstance(ReconHttpServer.class);
Expand All @@ -157,17 +171,6 @@ public Void call() throws Exception {
this.reconTaskStatusMetrics =
injector.getInstance(ReconTaskStatusMetrics.class);

// Handle Recon Schema Versioning
ReconSchemaVersionTableManager versionTableManager =
injector.getInstance(ReconSchemaVersionTableManager.class);

ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(versionTableManager, reconContext);
// Run the upgrade framework to finalize layout features if needed
ReconStorageContainerManagerFacade reconStorageContainerManagerFacade =
(ReconStorageContainerManagerFacade) this.getReconStorageContainerManager();
layoutVersionManager.finalizeLayoutFeatures(reconStorageContainerManagerFacade);

LOG.info("Initializing support of Recon Features...");
FeatureProvider.initFeatureSupport(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.sql.SQLException;
import java.util.Arrays;
import javax.sql.DataSource;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
Expand All @@ -48,8 +47,8 @@ public class InitialConstraintUpgradeAction implements ReconUpgradeAction {
private DSLContext dslContext;

@Override
public void execute(ReconStorageContainerManagerFacade scmFacade) throws SQLException {
this.dataSource = scmFacade.getDataSource();
public void execute(DataSource source) throws SQLException {
dataSource = source;
try (Connection conn = dataSource.getConnection()) {
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.hadoop.ozone.recon.ReconContext;
import org.apache.hadoop.ozone.recon.ReconSchemaVersionTableManager;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,16 +41,18 @@ public class ReconLayoutVersionManager {

private final ReconSchemaVersionTableManager schemaVersionTableManager;
private final ReconContext reconContext;
private final DataSource dataSource;

// Metadata Layout Version (MLV) of the Recon Metadata on disk
private int currentMLV;

public ReconLayoutVersionManager(ReconSchemaVersionTableManager schemaVersionTableManager,
ReconContext reconContext)
ReconContext reconContext, DataSource dataSource)
throws SQLException {
this.schemaVersionTableManager = schemaVersionTableManager;
this.currentMLV = determineMLV();
this.reconContext = reconContext;
this.dataSource = dataSource;
ReconLayoutFeature.registerUpgradeActions(); // Register actions via annotation
}

Expand All @@ -77,27 +79,32 @@ private int determineSLV() {
* Finalizes the layout features that need to be upgraded, by executing the upgrade action for each
* feature that is registered for finalization.
*/
public void finalizeLayoutFeatures(ReconStorageContainerManagerFacade scmFacade) {
public void finalizeLayoutFeatures() {
// Get features that need finalization, sorted by version
List<ReconLayoutFeature> featuresToFinalize = getRegisteredFeatures();
LOG.debug("Starting finalization of {} features.", featuresToFinalize.size());

try (Connection connection = scmFacade.getDataSource().getConnection()) {
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false); // Turn off auto-commit for transactional control

for (ReconLayoutFeature feature : featuresToFinalize) {
LOG.debug("Processing feature version: {}", feature.getVersion());
try {
// Fetch only the FINALIZE action for the feature
Optional<ReconUpgradeAction> action = feature.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE);
if (action.isPresent()) {
LOG.debug("Finalize action found for feature version: {}", feature.getVersion());
// Update the schema version in the database
updateSchemaVersion(feature.getVersion(), connection);

// Execute the upgrade action
action.get().execute(scmFacade);
action.get().execute(dataSource);

// Commit the transaction only if both operations succeed
connection.commit();
LOG.info("Feature versioned {} finalized successfully.", feature.getVersion());
} else {
LOG.info("No finalize action found for feature version: {}", feature.getVersion());
}
} catch (Exception e) {
// Rollback any pending changes for the current feature due to failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.ozone.recon.schema.ReconTaskSchemaDefinition;
import org.jooq.DSLContext;
import org.jooq.exception.DataAccessException;
Expand Down Expand Up @@ -61,12 +60,12 @@ private void addColumnToTable(DSLContext dslContext, String columnName) {
*/
private void setColumnAsNonNullable(DSLContext dslContext, String columnName) {
dslContext.alterTable(RECON_TASK_STATUS_TABLE_NAME)
.alterColumn(columnName).setNotNull().execute();
.alterColumn(DSL.name(columnName)).setNotNull()
.execute();
}

@Override
public void execute(ReconStorageContainerManagerFacade scmFacade) throws DataAccessException {
DataSource dataSource = scmFacade.getDataSource();
public void execute(DataSource dataSource) throws DataAccessException {
try (Connection conn = dataSource.getConnection()) {
if (!TABLE_EXISTS_CHECK.test(conn, RECON_TASK_STATUS_TABLE_NAME)) {
return;
Expand All @@ -82,8 +81,8 @@ public void execute(ReconStorageContainerManagerFacade scmFacade) throws DataAcc

//Handle previous table values with new columns default values
int updatedRowCount = dslContext.update(DSL.table(RECON_TASK_STATUS_TABLE_NAME))
.set(DSL.field("last_task_run_status", SQLDataType.INTEGER), 0)
.set(DSL.field("is_current_task_running", SQLDataType.INTEGER), 0)
.set(DSL.field(DSL.name("last_task_run_status"), SQLDataType.INTEGER), 0)
.set(DSL.field(DSL.name("is_current_task_running"), SQLDataType.INTEGER), 0)
.execute();
LOG.info("Updated {} rows with default value for new columns", updatedRowCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.hadoop.ozone.recon.upgrade;

import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import javax.sql.DataSource;

/**
* ReconUpgradeAction is an interface for executing upgrade actions in Recon.
Expand All @@ -40,7 +40,7 @@ enum UpgradeActionType {
/**
* Execute the upgrade action.
*/
void execute(ReconStorageContainerManagerFacade scmFacade) throws Exception;
void execute(DataSource source) throws Exception;

/**
* Provides the type of upgrade phase (e.g., FINALIZE).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.recon.ReconContext;
Expand Down Expand Up @@ -161,8 +162,9 @@ public void testFreshInstallScenario() throws Exception {

// Initialize ReconSchemaVersionTableManager and ReconLayoutVersionManager
ReconSchemaVersionTableManager schemaVersionTableManager = new ReconSchemaVersionTableManager(getDataSource());
DataSource mockDataSource = mock(DataSource.class);
ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class));
new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class), mockDataSource);

// Fetch and verify the current MLV
int mlv = layoutVersionManager.getCurrentMLV();
Expand Down Expand Up @@ -200,8 +202,9 @@ public void testPreUpgradedClusterScenario() throws Exception {

// Initialize ReconSchemaVersionTableManager and ReconLayoutVersionManager
ReconSchemaVersionTableManager schemaVersionTableManager = new ReconSchemaVersionTableManager(getDataSource());
DataSource mockDataSource = mock(DataSource.class);
ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class));
new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class), mockDataSource);

// Fetch and verify the current MLV
int mlv = layoutVersionManager.getCurrentMLV();
Expand Down Expand Up @@ -248,8 +251,9 @@ public void testUpgradedClusterScenario() throws Exception {

// Initialize managers to interact with schema version framework
ReconSchemaVersionTableManager schemaVersionTableManager = new ReconSchemaVersionTableManager(getDataSource());
DataSource mockDataSource = mock(DataSource.class);
ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class));
new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class), mockDataSource);

// Fetch and verify the current MLV stored in the database
int mlv = layoutVersionManager.getCurrentMLV();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void setUp() throws SQLException {
@Test
public void testUpgradeAppliesConstraintModificationForAllStates() throws SQLException {
// Run the upgrade action
upgradeAction.execute(mockScmFacade);
upgradeAction.execute(mockScmFacade.getDataSource());

// Iterate over all valid states and insert records
for (ContainerSchemaDefinition.UnHealthyContainerStates state :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,17 @@ public void setUp() throws SQLException {
when(feature2.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE))
.thenReturn(Optional.of(action2));

// Define the custom features to be returned
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature1, feature2});

layoutVersionManager = new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class));

// Common mocks for all tests
scmFacadeMock = mock(ReconStorageContainerManagerFacade.class);
mockDataSource = mock(DataSource.class);
mockConnection = mock(Connection.class);

// Define the custom features to be returned
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature1, feature2});

layoutVersionManager = new ReconLayoutVersionManager(schemaVersionTableManager, mock(ReconContext.class),
mockDataSource);

when(scmFacadeMock.getDataSource()).thenReturn(mockDataSource);
when(mockDataSource.getConnection()).thenReturn(mockConnection);

Expand Down Expand Up @@ -125,7 +126,7 @@ public void testInitializationWithMockedValues() {
@Test
public void testFinalizeLayoutFeaturesWithMockedValues() throws SQLException {
// Execute the method under test
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();

// Verify that schema versions are updated for our custom features
verify(schemaVersionTableManager, times(1))
Expand Down Expand Up @@ -160,7 +161,7 @@ public void testNoLayoutFeatures() throws SQLException {
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{});

// Execute the method under test
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();

// Verify that no schema version updates were attempted
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), any(Connection.class));
Expand All @@ -181,7 +182,7 @@ public void testUpgradeActionFailure() throws Exception {
ReconUpgradeAction action1 = mock(ReconUpgradeAction.class);

// Simulate an exception being thrown during the upgrade action execution
doThrow(new RuntimeException("Upgrade failed")).when(action1).execute(scmFacadeMock);
doThrow(new RuntimeException("Upgrade failed")).when(action1).execute(mockDataSource);
when(feature1.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE))
.thenReturn(Optional.of(action1));

Expand All @@ -190,7 +191,7 @@ public void testUpgradeActionFailure() throws Exception {

// Execute the layout feature finalization
try {
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();
} catch (Exception e) {
// Exception is expected, so it's fine to catch and ignore it here
}
Expand Down Expand Up @@ -228,7 +229,7 @@ public void testUpdateSchemaFailure() throws Exception {

// Execute the layout feature finalization
try {
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();
} catch (Exception e) {
// Exception is expected, so it's fine to catch and ignore it here
}
Expand Down Expand Up @@ -272,13 +273,13 @@ public void testUpgradeActionExecutionOrder() throws Exception {
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature2, feature3, feature1});

// Execute the layout feature finalization
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();

// Verify that the actions were executed in the correct order using InOrder
InOrder inOrder = inOrder(action1, action2, action3);
inOrder.verify(action1).execute(scmFacadeMock); // Should be executed first
inOrder.verify(action2).execute(scmFacadeMock); // Should be executed second
inOrder.verify(action3).execute(scmFacadeMock); // Should be executed third
inOrder.verify(action1).execute(mockDataSource); // Should be executed first
inOrder.verify(action2).execute(mockDataSource); // Should be executed second
inOrder.verify(action3).execute(mockDataSource); // Should be executed third
}

/**
Expand All @@ -293,7 +294,7 @@ public void testNoUpgradeActionsNeeded() throws SQLException {
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{});

// Execute the method under test
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();

// Verify that no schema version updates were attempted
verify(schemaVersionTableManager, never()).updateSchemaVersion(anyInt(), eq(mockConnection));
Expand Down Expand Up @@ -326,7 +327,7 @@ public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws
mockedEnum.when(ReconLayoutFeature::values).thenReturn(new ReconLayoutFeature[]{feature1, feature2});

// Finalize the first two features.
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();

// Verify that the schema versions for the first two features were updated
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(1, mockConnection);
Expand All @@ -345,17 +346,17 @@ public void testFinalizingNewFeatureWithoutReFinalizingPreviousFeatures() throws
when(schemaVersionTableManager.getCurrentSchemaVersion()).thenReturn(2);

// Finalize again, but only feature 3 should be finalized.
layoutVersionManager.finalizeLayoutFeatures(scmFacadeMock);
layoutVersionManager.finalizeLayoutFeatures();

// Verify that the schema version for feature 3 was updated
verify(schemaVersionTableManager, times(1)).updateSchemaVersion(3, mockConnection);

// Verify that action1 and action2 were not executed again.
verify(action1, times(1)).execute(scmFacadeMock);
verify(action2, times(1)).execute(scmFacadeMock);
verify(action1, times(1)).execute(mockDataSource);
verify(action2, times(1)).execute(mockDataSource);

// Verify that the upgrade action for feature 3 was executed.
verify(action3, times(1)).execute(scmFacadeMock);
verify(action3, times(1)).execute(mockDataSource);
}

}