Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2573e03
HDDS-11465. Introducing Schema Versioning for Recon Derby table to Ha…
ArafatKhan2198 Sep 18, 2024
3e64282
Covered a few edge cases of upgrades and fresh install
ArafatKhan2198 Sep 18, 2024
a4895e0
Made changes for latest discussion
ArafatKhan2198 Sep 23, 2024
3128cd9
Removed the old changes
ArafatKhan2198 Sep 29, 2024
a57f922
New things
ArafatKhan2198 Sep 29, 2024
b80f014
New changes
ArafatKhan2198 Sep 29, 2024
ea5946d
Fixed a bug
ArafatKhan2198 Sep 29, 2024
27f1538
Finalised the new approach
ArafatKhan2198 Sep 29, 2024
e4b4b27
Added log comments for testing
ArafatKhan2198 Sep 29, 2024
262a93d
Added an initial version to Feature enum
ArafatKhan2198 Oct 5, 2024
929d453
Added tests for ReconLayoutVersionManager
ArafatKhan2198 Oct 5, 2024
bf68424
Added more tests for Table definition
ArafatKhan2198 Oct 5, 2024
855ad92
Added missing licence certs
ArafatKhan2198 Oct 5, 2024
6ea93a9
TestCase updation
ArafatKhan2198 Oct 6, 2024
d95a1a7
Replaced the occurance of Schema Layout Version to Software Layout Ve…
ArafatKhan2198 Oct 7, 2024
29fe85b
Refactored Recon layout feature upgrade framework to support annotati…
ArafatKhan2198 Oct 16, 2024
287cafa
Added a new test case
ArafatKhan2198 Oct 18, 2024
81e62f0
Made final changes
ArafatKhan2198 Oct 22, 2024
0afebff
Final changes for review
ArafatKhan2198 Oct 23, 2024
c538a78
Removed the occurance of AUTO_FINALIZE in comments and made changes t…
ArafatKhan2198 Oct 23, 2024
4127ab5
Updated recon context for failed upgrades and made other review changes
ArafatKhan2198 Oct 23, 2024
3609b74
Fixed the error handling review comments
ArafatKhan2198 Oct 24, 2024
defd313
Fixed a failing uT
ArafatKhan2198 Oct 24, 2024
f75c952
Fixed stack strace problem
ArafatKhan2198 Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
*/
package org.hadoop.ozone.recon.codegen;

import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
import org.hadoop.ozone.recon.schema.ReconTaskSchemaDefinition;
import org.hadoop.ozone.recon.schema.ReconSchemaDefinition;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.*;

import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
Expand All @@ -40,5 +36,6 @@ protected void configure() {
schemaBinder.addBinding().to(ContainerSchemaDefinition.class);
schemaBinder.addBinding().to(ReconTaskSchemaDefinition.class);
schemaBinder.addBinding().to(StatsSchemaDefinition.class);
schemaBinder.addBinding().to(SchemaVersionTableDefinition.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
Expand All @@ -39,6 +41,9 @@
@Singleton
public class ContainerSchemaDefinition implements ReconSchemaDefinition {

private static final Logger LOG =
LoggerFactory.getLogger(ContainerSchemaDefinition.class);

public static final String UNHEALTHY_CONTAINERS_TABLE_NAME =
"UNHEALTHY_CONTAINERS";

Expand Down Expand Up @@ -71,25 +76,35 @@ public void initializeSchema() throws SQLException {
Connection conn = dataSource.getConnection();
dslContext = DSL.using(conn);

if (TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
// Drop the existing constraint if it exists
String constraintName = UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1";
dslContext.alterTable(UNHEALTHY_CONTAINERS_TABLE_NAME)
.dropConstraint(constraintName)
.execute();

// Add the updated constraint with all enum states
addUpdatedConstraint();
} else {
// Create the table if it does not exist
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
createUnhealthyContainersTable();
}
}

@Override
public void upgradeSchema(String fromVersion, String toVersion)
throws SQLException {
Connection conn = dataSource.getConnection();
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
return;
}
// Example upgrade script
if (fromVersion.equals("1.0") && toVersion.equals("2.0")) {
runMigrationToVersion2(conn);
LOG.info("Upgraded schema from version 1.0 to 2.0.");
}
}

/**
* Add the updated constraint to the table.
* Run the upgrade to version 2.0.
*/
private void addUpdatedConstraint() {
private void runMigrationToVersion2(Connection conn) throws SQLException {
// Drop the existing constraint if it exists
String constraintName = UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1";
dslContext.alterTable(UNHEALTHY_CONTAINERS_TABLE_NAME)
.dropConstraint(constraintName)
.execute();

// Get all enum values as a list of strings
String[] enumStates = Arrays.stream(UnHealthyContainerStates.values())
.map(Enum::name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ public interface ReconSchemaDefinition {
* Execute DDL that will create Recon schema.
*/
void initializeSchema() throws SQLException;

/**
* Upgrade the schema for the table.
* This method will be called during schema upgrades.
*/
void upgradeSchema(String fromVersion, String toVersion) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public void initializeSchema() throws SQLException {
}
}

@Override
public void upgradeSchema(String fromVersion, String toVersion)
throws SQLException {
// No schema upgrades needed for Recon Task Status table.
}

/**
* Create the Recon Task Status table.
* @param conn connection
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.hadoop.ozone.recon.schema;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

/**
* Class for managing the schema of the SchemaVersion table.
*/
@Singleton
public class SchemaVersionTableDefinition implements ReconSchemaDefinition {

public static final String SCHEMA_VERSION_TABLE_NAME = "RECON_SCHEMA_VERSION";
private final DataSource dataSource;
private DSLContext dslContext;

@Inject
public SchemaVersionTableDefinition(DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public void initializeSchema() throws SQLException {
Connection conn = dataSource.getConnection();
dslContext = DSL.using(conn);

if (!TABLE_EXISTS_CHECK.test(conn, SCHEMA_VERSION_TABLE_NAME)) {
createSchemaVersionTable();
}
}

@Override
public void upgradeSchema(String fromVersion, String toVersion) throws SQLException {
// No schema upgrades needed for the Schema Version table.
}

/**
* Create the Schema Version table.
*/
private void createSchemaVersionTable() throws SQLException {
dslContext.createTableIfNotExists(SCHEMA_VERSION_TABLE_NAME)
.column("version_number", SQLDataType.VARCHAR(10).nullable(false))
.column("applied_on", SQLDataType.TIMESTAMP.defaultValue(DSL.currentTimestamp()))
.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public void initializeSchema() throws SQLException {
}
}

@Override
public void upgradeSchema(String fromVersion, String toVersion)
throws SQLException {
// No schema upgrades needed for the stats table.
}

/**
* Create the Ozone Global Stats table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public void initializeSchema() throws SQLException {
}
}

@Override
public void upgradeSchema(String fromVersion, String toVersion)
throws SQLException {
// No schema upgrades needed for the utilization tables.
}

private void createClusterGrowthTable() {
dslContext.createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
.column("timestamp", SQLDataType.TIMESTAMP.nullable(false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ private ReconConstants() {

public static final String RECON_SCM_SNAPSHOT_DB = "scm.snapshot.db";

// Latest Schema Version for all Recon Derby Tables
// Bump it up whenever there is a schema change in Recon Derby Tables
public static final String LATEST_SCHEMA_VERSION = "2.0";

// By default, limit the number of results returned

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.ozone.recon;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -29,29 +30,128 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;

import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;

/**
* Class used to create Recon SQL tables.
* Manages the creation and upgrade of Recon SQL tables with schema versioning.
*
* This class handles the following scenarios:
*
* 1. Fresh Installation:
* - No tables, including `schemaVersionTable`, exist.
* - All tables are created with the latest schema version.
*
* 2. Upgrade from Older Version:
* - All Existing tables (e.g., `UNHEALTHY_CONTAINERS`) are present, but `schemaVersionTable` is missing.
* - Indicates an upgrade from a version without schema tracking which was introduced in version 2.0.
* - The `schemaVersionTable` is created and all tables are upgraded to the latest version.
*
* 3. Upgrade with SchemaVersionTable:
* - `schemaVersionTable` exists but is outdated.
* - Migrations are applied to upgrade all tables to the latest schema.
*
* 4. Schema Already Up to Date:
* - All tables and the schema version match the latest version; no action is needed.
*/
public class ReconSchemaManager {

private static final Logger LOG =
LoggerFactory.getLogger(ReconSchemaManager.class);
private Set<ReconSchemaDefinition> reconSchemaDefinitions = new HashSet<>();
private final ReconSchemaVersionTableManager schemaVersionTableManager;

@Inject
public ReconSchemaManager(Set<ReconSchemaDefinition> reconSchemaDefinitions) {
public ReconSchemaManager(Set<ReconSchemaDefinition> reconSchemaDefinitions,
ReconSchemaVersionTableManager schemaVersionTableManager) {
this.schemaVersionTableManager = schemaVersionTableManager;
this.reconSchemaDefinitions.addAll(reconSchemaDefinitions);
}

@VisibleForTesting
public void createReconSchema() {
reconSchemaDefinitions.forEach(reconSchemaDefinition -> {
public void createAndUpgradeReconSchema() {

boolean isUpgrade = areOtherTablesExisting();

// Initialize all tables
initializeAllSchemas();

// Fetch current version from the SchemaVersionTable
String currentVersion = schemaVersionTableManager.getCurrentSchemaVersion();
String latestVersion = ReconConstants.LATEST_SCHEMA_VERSION;

// Handle cases where currentVersion is null, indicating schemaVersionTable just got created
if (currentVersion == null) {
if (isUpgrade) {
// Case 1: Upgrade from older version where schemaVersionTable was not present
LOG.info("Upgrade from older version detected. Setting current schema version to 1.0.");
currentVersion = "1.0"; // Set current version to the previous version before schemaVersionTable was introduced
} else {
// Case 2: Fresh install
LOG.info("Fresh installation detected. Setting schema version to latest.");
currentVersion = latestVersion;
}
}

// Upgrade schema if necessary
if (!currentVersion.equals(latestVersion)) {
upgradeAllSchemas(currentVersion, latestVersion);
// Update the schema version in the version table after migration
schemaVersionTableManager.updateSchemaVersion(latestVersion);
} else {
LOG.info("Recon Derby Schema is already up to date.");
}
}

/**
* Initialize all schemas.
*/
private void initializeAllSchemas() {
for (ReconSchemaDefinition reconSchemaDefinition : reconSchemaDefinitions) {
try {
reconSchemaDefinition.initializeSchema();
} catch (SQLException e) {
LOG.error("Error creating Recon schema {}.",
reconSchemaDefinition.getClass().getSimpleName(), e);
LOG.error("Error initializing schema: {}", reconSchemaDefinition.getClass().getSimpleName(), e);
}
}
LOG.info("All Derby table schemas initialized.");
}

/**
* Upgrade all schemas to the latest version.
*/
private void upgradeAllSchemas(String currentVersion, String latestVersion) {
for (ReconSchemaDefinition schemaDefinition : reconSchemaDefinitions) {
try {
// Use the upgrade logic from each schema
schemaDefinition.upgradeSchema(currentVersion, latestVersion);
} catch (SQLException e) {
LOG.error("Error upgrading schema: {}", schemaDefinition.getClass().getSimpleName(), e);
}
});
}
LOG.info("All schemas upgraded to the latest version: {}", latestVersion);
}

/**
* Checks whether essential tables (other than schemaVersionTable) already exist in the database.
*
* This method is used to distinguish between an upgrade and a fresh installation scenario.
*
* - If essential tables (like UNHEALTHY_CONTAINERS) exist but the schemaVersionTable does not
* have any records, it indicates an upgrade from an older version where schemaVersionTable
* was not present.
* - If none of the essential tables exist, it indicates a fresh installation, where all tables
* will be created for the first time, including the schemaVersionTable.
*
* @return true if essential tables already exist (indicating an upgrade), false if not
* (indicating a fresh installation)
*/
private boolean areOtherTablesExisting() {
try (Connection conn = schemaVersionTableManager.getDataSource().getConnection()) {
// Check if some essential tables, like UNHEALTHY_CONTAINERS, already exist
return TABLE_EXISTS_CHECK.test(conn, "UNHEALTHY_CONTAINERS");
} catch (SQLException e) {
LOG.error("Error checking table existence for upgrade detection", e);
return false;
}
}
}
Loading