Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,25 @@ protected void afterReplay(TEnvironment env) {
// no-op
}

/**
* Called before we call the execute method of this procedure, but after we acquire the execution
* lock and procedure scheduler lock.
*/
protected void beforeExec(TEnvironment env) throws ProcedureSuspendedException {
// no-op
}

/**
* Called after we call the execute method of this procedure, and also after we initialize all the
* sub procedures and persist the the state if persistence is needed.
* <p>
* This is for doing some hooks after we initialize the sub procedures. See HBASE-29259 for more
* details on why we can not release the region lock inside the execute method.
*/
protected void afterExec(TEnvironment env) {
// no-op
}

/**
* Called when the procedure is marked as completed (success or rollback). The procedure
* implementor may use this method to cleanup in-memory states. This operation will not be retried
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,7 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
reExecute = false;
procedure.resetPersistence();
try {
procedure.beforeExec(getEnvironment());
subprocs = procedure.doExecute(getEnvironment());
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
Expand All @@ -1790,11 +1791,13 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
suspended = true;
} catch (ProcedureYieldException e) {
LOG.trace("Yield {}", procedure, e);
procedure.afterExec(getEnvironment());
yieldProcedure(procedure);
return;
} catch (InterruptedException e) {
LOG.trace("Yield interrupt {}", procedure, e);
handleInterruptedException(procedure, e);
procedure.afterExec(getEnvironment());
yieldProcedure(procedure);
return;
} catch (Throwable e) {
Expand Down Expand Up @@ -1866,6 +1869,7 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
updateStoreOnExec(procStack, procedure, subprocs);
}
}
procedure.afterExec(getEnvironment());

// if the store is not running we are aborting
if (!store.isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,22 @@ private void unattach(MasterProcedureEnv env) {
getParent(env).unattachRemoteProc(this);
}

@Override
protected void beforeExec(MasterProcedureEnv env) {
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
}

@Override
protected void afterExec(MasterProcedureEnv env) {
RegionStateNode regionNode = getRegionNode(env);
regionNode.unlock();
}

@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
try {
switch (state) {
case REGION_REMOTE_PROCEDURE_DISPATCH: {
Expand Down Expand Up @@ -333,8 +344,6 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
} finally {
regionNode.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
Expand Down Expand Up @@ -392,19 +391,18 @@ private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
return Flow.HAS_MORE_STATE;
}

// Override to lock RegionStateNode
@SuppressWarnings("rawtypes")
@Override
protected Procedure[] execute(MasterProcedureEnv env)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
protected void beforeExec(MasterProcedureEnv env) {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegion());
regionNode.lock();
try {
return super.execute(env);
} finally {
regionNode.unlock();
}
}

@Override
protected void afterExec(MasterProcedureEnv env) {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegion());
regionNode.unlock();
}

private RegionStateNode getRegionStateNode(MasterProcedureEnv env) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure.TransitionType;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
* Testcase for HBASE-29259
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestTRSPPersistUninitializedSubProc {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTRSPPersistUninitializedSubProc.class);

private static HBaseTestingUtility UTIL = new HBaseTestingUtility();

private static byte[] CF = Bytes.toBytes("cf");

private static TableName TN = TableName.valueOf("tn");

public static class TRSPForTest extends TransitRegionStateProcedure {

private boolean injected = false;

public TRSPForTest() {
}

public TRSPForTest(MasterProcedureEnv env, RegionInfo hri, ServerName assignCandidate,
boolean forceNewPlan, TransitionType type) {
super(env, hri, assignCandidate, forceNewPlan, type);
}

@Override
protected Procedure[] execute(MasterProcedureEnv env)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
Procedure[] subProcs = super.execute(env);
if (!injected && subProcs != null && subProcs[0] instanceof CloseRegionProcedure) {
injected = true;
ServerName sn = ((CloseRegionProcedure) subProcs[0]).targetServer;
env.getMasterServices().getServerManager().expireServer(sn);
try {
UTIL.waitFor(15000, () -> env.getMasterServices().getProcedures().stream().anyMatch(
p -> p instanceof ServerCrashProcedure && p.getState() != ProcedureState.INITIALIZING));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
// sleep 10 seconds to let the SCP interrupt the TRSP, where we will call TRSP.serverCrashed
Thread.sleep(10000);
}
return subProcs;
}
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(2);
UTIL.getAdmin().balancerSwitch(false, true);
UTIL.createTable(TN, CF);
UTIL.waitTableAvailable(TN);
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}

@Test
public void testServerCrash() throws Exception {
HMaster master = UTIL.getHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
RegionInfo region = UTIL.getAdmin().getRegions(TN).get(0);
RegionStateNode rsn =
master.getAssignmentManager().getRegionStates().getRegionStateNode(region);
TRSPForTest trsp =
new TRSPForTest(procExec.getEnvironment(), region, null, false, TransitionType.REOPEN);
// attach it to RegionStateNode, to simulate normal reopen
rsn.setProcedure(trsp);
procExec.submitProcedure(trsp);
ProcedureTestingUtility.waitProcedure(procExec, trsp);
// make sure we do not store invalid procedure to procedure store
ProcedureTestingUtility.restart(procExec);
}
}