@@ -364,7 +364,8 @@ private static boolean isValidWALRootDir(Path walDir, final Configuration c) thr
     if (!qualifiedWalDir.equals(rootDir)) {
       if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
         throw new IllegalStateException("Illegal WAL directory specified. " +
-          "WAL directories are not permitted to be under the root directory if set.");
+          "WAL directories are not permitted to be under root directory: rootDir=" +
+            rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
       }
     }
     return true;
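
The hunk above only enriches the error message for a WAL directory nested under the HBase root directory; the check itself is unchanged. Below is a minimal standalone sketch of that nesting check, assuming only hadoop-common's Path on the classpath; the class name and paths are illustrative, not part of the patch.

import org.apache.hadoop.fs.Path;

public class WalDirNestingCheckSketch {
  public static void main(String[] args) {
    Path rootDir = new Path("hdfs://ns1/hbase");
    Path qualifiedWalDir = new Path("hdfs://ns1/hbase/wals"); // nested under rootDir
    if (!qualifiedWalDir.equals(rootDir)
        && qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
      // Same shape as the patched message: report both paths.
      throw new IllegalStateException("Illegal WAL directory specified. " +
        "WAL directories are not permitted to be under root directory: rootDir=" +
        rootDir + ", qualifiedWALDir=" + qualifiedWalDir);
    }
  }
}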
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -22,24 +22,21 @@
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,6 +46,9 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
@@ -77,10 +77,6 @@ public WALSplit() {}
    * Represent an WALSplit, i.e. a single WAL file.
    * Start- and EndTime are managed by the split, so that WAL files can be
    * filtered before WALEdits are passed to the mapper(s).
-   * @param logFileName
-   * @param fileSize
-   * @param startTime
-   * @param endTime
    */
   public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
     this.logFileName = logFileName;
@@ -186,7 +182,9 @@ private void seek() throws IOException {

   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (reader == null) return false;
+    if (reader == null) {
+      return false;
+    }
     this.currentPos = reader.getPosition();
     Entry temp;
     long i = -1;
@@ -204,7 +202,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
     } while (temp != null && temp.getKey().getWriteTime() < startTime);
 
     if (temp == null) {
-      if (i > 0) LOG.info("Skipped " + i + " entries.");
+      if (i > 0) {
+        LOG.info("Skipped " + i + " entries.");
+      }
       LOG.info("Reached end of file.");
       return false;
     } else if (i > 0) {
@@ -242,7 +242,9 @@ public float getProgress() throws IOException, InterruptedException {
   @Override
   public void close() throws IOException {
     LOG.info("Closing reader");
-    if (reader != null) this.reader.close();
+    if (reader != null) {
+      this.reader.close();
+    }
   }
 }

@@ -301,40 +303,56 @@ private Path[] getInputPaths(Configuration conf) {
       inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
   }
 
+  /**
+   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer
+   * or equal to this value else we will filter out the file. If name does not
+   * seem to have a timestamp, we will just return it w/o filtering.
+   * @param endTime If file looks like it has a timestamp in its name, we'll check if older or equal
+   * to this value else we will filter out the file. If name does not seem to
+   * have a timestamp, we will just return it w/o filtering.
+   */
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
       throws IOException {
     List<FileStatus> result = new ArrayList<>();
     LOG.debug("Scanning " + dir.toString() + " for WAL files");
 
     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
-    if (!iter.hasNext()) return Collections.emptyList();
+    if (!iter.hasNext()) {
+      return Collections.emptyList();
+    }
     while (iter.hasNext()) {
       LocatedFileStatus file = iter.next();
       if (file.isDirectory()) {
-        // recurse into sub directories
+        // Recurse into sub directories
         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
       } else {
-        String name = file.getPath().toString();
-        int idx = name.lastIndexOf('.');
-        if (idx > 0) {
-          try {
-            long fileStartTime = Long.parseLong(name.substring(idx+1));
-            if (fileStartTime <= endTime) {
-              LOG.info("Found: " + file);
-              result.add(file);
-            }
-          } catch (NumberFormatException x) {
-            idx = 0;
-          }
-        }
-        if (idx == 0) {
-          LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
-        }
+        addFile(result, file, startTime, endTime);
       }
     }
+    // TODO: These results should be sorted? Results could be content of recovered.edits directory
+    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
+    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
     return result;
   }
 
+  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
+      long endTime) {
+    long timestamp = WAL.getTimestamp(lfs.getPath().getName());
+    if (timestamp > 0) {
+      // Looks like a valid timestamp.
+      if (timestamp <= endTime && timestamp >= startTime) {
+        LOG.info("Found {}", lfs.getPath());
+        result.add(lfs);
+      } else {
+        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(),
+          startTime, Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
+      }
+    } else {
+      // If no timestamp, add it regardless.
+      LOG.info("Found (no-timestamp!) {}", lfs);
+      result.add(lfs);
+    }
+  }
+
   @Override
   public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {
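
The getFiles/addFile hunk above is the core behavioral change in this file: name-based filtering moves into the package-private addFile helper, the start bound is now enforced (the old code only checked fileStartTime <= endTime), both bounds are inclusive, and names without a parseable timestamp are always kept. A short sketch of that contract, driven the same way TestWALInputFormat below drives it; the class name, paths, and timestamps are illustrative, and mockito plus the hbase-mapreduce module are assumed on the classpath.

package org.apache.hadoop.hbase.mapreduce; // addFile is package-private

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;

public class AddFileSemanticsSketch {
  public static void main(String[] args) {
    List<FileStatus> result = new ArrayList<>();
    LocatedFileStatus wal = Mockito.mock(LocatedFileStatus.class);
    long ts = 1577836800000L;
    // WAL file names end in a numeric timestamp, e.g. "host%2C16020%2C1.<ts>".
    Mockito.when(wal.getPath()).thenReturn(new Path("/wals/host%2C16020%2C1." + ts));
    WALInputFormat.addFile(result, wal, ts, ts);     // kept: inside inclusive [startTime, endTime]
    WALInputFormat.addFile(result, wal, 0L, ts - 1); // skipped: timestamp is past endTime
    Mockito.when(wal.getPath()).thenReturn(new Path("/wals/nameWithoutTimestamp"));
    WALInputFormat.addFile(result, wal, 0L, 1L);     // kept: no timestamp, added regardless
    System.out.println(result.size()); // -> 2
  }
}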
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -26,7 +26,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -239,6 +238,7 @@ protected boolean filter(Context context, final Cell cell) {
     super.cleanup(context);
   }
 
+  @SuppressWarnings("checkstyle:EmptyBlock")
   @Override
   public void setup(Context context) throws IOException {
     String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
@@ -377,17 +377,21 @@ private void usage(final String errorMsg) {
     System.err.println(" <WAL inputdir> directory of WALs to replay.");
     System.err.println(" <tables> comma separated list of tables. If no tables specified,");
     System.err.println(" all are imported (even hbase:meta if present).");
-    System.err.println(" <tableMappings> WAL entries can be mapped to a new set of tables by passing");
-    System.err.println(" <tableMappings>, a comma separated list of target tables.");
-    System.err.println(" If specified, each table in <tables> must have a mapping.");
+    System.err.println(" <tableMappings> WAL entries can be mapped to a new set of tables by " +
+      "passing");
+    System.err.println(" <tableMappings>, a comma separated list of target " +
+      "tables.");
+    System.err.println(" If specified, each table in <tables> must have a " +
+      "mapping.");
     System.err.println("To generate HFiles to bulk load instead of loading HBase directly, pass:");
     System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
     System.err.println(" Only one table can be specified, and no mapping allowed!");
     System.err.println("To specify a time range, pass:");
     System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
     System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
-    System.err.println(" The start and the end date of timerange. The dates can be expressed");
-    System.err.println(" in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.");
+    System.err.println(" The start and the end date of timerange (inclusive). The dates can be");
+    System.err.println(" expressed in milliseconds-since-epoch or yyyy-MM-dd'T'HH:mm:ss.SS " +
+      "format.");
     System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12");
     System.err.println("Other options:");
     System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName");
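
The usage hunk above also documents that the time range bounds are inclusive. A hedged sketch of driving WALPlayer with that range programmatically, assuming the public START_TIME_KEY/END_TIME_KEY constants on WALInputFormat and an existing input directory and table; all values are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WALPlayerTimeRangeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Either date format from the usage text works; bounds are inclusive.
    conf.set(WALInputFormat.START_TIME_KEY, "2009-02-13T23:32:30.12");
    conf.set(WALInputFormat.END_TIME_KEY, "1234567890120");
    // Replay WALs found under /hbase/oldWALs into tableA.
    System.exit(ToolRunner.run(conf,
      new WALPlayer(conf), new String[] { "/hbase/oldWALs", "tableA" }));
  }
}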
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({ MapReduceTests.class, SmallTests.class})
+public class TestWALInputFormat {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALInputFormat.class);
+
+  /**
+   * Test the primitive start/end time filtering.
+   */
+  @Test
+  public void testAddFile() {
+    List<FileStatus> lfss = new ArrayList<>();
+    LocatedFileStatus lfs = Mockito.mock(LocatedFileStatus.class);
+    long now = System.currentTimeMillis();
+    Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now));
+    WALInputFormat.addFile(lfss, lfs, now, now);
+    assertEquals(1, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, now - 1, now - 1);
+    assertEquals(1, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, now - 2, now - 1);
+    assertEquals(1, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, now - 2, now);
+    assertEquals(2, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, now);
+    assertEquals(3, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE);
+    assertEquals(4, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, now, now + 2);
+    assertEquals(5, lfss.size());
+    WALInputFormat.addFile(lfss, lfs, now + 1, now + 2);
+    assertEquals(5, lfss.size());
+    Mockito.when(lfs.getPath()).thenReturn(new Path("/name"));
+    WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE);
+    assertEquals(6, lfss.size());
+    Mockito.when(lfs.getPath()).thenReturn(new Path("/name.123"));
+    WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE);
+    assertEquals(7, lfss.size());
+    Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now + ".meta"));
+    WALInputFormat.addFile(lfss, lfs, now, now);
+    assertEquals(8, lfss.size());
+  }
+}
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -24,8 +24,8 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import org.apache.hadoop.conf.Configuration;
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -73,7 +74,6 @@
  */
 @Category({MapReduceTests.class, LargeTests.class})
 public class TestWALPlayer {
-
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestWALPlayer.class);
@@ -91,7 +91,7 @@ public class TestWALPlayer {

   @BeforeClass
   public static void beforeClass() throws Exception {
-    conf= TEST_UTIL.getConfiguration();
+    conf = TEST_UTIL.getConfiguration();
     rootDir = TEST_UTIL.createRootDir();
     walRootDir = TEST_UTIL.createWALRootDir();
     fs = CommonFSUtils.getRootDirFileSystem(conf);
@@ -106,9 +106,32 @@ public static void afterClass() throws Exception {
     logFs.delete(walRootDir, true);
   }
 
+  /**
+   * Test that WALPlayer can replay recovered.edits files.
+   */
+  @Test
+  public void testPlayingRecoveredEdit() throws Exception {
+    TableName tn = TableName.valueOf(TestRecoveredEdits.RECOVEREDEDITS_TABLENAME);
+    TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY);
+    // Copy testing recovered.edits file that is over under hbase-server test resources
+    // up into a dir in our little hdfs cluster here.
+    String hbaseServerTestResourcesEdits = System.getProperty("test.build.classes") +
+      "/../../../hbase-server/src/test/resources/" +
+      TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
+    assertTrue(new File(hbaseServerTestResourcesEdits).exists());
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    // Target dir.
+    Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
+    assertTrue(dfs.mkdirs(targetDir));
+    dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
+    assertEquals(0,
+      ToolRunner.run(new WALPlayer(this.conf), new String [] {targetDir.toString()}));
+    // I don't know how many edits are in this file for this table... so just check more than 1.
+    assertTrue(TEST_UTIL.countRows(tn) > 0);
+  }
+
   /**
    * Simple end-to-end test
-   * @throws Exception
    */
   @Test
   public void testWALPlayer() throws Exception {