Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
Expand All @@ -46,6 +47,7 @@
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
Expand Down Expand Up @@ -106,6 +108,8 @@ public final class MasterRegion {

private static final int REGION_ID = 1;

private final Server server;

private final WALFactory walFactory;

final HRegion region;
Expand All @@ -114,8 +118,9 @@ public final class MasterRegion {

private MasterRegionWALRoller walRoller;

private MasterRegion(HRegion region, WALFactory walFactory,
private MasterRegion(Server server, HRegion region, WALFactory walFactory,
MasterRegionFlusherAndCompactor flusherAndCompactor, MasterRegionWALRoller walRoller) {
this.server = server;
this.region = region;
this.walFactory = walFactory;
this.flusherAndCompactor = flusherAndCompactor;
Expand All @@ -139,8 +144,14 @@ private void shutdownWAL() {
}

public void update(UpdateMasterRegion action) throws IOException {
action.update(region);
flusherAndCompactor.onUpdate();
try {
action.update(region);
flusherAndCompactor.onUpdate();
} catch (WALSyncTimeoutIOException e) {
LOG.error(HBaseMarkers.FATAL, "WAL sync timeout. Aborting server.");
server.abort("WAL sync timeout", e);
throw e;
}
}

public Result get(Get get) throws IOException {
Expand All @@ -156,10 +167,16 @@ public RegionScanner getRegionScanner(Scan scan) throws IOException {
}

public FlushResult flush(boolean force) throws IOException {
flusherAndCompactor.resetChangesAfterLastFlush();
FlushResult flushResult = region.flush(force);
flusherAndCompactor.recordLastFlushTime();
return flushResult;
try {
flusherAndCompactor.resetChangesAfterLastFlush();
FlushResult flushResult = region.flush(force);
flusherAndCompactor.recordLastFlushTime();
return flushResult;
} catch (WALSyncTimeoutIOException e) {
LOG.error(HBaseMarkers.FATAL, "WAL sync timeout. Aborting server.");
server.abort("WAL sync timeout", e);
throw e;
}
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down Expand Up @@ -444,6 +461,6 @@ public static MasterRegion create(MasterRegionParams params) throws IOException
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will"
+ " be created again when we actually archive the hfiles later, so continue", archiveDir);
}
return new MasterRegion(region, walFactory, flusherAndCompactor, walRoller);
return new MasterRegion(server, region, walFactory, flusherAndCompactor, walRoller);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.region;

import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

@Category({ MasterTests.class, MediumTests.class })
public class TestMasterRegionWALSyncTimeoutIOException extends MasterRegionTestBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterRegionWALSyncTimeoutIOException.class);

private static final Duration WAL_SYNC_TIMEOUT = Duration.ofSeconds(3);

private static volatile boolean testWalTimeout = false;

@Override
protected void configure(Configuration conf) throws IOException {
conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, WALProvider.class);
conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, WAL_SYNC_TIMEOUT.toMillis());
}

@Override
protected void configure(MasterRegionParams params) {
params.flushIntervalMs(Duration.ofSeconds(1).toMillis());
}

@Test
public void testUpdateWalSyncWriteException() {
testWalTimeout = true;
assertThrows(WALSyncTimeoutIOException.class, () -> {
for (int i = 0; i < 10; i++) {
region.update(
r -> r.put(new Put(Bytes.toBytes("0")).addColumn(CF1, QUALIFIER, Bytes.toBytes("0"))));
Thread.sleep(Duration.ofSeconds(1).toMillis());
}
});
}

public static class SlowAsyncFSWAL extends AsyncFSWAL {

public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException {
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, null, null, eventLoopGroup, channelClass, monitor);
}

@Override
protected void atHeadOfRingBufferEventHandlerAppend() {
if (testWalTimeout) {
try {
Thread.sleep(WAL_SYNC_TIMEOUT.plusSeconds(1).toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
super.atHeadOfRingBufferEventHandlerAppend();
}
}

public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider {

@Override
protected AsyncFSWAL createWAL() throws IOException {
return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()),
getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
}
}
}