Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.hadoop.ozone.recon.persistence;

import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME;
import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_BAD;
import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.UNDER_REPLICATED;
import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS;
import static org.jooq.impl.DSL.count;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.sql.Connection;
import java.util.List;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
Expand All @@ -35,6 +37,7 @@
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.SelectQuery;
import org.jooq.exception.DataAccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,12 +121,34 @@ public Cursor<UnhealthyContainersRecord> getAllUnhealthyRecordsCursor() {

public void insertUnhealthyContainerRecords(List<UnhealthyContainers> recs) {
if (LOG.isDebugEnabled()) {
recs.forEach(rec -> {
LOG.debug("rec.getContainerId() : {}, rec.getContainerState(): {} ", rec.getContainerId(),
rec.getContainerState());
});
recs.forEach(rec -> LOG.debug("rec.getContainerId() : {}, rec.getContainerState(): {}",
rec.getContainerId(), rec.getContainerState()));
}

try (Connection connection = containerSchemaDefinition.getDataSource().getConnection()) {
connection.setAutoCommit(false); // Turn off auto-commit for transactional control
try {
for (UnhealthyContainers rec : recs) {
try {
unhealthyContainersDao.insert(rec);
} catch (DataAccessException dataAccessException) {
// Log the error and update the existing record if ConstraintViolationException occurs
unhealthyContainersDao.update(rec);
LOG.debug("Error while inserting unhealthy container record: {}", rec, dataAccessException);
}
}
connection.commit(); // Commit all inserted/updated records
} catch (Exception innerException) {
connection.rollback(); // Rollback transaction if an error occurs inside processing
LOG.error("Transaction rolled back due to error", innerException);
throw innerException;
} finally {
connection.setAutoCommit(true); // Reset auto-commit before the connection is auto-closed
}
} catch (Exception e) {
LOG.error("Failed to insert records into {} ", UNHEALTHY_CONTAINERS_TABLE_NAME, e);
throw new RuntimeException("Recon failed to insert " + recs.size() + " unhealthy container records.", e);
}
unhealthyContainersDao.insert(recs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -494,6 +494,69 @@ public void testAllContainerStateInsertions() {
}
}

@Test
public void testInsertFailureAndUpdateBehavior() {
UnhealthyContainersDao unHealthyContainersTableHandle =
getDao(UnhealthyContainersDao.class);

ContainerHealthSchemaManager containerHealthSchemaManager =
new ContainerHealthSchemaManager(
getSchemaDefinition(ContainerSchemaDefinition.class),
unHealthyContainersTableHandle);

ContainerSchemaDefinition.UnHealthyContainerStates state =
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING;

long insertedTime = System.currentTimeMillis();
// Create a dummy UnhealthyContainer record with the current state
UnhealthyContainers unhealthyContainer = new UnhealthyContainers();
unhealthyContainer.setContainerId(state.ordinal() + 1L);
unhealthyContainer.setExpectedReplicaCount(3);
unhealthyContainer.setActualReplicaCount(0);
unhealthyContainer.setReplicaDelta(3);
unhealthyContainer.setContainerState(state.name());
unhealthyContainer.setInStateSince(insertedTime);

// Try inserting the record and catch any exception that occurs
Exception exception = null;
try {
containerHealthSchemaManager.insertUnhealthyContainerRecords(
Collections.singletonList(unhealthyContainer));
} catch (Exception e) {
exception = e;
}

// Assert no exception should be thrown for each state
assertNull(exception,
"Exception was thrown during insertion for state " + state.name() +
": " + exception);

long updatedTime = System.currentTimeMillis();
unhealthyContainer.setExpectedReplicaCount(3);
unhealthyContainer.setActualReplicaCount(0);
unhealthyContainer.setReplicaDelta(3);
unhealthyContainer.setContainerState(state.name());
unhealthyContainer.setInStateSince(updatedTime);

try {
containerHealthSchemaManager.insertUnhealthyContainerRecords(
Collections.singletonList(unhealthyContainer));
} catch (Exception e) {
exception = e;
}

// Optionally, verify the record was updated correctly
List<UnhealthyContainers> updatedRecords =
unHealthyContainersTableHandle.fetchByContainerId(
state.ordinal() + 1L);
assertFalse(updatedRecords.isEmpty(),
"Record was not updated for state " + state.name() + ".");
assertEquals(updatedRecords.get(0).getContainerState(), state.name(),
"The inserted container state does not match for state " +
state.name() + ".");
assertEquals(updatedRecords.get(0).getInStateSince(), updatedTime);
}

@Test
public void testMissingAndEmptyMissingContainerDeletion() throws Exception {
// Setup mock DAOs and managers
Expand Down