hris) {
} catch (InterruptedException ite) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
- LOG.debug(
- "ExecutionException during HDFSBlocksDistribution computation. for region = "
- + hregionInfo.getEncodedName(), ee);
+ LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
+ hregionInfo.getEncodedName(), ee);
}
index++;
}
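
For context on the LOG.debug change above: SLF4J treats a trailing Throwable specially, so it is not consumed by a `{}` placeholder and both the formatted region name and the stack trace of `ee` are emitted. A minimal standalone illustration of that pattern (class and method names here are illustrative only, not part of the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

      void report(String encodedRegionName, Exception cause) {
        // The trailing exception is not matched against a {} placeholder;
        // SLF4J appends its stack trace after the formatted message.
        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
          encodedRegionName, cause);
      }
    }
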
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 9f8a717d21fb..4455d7a7491b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -29,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -122,6 +123,7 @@ public class HStoreFile implements StoreFile {
// StoreFile.Reader
private volatile StoreFileReader initialReader;
+ private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
// Block cache configuration and reference.
private final CacheConfig cacheConf;
@@ -347,7 +349,11 @@ public OptionalLong getBulkLoadTimestamp() {
* file is opened.
*/
public HDFSBlocksDistribution getHDFSBlockDistribution() {
- return this.fileInfo.getHDFSBlockDistribution();
+ if (initialReaderBlockDistribution != null) {
+ return initialReaderBlockDistribution.getHDFSBlockDistribution();
+ } else {
+ return this.fileInfo.getHDFSBlockDistribution();
+ }
}
/**
@@ -365,6 +371,13 @@ private void open() throws IOException {
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
}
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
+
+ if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
+ boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
+ FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
+ this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
+ }
+
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
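
For orientation only (not part of the patch): region-level locality is an aggregate of these per-store-file distributions, which is why keeping HStoreFile#getHDFSBlockDistribution fresh matters. A rough sketch of that aggregation, assuming the existing HDFSBlocksDistribution accumulator and query methods (add, getBlockLocalityIndex) and placeholder storeFiles/localHostName inputs:

    // Rough sketch; HRegion does something equivalent when reporting region locality.
    static float regionLocality(java.util.Collection<HStoreFile> storeFiles, String localHostName) {
      HDFSBlocksDistribution regionWide = new HDFSBlocksDistribution();
      for (HStoreFile storeFile : storeFiles) {
        // Now possibly stream-derived and periodically refreshed, rather than fixed at open time.
        regionWide.add(storeFile.getHDFSBlockDistribution());
      }
      return regionWide.getBlockLocalityIndex(localHostName);
    }
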
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
new file mode 100644
index 000000000000..aa15cda922d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.io.FileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Computes the HDFSBlockDistribution for a file based on the underlying located blocks
+ * for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
+ * allocating an array of numBlocks size per call. It may also involve calling the namenode, if
+ * the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
+ * we cache the computed distribution for a configurable period of time.
+ *
+ * This class only gets instantiated for the first FSDataInputStream of each StoreFile (i.e.
+ * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
+ * value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
+ *
+ * Once the backing FSDataInputStream is closed, we should not expect the distribution result
+ * to change anymore. This is ok because the initialReader's InputStream is only closed when the
+ * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
+ * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
+ * be created for the new initialReader.
+ */
+@InterfaceAudience.Private
+public class InputStreamBlockDistribution {
+ private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
+
+ private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
+ "hbase.locality.inputstream.derive.enabled";
+ private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
+
+ private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
+ "hbase.locality.inputstream.derive.cache.period";
+ private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
+
+ private final FSDataInputStream stream;
+ private final StoreFileInfo fileInfo;
+ private final int cachePeriodMs;
+
+ private HDFSBlocksDistribution hdfsBlocksDistribution;
+ private long lastCachedAt;
+ private boolean streamUnsupported;
+
+ /**
+ * This should only be called for the first FSDataInputStream of a StoreFile,
+ * in {@link HStoreFile#open()}.
+ *
+ * @see InputStreamBlockDistribution
+ * @param stream the input stream to derive locality from
+ * @param fileInfo the StoreFileInfo for the related store file
+ */
+ public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
+ this.stream = stream;
+ this.fileInfo = fileInfo;
+ this.cachePeriodMs = fileInfo.getConf().getInt(
+ HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
+ DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
+ this.lastCachedAt = EnvironmentEdgeManager.currentTime();
+ this.streamUnsupported = false;
+ this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
+ }
+
+ /**
+ * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
+ */
+ public static boolean isEnabled(Configuration conf) {
+ return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
+ DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
+ }
+
+ /**
+ * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
+ * is expired.
+ */
+ public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
+ if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
+ try {
+ LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
+ computeBlockDistribution();
+ } catch (IOException e) {
+ LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
+ fileInfo, e);
+ }
+ }
+ return hdfsBlocksDistribution;
+ }
+
+ private void computeBlockDistribution() throws IOException {
+ lastCachedAt = EnvironmentEdgeManager.currentTime();
+
+ FSDataInputStream stream;
+ if (fileInfo.isLink()) {
+ stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
+ } else {
+ stream = this.stream;
+ }
+
+ if (!(stream instanceof HdfsDataInputStream)) {
+ if (!streamUnsupported) {
+ LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
+ + "used to derive locality. Falling back on cached value.",
+ stream, fileInfo, fileInfo.isLink());
+ streamUnsupported = true;
+ }
+ return;
+ }
+
+ streamUnsupported = false;
+ hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
+ }
+
+ /**
+ * For tests only, sets lastCachedAt so we can force a refresh
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ synchronized void setLastCachedAt(long timestamp) {
+ lastCachedAt = timestamp;
+ }
+
+ /**
+ * For tests only, returns the configured cache period
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ long getCachePeriodMs() {
+ return cachePeriodMs;
+ }
+
+ /**
+ * For tests only, returns whether the passed stream was found to be unsupported for deriving locality
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ boolean isStreamUnsupported() {
+ return streamUnsupported;
+ }
+}
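
The feature is off by default and both settings are read from the server Configuration. A minimal sketch of enabling it (the keys are private constants above, so the string literals are repeated here; the 5 minute period is an arbitrary example, the default is 60 seconds):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    final class EnableStreamLocalitySketch {
      static Configuration newConf() {
        Configuration conf = HBaseConfiguration.create();
        // Derive StoreFile locality from the already-open input stream (off by default).
        conf.setBoolean("hbase.locality.inputstream.derive.enabled", true);
        // Recompute the cached HDFSBlocksDistribution at most every 5 minutes (default: 60s).
        conf.setInt("hbase.locality.inputstream.derive.cache.period", 300_000);
        return conf;
      }
    }
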
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 520cd4379778..90a969389fd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -81,6 +81,9 @@
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
@@ -692,6 +695,38 @@ public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOExc
return fs.exists(metaRegionDir);
}
+ /**
+ * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
+ * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
+ * This method retrieves those blocks from the input stream and uses them to calculate
+ * HDFSBlockDistribution.
+ *
+ * The underlying method in DFSInputStream does attempt to use locally cached blocks, but
+ * may hit the namenode if the cache is determined to be incomplete. The method also involves
+ * making copies of all LocatedBlocks rather than returning the underlying blocks themselves.
+ */
+ public static HDFSBlocksDistribution computeHDFSBlocksDistribution(
+ HdfsDataInputStream inputStream) throws IOException {
+ List<LocatedBlock> blocks = inputStream.getAllBlocks();
+ HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
+ for (LocatedBlock block : blocks) {
+ String[] hosts = getHostsForLocations(block);
+ long len = block.getBlockSize();
+ StorageType[] storageTypes = block.getStorageTypes();
+ blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
+ }
+ return blocksDistribution;
+ }
+
+ private static String[] getHostsForLocations(LocatedBlock block) {
+ DatanodeInfo[] locations = block.getLocations();
+ String[] hosts = new String[locations.length];
+ for (int i = 0; i < hosts.length; i++) {
+ hosts[i] = locations[i].getHostName();
+ }
+ return hosts;
+ }
+
/**
* Compute HDFS blocks distribution of a given file, or a portion of the file
* @param fs file system
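
A minimal, self-contained sketch of calling the new FSUtils helper directly; only FSUtils.computeHDFSBlocksDistribution(HdfsDataInputStream) comes from this patch, the wrapper class and method below are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HDFSBlocksDistribution;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    final class StreamLocalityProbe {
      /** Returns the block distribution for an open file, or null if the stream type is unsupported. */
      static HDFSBlocksDistribution probe(FileSystem fs, Path path) throws IOException {
        try (FSDataInputStream in = fs.open(path)) {
          // Only HDFS hands back an HdfsDataInputStream; other filesystems fall through to null.
          if (in instanceof HdfsDataInputStream) {
            return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) in);
          }
          return null;
        }
      }
    }
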
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
index 879606807c8a..52e56cbf4d6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.ClassRule;
import org.junit.Test;
@@ -88,6 +89,40 @@ public void testHashCode() {
assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
}
+ /**
+ * Test that the stream returned from {@link FileLink#open(FileSystem)} can be unwrapped
+ * to a {@link HdfsDataInputStream} by
+ * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
+ */
+ @Test
+ public void testGetUnderlyingFSDataInputStream() throws Exception {
+ HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ Configuration conf = testUtil.getConfiguration();
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+ testUtil.startMiniDFSCluster(1);
+ try {
+ MiniDFSCluster cluster = testUtil.getDFSCluster();
+ FileSystem fs = cluster.getFileSystem();
+
+ Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+ writeSomeData(fs, originalPath, 256 << 20, (byte) 2);
+
+ List<Path> files = new ArrayList<>();
+ files.add(originalPath);
+
+ FileLink link = new FileLink(files);
+ FSDataInputStream stream = link.open(fs);
+
+ FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
+ assertTrue(underlying instanceof HdfsDataInputStream);
+ } finally {
+ testUtil.shutdownMiniCluster();
+ }
+ }
+
/**
* Test, on HDFS, that the FileLink is still readable
* even when the current file gets renamed.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
index 0362e13c944c..6e011d06aa7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
@@ -19,18 +19,32 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -167,4 +181,102 @@ public void testRefreshAndWait() throws Exception {
}
}
}
+
+ @Test
+ public void testRefreshRegionsWithChangedLocality() throws InterruptedException {
+ TableDescriptor table =
+ TableDescriptorBuilder.newBuilder(TableName.valueOf("RegionLocationFinder")).build();
+
+ int numRegions = 100;
+ List<RegionInfo> regions = new ArrayList<>(numRegions);
+
+ for (int i = 1; i <= numRegions; i++) {
+ byte[] startKey = i == 0 ? HConstants.EMPTY_START_ROW : Bytes.toBytes(i);
+ byte[] endKey = i == numRegions ? HConstants.EMPTY_BYTE_ARRAY : Bytes.toBytes(i + 1);
+ RegionInfo region =
+ RegionInfoBuilder.newBuilder(table.getTableName()).setStartKey(startKey).setEndKey(endKey)
+ .build();
+ regions.add(region);
+ }
+
+ ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
+ RegionInfo testRegion = regions.get(0);
+
+ RegionLocationFinder finder = new RegionLocationFinder() {
+ @Override
+ protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
+ return generate(region);
+ }
+ };
+
+ // cache for comparison later
+ Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
+ for (RegionInfo region : regions) {
+ HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+ cache.put(region, hbd);
+ }
+
+ finder.setClusterMetrics(
+ getMetricsWithLocality(testServer, testRegion.getRegionName(), 0.123f));
+
+ // everything should be the same as cached, because metrics were null before
+ for (RegionInfo region : regions) {
+ HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+ assertSame(cache.get(region), hbd);
+ }
+
+ finder.setClusterMetrics(
+ getMetricsWithLocality(testServer, testRegion.getRegionName(), 0.345f));
+
+ // cache refresh happens in a background thread, so we need to wait for the value to
+ // update before running assertions.
+ long now = System.currentTimeMillis();
+ HDFSBlocksDistribution cached = cache.get(testRegion);
+ HDFSBlocksDistribution newValue;
+ do {
+ Thread.sleep(1_000);
+ newValue = finder.getBlockDistribution(testRegion);
+ } while (cached == newValue && System.currentTimeMillis() - now < 30_000);
+
+ // locality changed just for our test region, so it should no longer be the same
+ for (RegionInfo region : regions) {
+ HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+ if (region.equals(testRegion)) {
+ assertNotSame(cache.get(region), hbd);
+ } else {
+ assertSame(cache.get(region), hbd);
+ }
+ }
+ }
+
+ private static HDFSBlocksDistribution generate(RegionInfo region) {
+ HDFSBlocksDistribution distribution = new HDFSBlocksDistribution();
+ int seed = region.hashCode();
+ Random rand = new Random(seed);
+ int size = 1 + rand.nextInt(10);
+ for (int i = 0; i < size; i++) {
+ distribution.addHostsAndBlockWeight(new String[] { "host-" + i }, 1 + rand.nextInt(100));
+ }
+ return distribution;
+ }
+
+ private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region,
+ float locality) {
+ RegionMetrics regionMetrics = mock(RegionMetrics.class);
+ when(regionMetrics.getDataLocality()).thenReturn(locality);
+
+ Map<byte[], RegionMetrics> regionMetricsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ regionMetricsMap.put(region, regionMetrics);
+
+ ServerMetrics serverMetrics = mock(ServerMetrics.class);
+ when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
+
+ Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
+ serverMetricsMap.put(serverName, serverMetrics);
+
+ ClusterMetrics metrics = mock(ClusterMetrics.class);
+ when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+
+ return metrics;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java
new file mode 100644
index 000000000000..56361e8d9436
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.io.FileLink;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class})
+public class TestInputStreamBlockDistribution {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class);
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path testPath;
+
+ @Before
+ public void setUp() throws Exception {
+ HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ conf = testUtil.getConfiguration();
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+ testUtil.startMiniDFSCluster(1);
+ MiniDFSCluster cluster = testUtil.getDFSCluster();
+ fs = cluster.getFileSystem();
+
+ testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+ writeSomeData(fs, testPath, 256 << 20, (byte)2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.delete(testPath, false);
+ fs.close();
+ }
+
+ @Test
+ public void itDerivesLocalityFromHFileInputStream() throws Exception {
+ try (FSDataInputStream stream = fs.open(testPath)) {
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+ InputStreamBlockDistribution test =
+ new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ assertNotSame(initial, test.getHDFSBlockDistribution());
+ }
+
+ }
+
+ @Test
+ public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
+ List<Path> files = new ArrayList<>();
+ files.add(testPath);
+
+ FileLink link = new FileLink(files);
+ try (FSDataInputStream stream = link.open(fs)) {
+
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+ InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream,
+ getMockedStoreFileInfo(initial, true));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ assertNotSame(initial, test.getHDFSBlockDistribution());
+ }
+ }
+
+ @Test
+ public void itFallsBackOnLastKnownValueWhenUnsupported() {
+ FSDataInputStream fakeStream = mock(FSDataInputStream.class);
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+ InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
+ getMockedStoreFileInfo(initial, false));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
+ assertSame(initial, test.getHDFSBlockDistribution());
+ assertTrue(test.isStreamUnsupported());
+ }
+
+ @Test
+ public void itFallsBackOnLastKnownValueOnException() throws IOException {
+ HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
+ when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
+
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+ InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
+ getMockedStoreFileInfo(initial, false));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ // fakeStream throws an exception, so falls back on original
+ assertSame(initial, test.getHDFSBlockDistribution());
+
+ assertFalse(test.isStreamUnsupported());
+ }
+
+ /**
+ * Write up to 'size' bytes with value 'v' into a new file called 'path'.
+ */
+ private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
+ byte[] data = new byte[4096];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = v;
+ }
+
+ FSDataOutputStream stream = fs.create(path);
+ try {
+ long written = 0;
+ while (written < size) {
+ stream.write(data, 0, data.length);
+ written += data.length;
+ }
+ } finally {
+ stream.close();
+ }
+ }
+
+ private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
+ boolean isFileLink) {
+ StoreFileInfo mock = mock(StoreFileInfo.class);
+ when(mock.getHDFSBlockDistribution())
+ .thenReturn(distribution);
+ when(mock.getConf()).thenReturn(conf);
+ when(mock.isLink()).thenReturn(isFileLink);
+ return mock;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 1e5dbf2108a6..32d0b74b9ab9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -105,7 +106,31 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
out.close();
}
- @Test public void testcomputeHDFSBlocksDistribution() throws Exception {
+ @Test
+ public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
+ testComputeHDFSBlocksDistribution((fs, testFile) -> {
+ try (FSDataInputStream open = fs.open(testFile)) {
+ assertTrue(open instanceof HdfsDataInputStream);
+ return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
+ }
+ });
+ }
+
+ @Test
+ public void testComputeHDFSBlockDistribution() throws Exception {
+ testComputeHDFSBlocksDistribution((fs, testFile) -> {
+ FileStatus status = fs.getFileStatus(testFile);
+ return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ });
+ }
+
+ @FunctionalInterface
+ interface HDFSBlockDistributionFunction {
+ HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
+ }
+
+ private void testComputeHDFSBlocksDistribution(
+ HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
final int DEFAULT_BLOCK_SIZE = 1024;
conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = null;
@@ -129,9 +154,10 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
boolean ok;
do {
ok = true;
- FileStatus status = fs.getFileStatus(testFile);
+
HDFSBlocksDistribution blocksDistribution =
- FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ fileToBlockDistribution.getForPath(fs, testFile);
+
long uniqueBlocksTotalWeight =
blocksDistribution.getUniqueBlocksTotalWeight();
for (String host : hosts) {
@@ -163,9 +189,8 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
long weight;
long uniqueBlocksTotalWeight;
do {
- FileStatus status = fs.getFileStatus(testFile);
HDFSBlocksDistribution blocksDistribution =
- FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ fileToBlockDistribution.getForPath(fs, testFile);
uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
String tophost = blocksDistribution.getTopHosts().get(0);
@@ -197,8 +222,7 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
HDFSBlocksDistribution blocksDistribution;
do {
- FileStatus status = fs.getFileStatus(testFile);
- blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
// NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
}
while (blocksDistribution.getTopHosts().size() != 3 &&