Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,31 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
}
}

// find all the sync futures between these two txids to see if we need to issue a hsync, if no
// sync futures then just use the default one.
private boolean isHsync(long beginTxid, long endTxid) {
SortedSet<SyncFuture> futures =
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
if (futures.isEmpty()) {
return useHsync;
}
for (SyncFuture future : futures) {
if (future.isForceSync()) {
return true;
}
}
return false;
}

private void sync(AsyncWriter writer) {
fileLengthAtLastSync = writer.getLength();
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
boolean shouldUseHsync =
isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L;
addListener(writer.sync(useHsync), (result, error) -> {
addListener(writer.sync(shouldUseHsync), (result, error) -> {
if (error != null) {
syncFailed(epoch, error);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,9 @@ public void run() {
//TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
long start = System.nanoTime();
Throwable lastException = null;
boolean wasRollRequested = false;
try {
TraceUtil.addTimelineAnnotation("syncing writer");
writer.sync(useHsync);
writer.sync(takeSyncFuture.isForceSync());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? If useHsync is set, doesn't that mean sync every edit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also confusing me a bit but after investigating, I think the logic is correct. As this config can be changed at table level or even mutation level by design, we should follow what we have in the SyncFuture, as it is set when syncing the WAL in HRegion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the desion allow different table/mutation use different durability, but these table share one WAL writer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you can change the durability by mutation, you can not use different WALs for different mutations for the same region? But anyway, you can implement a multi wal strategy to use different WALs for different durability levels.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I went back over this and this is correct as the SyncFuture uses whatever the config is set to when created.

bq. So the desion allow different table/mutation use different durability, but these table share one WAL writer?

Yes.. this could make for some 'interesting' results. IIRC, looking at this w/ Lars a while back, if async WAL and you were trying to manage the syncs from the application, a client could get an ack though all puts hadn't yet syncd'.

TraceUtil.addTimelineAnnotation("writer synced");
currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -72,18 +74,54 @@ protected void resetSyncFlag(CustomAsyncFSWAL wal) {
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
return wal.getSyncFlag();
}

@Override
protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
return wal.getWriterSyncFlag();
}
}

class CustomAsyncFSWAL extends AsyncFSWAL {

private Boolean syncFlag;

private Boolean writerSyncFlag;

public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
eventLoopGroup, channelClass);
}

@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter writer = super.createWriterInstance(path);
return new AsyncWriter() {

@Override
public void close() throws IOException {
writer.close();
}

@Override
public long getLength() {
return writer.getLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;
return writer.sync(forceSync);
}

@Override
public void append(Entry entry) {
writer.append(entry);
}
};
}

@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
Expand All @@ -98,9 +136,14 @@ public void sync(long txid, boolean forceSync) throws IOException {

void resetSyncFlag() {
this.syncFlag = null;
this.writerSyncFlag = null;
}

Boolean getSyncFlag() {
return syncFlag;
}

Boolean getWriterSyncFlag() {
return writerSyncFlag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -51,16 +52,51 @@ protected void resetSyncFlag(CustomFSHLog wal) {
protected Boolean getSyncFlag(CustomFSHLog wal) {
return wal.getSyncFlag();
}

@Override
protected Boolean getWriterSyncFlag(CustomFSHLog wal) {
return wal.getWriterSyncFlag();
}
}

class CustomFSHLog extends FSHLog {
private Boolean syncFlag;

private Boolean writerSyncFlag;

public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}

@Override
protected Writer createWriterInstance(Path path) throws IOException {
Writer writer = super.createWriterInstance(path);
return new Writer() {

@Override
public void close() throws IOException {
writer.close();
}

@Override
public long getLength() {
return writer.getLength();
}

@Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;
writer.sync(forceSync);
}

@Override
public void append(Entry entry) throws IOException {
writer.append(entry);
}
};
}

@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
Expand All @@ -75,9 +111,14 @@ public void sync(long txid, boolean forceSync) throws IOException {

void resetSyncFlag() {
this.syncFlag = null;
this.writerSyncFlag = null;
}

Boolean getSyncFlag() {
return syncFlag;
}

Boolean getWriterSyncFlag() {
return writerSyncFlag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -77,54 +76,76 @@ protected abstract T getWAL(FileSystem fs, Path root, String logDir, Configurati

protected abstract Boolean getSyncFlag(T wal);

protected abstract Boolean getWriterSyncFlag(T wal);

@Test
public void testWALDurability() throws IOException {
byte[] bytes = Bytes.toBytes(getName());
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);

// global hbase.wal.hsync false, no override in put call - hflush
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
T wal = getWAL(fs, rootDir, getName(), conf);
HRegion region = initHRegion(tableName, null, null, wal);
byte[] bytes = Bytes.toBytes(getName());
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);

resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertFalse(getSyncFlag(wal));

region.close();
wal.close();
try {
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
assertNull(getWriterSyncFlag(wal));
region.put(put);
assertFalse(getSyncFlag(wal));
assertFalse(getWriterSyncFlag(wal));

// global hbase.wal.hsync false, durability set in put call - fsync
put.setDurability(Durability.FSYNC_WAL);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
assertNull(getWriterSyncFlag(wal));
region.put(put);
assertTrue(getSyncFlag(wal));
assertTrue(getWriterSyncFlag(wal));
} finally {
HBaseTestingUtility.closeRegionAndWAL(region);
}

// global hbase.wal.hsync true, no override in put call
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
fs = FileSystem.get(conf);
wal = getWAL(fs, rootDir, getName(), conf);
region = initHRegion(tableName, null, null, wal);

resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertEquals(getSyncFlag(wal), true);

// global hbase.wal.hsync true, durability set in put call - fsync
put.setDurability(Durability.FSYNC_WAL);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertTrue(getSyncFlag(wal));

// global hbase.wal.hsync true, durability set in put call - sync
put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
put.setDurability(Durability.SYNC_WAL);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
region.put(put);
assertFalse(getSyncFlag(wal));

HBaseTestingUtility.closeRegionAndWAL(region);
try {
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
assertNull(getWriterSyncFlag(wal));
region.put(put);
assertTrue(getSyncFlag(wal));
assertTrue(getWriterSyncFlag(wal));

// global hbase.wal.hsync true, durability set in put call - fsync
put.setDurability(Durability.FSYNC_WAL);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
assertNull(getWriterSyncFlag(wal));
region.put(put);
assertTrue(getSyncFlag(wal));
assertTrue(getWriterSyncFlag(wal));

// global hbase.wal.hsync true, durability set in put call - sync
put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
put.setDurability(Durability.SYNC_WAL);
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
assertNull(getWriterSyncFlag(wal));
region.put(put);
assertFalse(getSyncFlag(wal));
assertFalse(getWriterSyncFlag(wal));
} finally {
HBaseTestingUtility.closeRegionAndWAL(region);
}
}

private String getName() {
Expand Down