diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 696dbb71b9f9..46ce065b8778 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1695,9 +1695,6 @@ private void execProcedure(RootProcedureState procStack, } } - // Add the procedure to the stack - procStack.addRollbackStep(procedure); - // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. if ( @@ -1715,7 +1712,12 @@ private void execProcedure(RootProcedureState procStack, // Commit the transaction even if a suspend (state may have changed). Note this append // can take a bunch of time to complete. if (procedure.needPersistence()) { - updateStoreOnExec(procStack, procedure, subprocs); + // Add the procedure to the stack + // See HBASE-28210 on why we need synchronized here + synchronized (procStack) { + procStack.addRollbackStep(procedure); + updateStoreOnExec(procStack, procedure, subprocs); + } } // if the store is not running we are aborting diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java new file mode 100644 index 000000000000..4708df44745b --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.LinkedHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; +import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.AtomicUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * Testcase for HBASE-28210, where we persist the procedure which has been inserted later to + * {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading, + * and finally fail the start up of master. + */ +@Category({ MasterTests.class, SmallTests.class }) +public class TestStackIdHoles { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStackIdHoles.class); + + private final class DummyProcedureStore extends ProcedureStoreBase { + + private int numThreads; + + private final LinkedHashMap procMap = + new LinkedHashMap(); + + private final AtomicLong maxProcId = new AtomicLong(0); + + private final AtomicBoolean updated = new AtomicBoolean(false); + + @Override + public void start(int numThreads) throws IOException { + this.numThreads = numThreads; + setRunning(true); + } + + @Override + public void stop(boolean abort) { + } + + @Override + public int getNumThreads() { + return numThreads; + } + + @Override + public int setRunningProcedureCount(int count) { + return count; + } + + @Override + public void recoverLease() throws IOException { + } + + @Override + public void load(ProcedureLoader loader) throws IOException { + loader.setMaxProcId(maxProcId.get()); + ProcedureTree tree = ProcedureTree.build(procMap.values()); + loader.load(tree.getValidProcs()); + loader.handleCorrupted(tree.getCorruptedProcs()); + } + + @Override + public void insert(Procedure proc, Procedure[] subprocs) { + long max = proc.getProcId(); + synchronized (procMap) { + try { + procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); + if (subprocs != null) { + for (Procedure p : subprocs) { + procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); + max = Math.max(max, p.getProcId()); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + AtomicUtils.updateMax(maxProcId, max); + } + + @Override + public void insert(Procedure[] procs) { + long max = -1; + synchronized (procMap) { + try { + for (Procedure p : procs) { + procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); + max = Math.max(max, p.getProcId()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + AtomicUtils.updateMax(maxProcId, max); + } + + @Override + public void update(Procedure proc) { + // inject a sleep to simulate the scenario in HBASE-28210 + if (proc.hasParent() && proc.getStackIndexes() != null) { + int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1]; + try { + // sleep more times if the stack id is smaller + Thread.sleep(100L * (10 - lastStackId)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + // simulate the failure when updating the second sub procedure + if (!updated.compareAndSet(false, true)) { + procExec.stop(); + throw new RuntimeException("inject error"); + } + } + synchronized (procMap) { + try { + procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @Override + public void delete(long procId) { + synchronized (procMap) { + procMap.remove(procId); + } + } + + @Override + public void delete(Procedure parentProc, long[] subProcIds) { + synchronized (procMap) { + try { + procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc)); + for (long procId : subProcIds) { + procMap.remove(procId); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @Override + public void delete(long[] procIds, int offset, int count) { + synchronized (procMap) { + for (int i = 0; i < count; i++) { + long procId = procIds[offset + i]; + procMap.remove(procId); + } + } + } + } + + private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil(); + + private DummyProcedureStore procStore; + + private ProcedureExecutor procExec; + + @Before + public void setUp() throws IOException { + procStore = new DummyProcedureStore(); + procStore.start(4); + procExec = new ProcedureExecutor(HBTU.getConfiguration(), null, procStore); + procExec.init(4, true); + procExec.startWorkers(); + } + + @After + public void tearDown() { + procExec.stop(); + } + + public static class DummyProcedure extends NoopProcedure { + + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return new Procedure[] { new NoopProcedure(), new NoopProcedure() }; + } + } + + @Test + public void testLoad() throws IOException { + procExec.submitProcedure(new DummyProcedure()); + // wait for the error + HBTU.waitFor(30000, () -> !procExec.isRunning()); + procExec = new ProcedureExecutor(HBTU.getConfiguration(), null, procStore); + // make sure there is no error while loading + procExec.init(4, true); + } +}