@@ -158,6 +158,7 @@
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
@@ -1372,7 +1373,9 @@ public RegionInfo getRegionInfo() {
return this.fs.getRegionInfo();
}

/** Returns Instance of {@link RegionServerServices} used by this HRegion. Can be null. */
/**
* Returns Instance of {@link RegionServerServices} used by this HRegion. Can be null.
*/
RegionServerServices getRegionServerServices() {
return this.rsServices;
}
@@ -3661,7 +3664,7 @@ public void doPostOpCleanupForMiniBatch(
* @param familyMap Map of Cells by family
*/
protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
MemStoreSizing memstoreAccounting) throws IOException {
MemStoreSizing memstoreAccounting) {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
@@ -5083,7 +5086,7 @@ public void setReadsEnabled(boolean readsEnabled) {
* @see #applyToMemStore(HStore, Cell, MemStoreSizing)
*/
private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
MemStoreSizing memstoreAccounting) throws IOException {
MemStoreSizing memstoreAccounting) {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
@@ -7887,6 +7890,19 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
if (walKey != null && walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}

/**
* If {@link WAL#sync} gets a timeout exception, the only correct way is to abort the region
* server, because the design of {@link WAL#sync} is to succeed or die; there is no 'failure'.
* This is usually not a big deal because we set a very large default value (5 minutes) for
* {@link AbstractFSWAL#WAL_SYNC_TIMEOUT_MS}, so the region server is only aborted when the
* WAL system cannot finish the sync within 5 minutes.
*/
if (ioe instanceof WALSyncTimeoutIOException) {
if (rsServices != null) {
rsServices.abort("WAL sync timeout, forcing server shutdown", ioe);
}
}
throw ioe;
}
return writeEntry;
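
A minimal configuration sketch (not part of the change itself; the class and method names are hypothetical): the abort path above is governed by the 5-minute default mentioned in the comment, and the key can be tuned through the standard HBase configuration API, assuming the now-public AbstractFSWAL.WAL_SYNC_TIMEOUT_MS constant shown later in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;

public class WalSyncTimeoutConfigSketch {
  public static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    // The timeout that triggers the abort above is read from
    // "hbase.regionserver.wal.sync.timeout" (AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, made public
    // in this change) and defaults to 5 minutes; shorten it only with care.
    conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, 2 * 60 * 1000L); // hypothetical 2-minute value
    return conf;
  }
}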
@@ -1910,8 +1910,7 @@ public long getSmallestReadPoint() {
* across all of them.
* @param readpoint readpoint below which we can safely remove duplicate KVs
*/
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
throws IOException {
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
this.storeEngine.readLock();
try {
this.memstore.upsert(cells, readpoint, memstoreSizing);
@@ -139,7 +139,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
"hbase.regionserver.wal.slowsync.roll.interval.ms";
protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
@@ -871,7 +871,7 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
}
}
} catch (TimeoutIOException tioe) {
throw tioe;
throw new WALSyncTimeoutIOException(tioe);
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
throw convertInterruptedExceptionToIOException(ie);
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Thrown when {@link WAL#sync} times out.
*/
@InterfaceAudience.Private
public class WALSyncTimeoutIOException extends HBaseIOException {

private static final long serialVersionUID = 5067699288291906985L;

public WALSyncTimeoutIOException() {
super();
}

public WALSyncTimeoutIOException(String message, Throwable cause) {
super(message, cause);
}

public WALSyncTimeoutIOException(String message) {
super(message);
}

public WALSyncTimeoutIOException(Throwable cause) {
super(cause);
}

}
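
A minimal caller-side sketch (hypothetical helper, not part of this change), showing how code that calls WAL#sync can now distinguish a sync timeout from other sync failures; on the region server write path, HRegion#doWALAppend reacts to this exception by aborting the server rather than handling it in place.

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.wal.WAL;

public class WalSyncSketch {
  // Hypothetical helper: sync up to the given transaction and surface a timeout distinctly.
  static void syncOrRethrow(WAL wal, long txid) throws IOException {
    try {
      wal.sync(txid);
    } catch (WALSyncTimeoutIOException e) {
      // The sync did not finish within hbase.regionserver.wal.sync.timeout; the edit must
      // not be treated as durable. Callers decide whether to abort (as HRegion does) or
      // simply propagate, as this sketch does.
      throw e;
    }
  }
}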
@@ -173,6 +173,12 @@ private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}
/**
* Here we do not abort the RegionServer for {@link WALSyncTimeoutIOException} as
* {@link HRegion#doWALAppend} does, because a WAL marker just records internal state, so
* there is no need to always abort the RegionServer when {@link WAL#sync} times out; it is
* the internal state transition that determines whether the RegionServer is aborted or not.
*/
throw ioe;
}
return walKey;
@@ -65,9 +65,15 @@ void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long
StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
}

private EventLoopGroup eventLoopGroup;
/**
* Protected visibility for use in tests.
*/
protected EventLoopGroup eventLoopGroup;

private Class<? extends Channel> channelClass;
/**
* Protected visibility for use in tests.
*/
protected Class<? extends Channel> channelClass;

@Override
protected AsyncFSWAL createWAL() throws IOException {
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -136,18 +137,21 @@ void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,

/**
* Sync what we have in the WAL.
* @throws IOException when the sync fails; on a sync timeout a {@link WALSyncTimeoutIOException} is thrown.
*/
void sync() throws IOException;

/**
* Sync the WAL if the txId was not already sync'd.
* @param txid Transaction id to sync to.
* @throws IOException when the sync fails; on a sync timeout a {@link WALSyncTimeoutIOException} is thrown.
*/
void sync(long txid) throws IOException;

/**
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
* vs hsync.
* @throws IOException when the sync fails; on a sync timeout a {@link WALSyncTimeoutIOException} is thrown.
*/
default void sync(boolean forceSync) throws IOException {
sync();
@@ -157,6 +161,7 @@ default void sync(boolean forceSync) throws IOException {
* @param txid Transaction id to sync to.
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
* vs hsync.
* @throws IOException when the sync fails; on a sync timeout a {@link WALSyncTimeoutIOException} is thrown.
*/
default void sync(long txid, boolean forceSync) throws IOException {
sync(txid);
@@ -21,7 +21,6 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -55,7 +54,6 @@
* named for the method and does its stuff against that.
*/
@Category({ MasterTests.class, LargeTests.class })
@SuppressWarnings("deprecation")
public class TestWarmupRegion {

@ClassRule
@@ -67,7 +65,6 @@ public class TestWarmupRegion {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[] VALUE = Bytes.toBytes("testValue");
private static byte[] COLUMN = Bytes.toBytes("column");
private static int numRows = 10000;
@@ -80,7 +77,6 @@ public class TestWarmupRegion {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster(SLAVES);
}
