Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.hadoop.ozone.upgrade;

import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FIRST_UPGRADE_START;
import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.UNFINALIZED_STATE_VALIDATION;
import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.VALIDATE_IN_PREFINALIZE;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.FIRST_UPGRADE_START_ACTION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_STATE_VALIDATION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_ACTION_VALIDATION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
Expand All @@ -50,37 +52,89 @@
/**
* UpgradeFinalizer implementation for the Storage Container Manager service.
*/
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class BasicUpgradeFinalizer
<T, V extends AbstractLayoutVersionManager> implements UpgradeFinalizer<T> {

protected V versionManager;
protected String clientID;
protected T component;
protected DefaultUpgradeFinalizationExecutor finalizationExecutor;
private V versionManager;
private String clientID;
private T component;
private DefaultUpgradeFinalizationExecutor<T> finalizationExecutor;

private Queue<String> msgs = new ConcurrentLinkedQueue<>();
protected boolean isDone = false;
private boolean isDone = false;

public BasicUpgradeFinalizer(V versionManager) {
this.versionManager = versionManager;
this.finalizationExecutor =
new DefaultUpgradeFinalizationExecutor();
this.finalizationExecutor = new DefaultUpgradeFinalizationExecutor<>();
}

/**
* Sets the Finalization Executor driver.
* @param executor FinalizationExecutor.
*/
public StatusAndMessages finalize(String upgradeClientID, T service)
throws IOException {
StatusAndMessages response = initFinalize(upgradeClientID, service);
if (response.status() != FINALIZATION_REQUIRED) {
return response;
}
finalizationExecutor.execute(service, this);
return STARTING_MSG;
}

public void setFinalizationExecutor(DefaultUpgradeFinalizationExecutor
executor) {
finalizationExecutor = executor;
public synchronized StatusAndMessages reportStatus(
String upgradeClientID, boolean takeover) throws UpgradeException {
if (takeover) {
clientID = upgradeClientID;
}
assertClientId(upgradeClientID);
List<String> returningMsgs = new ArrayList<>(msgs.size() + 10);
Status status = versionManager.getUpgradeState();
while (msgs.size() > 0) {
returningMsgs.add(msgs.poll());
}
return new StatusAndMessages(status, returningMsgs);
}

protected void preFinalizeUpgrade(T service) throws IOException {
// No Op by default.
}

protected void postFinalizeUpgrade(T service) throws IOException {
// No Op by default.
}

public abstract void finalizeUpgrade(T service) throws UpgradeException;

@Override
public DefaultUpgradeFinalizationExecutor getFinalizationExecutor() {
return finalizationExecutor;
public void finalizeAndWaitForCompletion(
Comment thread
avijayanhwx marked this conversation as resolved.
String upgradeClientID, T service, long maxTimeToWaitInSeconds)
throws IOException {

StatusAndMessages response = finalize(upgradeClientID, service);
LOG.info("Finalization Messages : {} ", response.msgs());
if (isFinalized(response.status())) {
return;
}

boolean success = false;
long endTime = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(maxTimeToWaitInSeconds);
while (System.currentTimeMillis() < endTime) {
try {
response = reportStatus(upgradeClientID, false);
LOG.info("Finalization Messages : {} ", response.msgs());
if (isFinalized(response.status())) {
success = true;
break;
}
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Finalization Wait thread interrupted!");
}
}
if (!success) {
throw new IOException(
String.format("Unable to finalize after waiting for %d seconds",
maxTimeToWaitInSeconds));
}
}

public boolean isFinalizationDone() {
Expand All @@ -95,9 +149,8 @@ public V getVersionManager() {
return versionManager;
}

public synchronized StatusAndMessages preFinalize(String upgradeClientID,
T id)
throws UpgradeException {
private synchronized StatusAndMessages initFinalize(
String upgradeClientID, T id) throws UpgradeException {
switch (versionManager.getUpgradeState()) {
case STARTING_FINALIZATION:
return STARTING_MSG;
Expand All @@ -119,46 +172,12 @@ public synchronized StatusAndMessages preFinalize(String upgradeClientID,
}
versionManager.setUpgradeState(STARTING_FINALIZATION);

clientID = upgradeClientID;
this.clientID = upgradeClientID;
this.component = id;
return FINALIZATION_REQUIRED_MSG;
}
}

/*
* This method must be overriden by the component implementing the
* finalization logic.
*/
public StatusAndMessages finalize(String upgradeClientID, T id)
throws IOException {
StatusAndMessages response = preFinalize(upgradeClientID, id);
if (response.status() != FINALIZATION_REQUIRED) {
return response;
}

/**
* Overriding class should schedule actual finalization logic
* in a separate thread here.
*/
return STARTING_MSG;
}

@Override
public synchronized StatusAndMessages reportStatus(
String upgradeClientID, boolean takeover
) throws UpgradeException {
if (takeover) {
clientID = upgradeClientID;
}
assertClientId(upgradeClientID);
List<String> returningMsgs = new ArrayList<>(msgs.size()+10);
Status status = versionManager.getUpgradeState();
while (msgs.size() > 0) {
returningMsgs.add(msgs.poll());
}
return new StatusAndMessages(status, returningMsgs);
}

private void assertClientId(String id) throws UpgradeException {
if (this.clientID == null || !this.clientID.equals(id)) {
throw new UpgradeException("Unknown client tries to get finalization " +
Expand All @@ -168,44 +187,22 @@ private void assertClientId(String id) throws UpgradeException {
}
}

public void finalizeAndWaitForCompletion(String upgradeClientID, T service,
long maxTimeToWaitInSeconds)
throws IOException {

StatusAndMessages response = finalize(upgradeClientID, service);
LOG.info("Finalization Messages : {} ", response.msgs());
if (isFinalized(response.status())) {
return;
}

boolean success = false;
long endTime = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(maxTimeToWaitInSeconds);
while (System.currentTimeMillis() < endTime) {
try {
response = reportStatus(upgradeClientID, false);
LOG.info("Finalization Messages : {} ", response.msgs());
if (isFinalized(response.status())) {
success = true;
break;
}
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Finalization Wait thread interrupted!");
}
}
if (!success) {
LOG.error("Unable to finalize after waiting for {} seconds",
maxTimeToWaitInSeconds);
} else {
updateLayoutVersionInDB(versionManager, component);
}
private static boolean isFinalized(Status status) {
return status.equals(Status.ALREADY_FINALIZED)
|| status.equals(FINALIZATION_DONE);
}

private static boolean isFinalized(UpgradeFinalizer.Status status) {
return status.equals(UpgradeFinalizer.Status.ALREADY_FINALIZED)
|| status.equals(FINALIZATION_DONE);
protected void finalizeUpgrade(Supplier<Storage> storageSuppplier)
throws UpgradeException {
for (Object obj : versionManager.unfinalizedFeatures()) {
LayoutFeature lf = (LayoutFeature) obj;
Storage layoutStorage = storageSuppplier.get();
Optional<? extends UpgradeAction> action = lf.action(ON_FINALIZE);
finalizeFeature(lf, layoutStorage, action);
updateLayoutVersionInVersionFile(lf, layoutStorage);
versionManager.finalized(lf);
}
versionManager.completeFinalization();
}

protected void finalizeFeature(LayoutFeature feature, Storage config,
Expand Down Expand Up @@ -253,7 +250,7 @@ protected void runPrefinalizeStateActions(Function<LayoutFeature,
Function<UpgradeActionType, Optional<? extends UpgradeAction>> function =
aFunction.apply(lf);
Optional<? extends UpgradeAction> action =
function.apply(UNFINALIZED_STATE_VALIDATION);
function.apply(VALIDATE_IN_PREFINALIZE);
if (action.isPresent()) {
runValidationAction(lf, action.get());
}
Expand Down Expand Up @@ -283,7 +280,7 @@ private void runValidationAction(LayoutFeature f, UpgradeAction action)
LOG.error(String.format(msg, f.name()));
throw new UpgradeException(
String.format(msg, f.name()), ex,
PREFINALIZE_STATE_VALIDATION_FAILED);
PREFINALIZE_ACTION_VALIDATION_FAILED);
}
}

Expand Down Expand Up @@ -458,14 +455,9 @@ private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
throw new UpgradeException(msg, e, resultCode);
}

protected void updateLayoutVersionInDB(V vm, T comp) throws IOException {
throw new UnsupportedOperationException();
@VisibleForTesting
public void setFinalizationExecutor(DefaultUpgradeFinalizationExecutor
executor) {
finalizationExecutor = executor;
}

protected abstract void postFinalizeUpgrade() throws IOException;

protected abstract void finalizeUpgrade(Storage storageConfig)
throws UpgradeException;

protected abstract boolean preFinalizeUpgrade() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;

import org.apache.hadoop.ozone.common.Storage;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,49 +31,36 @@
* Unit/Integration tests can override this to provide error injected version
* of this class.
*/

@SuppressWarnings("checkstyle:VisibilityModifier")
public class DefaultUpgradeFinalizationExecutor {
public class DefaultUpgradeFinalizationExecutor<T> {
static final Logger LOG =
LoggerFactory.getLogger(DefaultUpgradeFinalizationExecutor.class);

public DefaultUpgradeFinalizationExecutor() {
}

public Void execute(Storage storageConfig,
BasicUpgradeFinalizer basicUpgradeFinalizer)
throws Exception {
public void execute(T component, BasicUpgradeFinalizer finalizer)
throws IOException {
try {
basicUpgradeFinalizer.emitStartingMsg();
basicUpgradeFinalizer.getVersionManager()
finalizer.emitStartingMsg();
finalizer.getVersionManager()
.setUpgradeState(FINALIZATION_IN_PROGRESS);

/*
* Before we can call finalize the feature, we need to make sure that
* all existing pipelines are closed and pipeline Manger would freeze
* all new pipeline creation.
*/
if(!basicUpgradeFinalizer.preFinalizeUpgrade()) {
return null;
}
finalizer.preFinalizeUpgrade(component);

basicUpgradeFinalizer.finalizeUpgrade(storageConfig);
finalizer.finalizeUpgrade(component);

basicUpgradeFinalizer.postFinalizeUpgrade();
finalizer.postFinalizeUpgrade(component);

basicUpgradeFinalizer.emitFinishedMsg();
return null;
finalizer.emitFinishedMsg();
} catch (Exception e) {
LOG.warn("Upgrade Finalization failed with following Exception:");
e.printStackTrace();
if (basicUpgradeFinalizer.getVersionManager().needsFinalization()) {
basicUpgradeFinalizer.getVersionManager()
LOG.warn("Upgrade Finalization failed with following Exception. ", e);
if (finalizer.getVersionManager().needsFinalization()) {
finalizer.getVersionManager()
.setUpgradeState(FINALIZATION_REQUIRED);
throw (e);
}
} finally {
basicUpgradeFinalizer.markFinalizationDone();
finalizer.markFinalizationDone();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ default String name() {
*/
enum UpgradeActionType {

// Run every time an unfinalized component is started up.
UNFINALIZED_STATE_VALIDATION,
// Run every time an un-finalized component is started up.
VALIDATE_IN_PREFINALIZE,

// Run exactly once when an upgraded cluster is detected with this new
// layout version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,14 @@ public String toString() {
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {

OK,

INVALID_REQUEST,

PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED,
REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED,
UPDATE_LAYOUT_VERSION_FAILED,
LAYOUT_FEATURE_FINALIZATION_FAILED,
PREFINALIZE_STATE_VALIDATION_FAILED,
FIRST_UPGRADE_START_ACTION_FAILED;
PREFINALIZE_ACTION_VALIDATION_FAILED,
FIRST_UPGRADE_START_ACTION_FAILED,
PREFINALIZE_VALIDATION_FAILED;
}
}
Loading