Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,19 @@ private static boolean isValidWALRootDir(Path walDir, final Configuration c) thr
return true;
}

public static Path constructWALRegionDirFromRegionInfo(final Configuration conf,
final TableName tableName, final String encodedRegionName)
throws IOException {
return new Path(getWALTableDir(conf, tableName),
encodedRegionName);
}

public static Path getWALTableDir(final Configuration conf, final TableName tableName)
throws IOException {
return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
}

/**
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
* path rootdir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private void checkClosedRegions(final MasterProcedureEnv env) throws IOException
*/
private void checkClosedRegion(final MasterProcedureEnv env,
RegionInfo regionInfo) throws IOException {
if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getFileSystem(),
if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getMasterWalManager().getFileSystem(),
env.getMasterConfiguration(), regionInfo)) {
throw new IOException("Recovered.edits are found in Region: " + regionInfo
+ ", abort merge to prevent data loss");
Expand Down Expand Up @@ -795,14 +795,16 @@ private ServerName getServerName(final MasterProcedureEnv env) {
}

private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId = -1L;
for (RegionInfo region : regionsToMerge) {
maxSequenceId =
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region)));
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
walFS, getWALRegionDir(env, region)));
}
if (maxSequenceId > 0) {
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
maxSequenceId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
Expand All @@ -41,6 +42,7 @@
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -216,9 +218,10 @@ private void updateRegionLocation(RegionInfo regionInfo, State state, Put put)
}

private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
MasterFileSystem mfs = master.getMasterFileSystem();
FileSystem walFS = master.getMasterWalManager().getFileSystem();
long maxSeqId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.constructWALRegionDirFromRegionInfo(
master.getConfiguration(), region.getTable(), region.getEncodedName()));
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public SplitTableRegionProcedure(final MasterProcedureEnv env,
* @throws IOException IOException
*/
private void checkClosedRegion(final MasterProcedureEnv env) throws IOException {
if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getFileSystem(),
if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getMasterWalManager().getFileSystem(),
env.getMasterConfiguration(), getRegion())) {
throw new IOException("Recovered.edits are found in Region: " + getRegion()
+ ", abort split to prevent data loss");
Expand Down Expand Up @@ -851,12 +851,14 @@ private int getRegionReplication(final MasterProcedureEnv env) throws IOExceptio
}

private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion()));
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
if (maxSequenceId > 0) {
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI),
maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI),
maxSequenceId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -134,6 +135,12 @@ protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) thr
return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
}

protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region)
throws IOException {
return FSUtils.constructWALRegionDirFromRegionInfo(env.getMasterConfiguration(),
region.getTable(), region.getEncodedName());
}

/**
* Check that cluster is up and master is running. Check table is modifiable.
* If <code>enabled</code>, check table is enabled else check it is disabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,13 +115,13 @@ protected Flow executeFromState(final MasterProcedureEnv env, final DisableTable
case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
if (env.getMasterServices().getTableDescriptors().get(tableName)
.hasGlobalReplicationScope()) {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
try (BufferedMutator mutator = env.getMasterServices().getConnection()
.getBufferedMutator(TableName.META_TABLE_NAME)) {
for (RegionInfo region : env.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tableName)) {
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
EnvironmentEdgeManager.currentTime()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

private Path regionDir;
private FileSystem walFS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment for the role of this field


// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
Expand Down Expand Up @@ -926,7 +929,7 @@ private long initializeRegionInternals(final CancelableProgressable reporter,
stores.forEach(HStore::startReplayingFromWAL);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
// Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId);
} finally {
Expand Down Expand Up @@ -972,14 +975,14 @@ private long initializeRegionInternals(final CancelableProgressable reporter,
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long maxSeqIdFromFile =
WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir());
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir());
long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
// The openSeqNum will always be increase even for read only region, as we rely on it to
// determine whether a region has been successfully reopend, so here we always need to update
// the max sequence id file.
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId - 1);
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
}

LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
Expand Down Expand Up @@ -1130,8 +1133,8 @@ RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint
// Store SeqId in HDFS when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
if (getWalFileSystem().exists(getWALRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
mvcc.getReadPoint());
}
}
Expand Down Expand Up @@ -1855,6 +1858,27 @@ public HRegionFileSystem getRegionFileSystem() {
return this.fs;
}

public HRegionFileSystem getRegionWALFileSystem() throws IOException {
return new HRegionFileSystem(conf, getWalFileSystem(),
FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
}

public FileSystem getWalFileSystem() throws IOException {
if (walFS == null) {
walFS = FSUtils.getWALFileSystem(conf);
}
return walFS;
}

@VisibleForTesting
public Path getWALRegionDir() throws IOException {
if (regionDir == null) {
regionDir = FSUtils.constructWALRegionDirFromRegionInfo(conf, getRegionInfo().getTable(),
getRegionInfo().getEncodedName());
}
return regionDir;
}

@Override
public long getEarliestFlushTimeForAllStores() {
return Collections.min(lastStoreFlushTimeMap.values());
Expand Down Expand Up @@ -4483,8 +4507,7 @@ private boolean isFlushSize(MemStoreSize size) {
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
Map<byte[], Long> maxSeqIdInStores,
protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws IOException {
long minSeqIdForTheRegion = -1;
Expand All @@ -4495,21 +4518,23 @@ protected long replayRecoveredEditsIfAny(final Path regiondir,
}
long seqid = minSeqIdForTheRegion;

FileSystem fs = this.fs.getFileSystem();
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
FileSystem walFS = getWalFileSystem();
Path regionDir = getWALRegionDir();

NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
+ " recovered edits file(s) under " + regionDir);
}

if (files == null || files.isEmpty()) return seqid;

for (Path edits: files) {
if (edits == null || !fs.exists(edits)) {
if (edits == null || !walFS.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
continue;
}
if (isZeroLengthThenDelete(fs, edits)) continue;
if (isZeroLengthThenDelete(walFS, edits)) continue;

long maxSeqId;
String fileName = edits.getName();
Expand Down Expand Up @@ -4540,7 +4565,7 @@ protected long replayRecoveredEditsIfAny(final Path regiondir,
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
Expand All @@ -4563,16 +4588,16 @@ protected long replayRecoveredEditsIfAny(final Path regiondir,
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file: files) {
fakeStoreFiles.add(
new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
new HStoreFile(walFS, file, this.conf, null, null, true));
}
getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
} else {
for (Path file: files) {
if (!fs.delete(file, false)) {
if (!walFS.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
Expand All @@ -4597,12 +4622,12 @@ private long replayRecoveredEdits(final Path edits,
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
FileSystem fs = this.fs.getFileSystem();
FileSystem walFS = getWalFileSystem();

status.setStatus("Opening recovered edits");
WAL.Reader reader = null;
try {
reader = WALFactory.createReader(fs, edits, conf);
reader = WALFactory.createReader(walFS, edits, conf);
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
Expand Down Expand Up @@ -4754,7 +4779,7 @@ private long replayRecoveredEdits(final Path edits,
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during " +
"wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
Expand All @@ -4764,7 +4789,7 @@ private long replayRecoveredEdits(final Path edits,
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
msg = "File corruption enLongAddered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
Expand Down
Loading