Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
Expand Down Expand Up @@ -1367,7 +1368,9 @@ public RegionInfo getRegionInfo() {
return this.fs.getRegionInfo();
}

/** Returns Instance of {@link RegionServerServices} used by this HRegion. Can be null. */
/**
* Returns Instance of {@link RegionServerServices} used by this HRegion. Can be null.
*/
RegionServerServices getRegionServerServices() {
  // The field may be null (other code in this class null-checks it before use).
  return rsServices;
}
Expand Down Expand Up @@ -2863,7 +2866,7 @@ private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WA
if (sink != null && !writeFlushWalMarker) {
/**
* Here for replication to secondary region replica could use {@link FlushAction#CANNOT_FLUSH}
* to recover writeFlushWalMarker is false, we create {@link WALEdit} for
* to recover when writeFlushWalMarker is false, we create {@link WALEdit} for
* {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
* flushOpSeqIdMVCCEntry, see HBASE-26960 for more details.
*/
Expand Down Expand Up @@ -3694,7 +3697,7 @@ public void doPostOpCleanupForMiniBatch(
* @param familyMap Map of Cells by family
*/
protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
MemStoreSizing memstoreAccounting) throws IOException {
MemStoreSizing memstoreAccounting) {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
Expand Down Expand Up @@ -5231,7 +5234,7 @@ public void setReadsEnabled(boolean readsEnabled) {
* scenario but that do not make sense otherwise.
*/
private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
MemStoreSizing memstoreAccounting) throws IOException {
MemStoreSizing memstoreAccounting) {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
Expand Down Expand Up @@ -8037,16 +8040,35 @@ private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
try {
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
WriteEntry writeEntry = walKey.getWriteEntry();
this.attachRegionReplicationInWALAppend(batchOp, miniBatchOp, walKey, walEdit, writeEntry);
// Call sync on our edit.
if (txid != 0) {
sync(txid, batchOp.durability);
}
/**
* If the above {@link HRegion#sync} throws an exception, the RegionServer should be aborted
* and the following {@link BatchOperation#writeMiniBatchOperationsToMemStore} will not be
* executed, so there is no need to replicate to the secondary replica. For this reason, we
* attach the region replication action only after {@link HRegion#sync} has succeeded.
*/
this.attachRegionReplicationInWALAppend(batchOp, miniBatchOp, walKey, walEdit, writeEntry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not include this change here? It should be a separate issue.

return writeEntry;
} catch (IOException ioe) {
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}

/**
* If {@link WAL#sync} gets a timeout exception, the only correct way is to abort the region
* server, as the design of {@link WAL#sync}, is to succeed or die, there is no 'failure'. It
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we should avoid aborting the RS when the WAL sync is slow or times out. As in the design of HBASE-22301, we should try to roll slow WALs so that they connect and rewrite to faster DNs. If you want to recover the flushing of WALs by aborting the RS, you will suffer a whole MTTR, yet only the new WAL created by the newly started RS actually helps with the problem.

Copy link
Contributor Author

@comnetwork comnetwork Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunhelly, thank you for the review. Yes, I get your point. What this means is that after we do what you said, such as rolling the WALs, opening a new writer and trying to write the WAL entries again and again, if WAL.sync still times out in the end, we abort the RS. We set a very large default value here, 5 minutes; usually the WAL system will abort the region server if it cannot finish the sync within 5 minutes. See HBASE-27233 and #4633 for more discussion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I agree with the point 'no failure for WAL sync': whenever WAL.sync throws an exception (no matter whether it is a timeout or something else), the regionserver should abort. This is common logic.

* is usually not a big deal because we set a very large default value (5 minutes) for
* {@link AbstractFSWAL#WAL_SYNC_TIMEOUT_MS}, usually the WAL system will abort the region
* server if it can not finish the sync within 5 minutes.
*/
if (ioe instanceof WALSyncTimeoutIOException) {
if (rsServices != null) {
rsServices.abort("WAL sync timeout,forcing server shutdown", ioe);
}
}
throw ioe;
}
}
Expand All @@ -8057,7 +8079,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
*/
private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
MiniBatchOperationInProgress<Mutation> miniBatchOp, WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
WriteEntry writeEntry) {
if (!regionReplicationSink.isPresent()) {
return;
}
Expand Down Expand Up @@ -8086,7 +8108,7 @@ private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
* replica.
*/
private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
WriteEntry writeEntry) {
if (walEdit == null || walEdit.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1880,8 +1880,7 @@ public long getSmallestReadPoint() {
* across all of them.
* @param readpoint readpoint below which we can safely remove duplicate KVs
*/
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
throws IOException {
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
this.storeEngine.readLock();
try {
this.memstore.upsert(cells, readpoint, memstoreSizing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
"hbase.regionserver.wal.slowsync.roll.interval.ms";
protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
Expand Down Expand Up @@ -881,7 +881,7 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
}
}
} catch (TimeoutIOException tioe) {
throw tioe;
throw new WALSyncTimeoutIOException(tioe);
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
throw convertInterruptedExceptionToIOException(ie);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Thrown when {@link WAL#sync} times out.
*/
@SuppressWarnings("serial")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just add a generated serial id?

@InterfaceAudience.Private
public class WALSyncTimeoutIOException extends HBaseIOException {

  // Explicit serial version id so the serialized form does not rely on a compiler-computed default.
  private static final long serialVersionUID = 1L;

  /** Delegates to the no-argument superclass constructor. */
  public WALSyncTimeoutIOException() {
    super();
  }

  /**
   * Delegates to the superclass constructor with the given message and cause.
   * @param message detail message forwarded to the superclass
   * @param cause   underlying cause forwarded to the superclass
   */
  public WALSyncTimeoutIOException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Delegates to the superclass constructor with the given message.
   * @param message detail message forwarded to the superclass
   */
  public WALSyncTimeoutIOException(String message) {
    super(message);
  }

  /**
   * Delegates to the superclass constructor with the given cause.
   * @param cause underlying cause forwarded to the superclass
   */
  public WALSyncTimeoutIOException(Throwable cause) {
    super(cause);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long
StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
}

private EventLoopGroup eventLoopGroup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments to say this will be used in tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Apache9, ok, fixed.

protected EventLoopGroup eventLoopGroup;

private Class<? extends Channel> channelClass;
protected Class<? extends Channel> channelClass;

@Override
protected AsyncFSWAL createWAL() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand Down Expand Up @@ -133,18 +134,21 @@ void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,

/**
* Sync what we have in the WAL.
* @throws WALSyncTimeoutIOException if the sync times out.
*/
void sync() throws IOException;

/**
* Sync the WAL if the txId was not already sync'd.
* @param txid Transaction id to sync to.
* @throws WALSyncTimeoutIOException if the sync times out.
*/
void sync(long txid) throws IOException;

/**
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
* vs hsync.
* @throws WALSyncTimeoutIOException if the sync times out.
*/
default void sync(boolean forceSync) throws IOException {
sync();
Expand All @@ -154,6 +158,7 @@ default void sync(boolean forceSync) throws IOException {
* @param txid Transaction id to sync to.
* @param forceSync Flag to force sync rather than flushing to the buffer. Example - Hadoop hflush
* vs hsync.
* @throws WALSyncTimeoutIOException if the sync times out.
*/
default void sync(long txid, boolean forceSync) throws IOException {
sync(txid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.wal;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
Expand Down Expand Up @@ -505,4 +506,10 @@ public final WALProvider getMetaWALProvider() {
public ExcludeDatanodeManager getExcludeDatanodeManager() {
  // Plain accessor: hands back the instance's excludeDatanodeManager field as-is.
  return this.excludeDatanodeManager;
}

/**
 * Returns the {@code factoryId} field. The {@code RestrictedApi} annotation limits callers to
 * paths matching the test-source pattern given in {@code allowedOnPath}.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
public String getFactoryId() {
  return factoryId;
}
}
Loading