Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,8 @@ public interface Store {
* if you try to set a configuration.
*/
Configuration getReadOnlyConfiguration();

default String getName() {
return String.format("%s:%s", getRegionInfo().getEncodedName(), getColumnFamilyName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -171,4 +173,14 @@ public static Configuration createStoreConfiguration(Configuration conf, TableDe
return new CompoundConfiguration().add(conf).addBytesMap(td.getValues())
.addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues());
}

public static List<HStoreFile> filteredReferenceFiles(final Collection<HStoreFile> files) {
List<HStoreFile> referenceFiles = new ArrayList<>();
for (HStoreFile sf : files) {
if (sf.isReference() || StoreFileInfo.isReference(sf.getPath())) {
referenceFiles.add(sf);
}
}
return referenceFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;

/**
* Configuration class for stripe store and compactions.
Expand Down Expand Up @@ -65,6 +65,11 @@ public class StripeStoreConfig {
public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
"hbase.store.stripe.region.split.max.imbalance";

/**
* Configure for enable priority select Reference files to compact in StripeCompactPolicy
*/
public static final String PRIORITY_COMPACT_REFERENCE_FILES_ENABLED =
"hbase.store.stripe.region.priority.compact.reference.files.enabled";

private final float maxRegionSplitImbalance;
private final int level0CompactMinFiles;
Expand All @@ -77,6 +82,8 @@ public class StripeStoreConfig {
private final boolean flushIntoL0;
private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount

private final boolean priorityCompactRefsEnabled;

private static final double EPSILON = 0.001; // good enough for this, not a real epsilon.
public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
Expand Down Expand Up @@ -109,6 +116,8 @@ public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
}
this.initialCount = initialCount;
this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
this.priorityCompactRefsEnabled =
config.getBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, false);
}

private static float getFloat(
Expand Down Expand Up @@ -163,4 +172,8 @@ public float getSplitCount() {
public long getSplitPartSize() {
return splitPartSize;
}

public boolean isPriorityCompactRefsEnabled() {
return priorityCompactRefsEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected void createComponents(
Configuration conf, HStore store, CellComparator comparator) throws IOException {
this.config = new StripeStoreConfig(conf, store);
this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config, store);
this.storeFlusher = new StripeStoreFlusher(
conf, store, this.compactionPolicy, this.storeFileManager);
this.compactor = new StripeCompactor(conf, store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,31 @@ private static class State {

private final int blockingFileCount;

private final String storeName;

public StripeStoreFileManager(
CellComparator kvComparator, Configuration conf, StripeStoreConfig config) {
CellComparator kvComparator, Configuration conf, StripeStoreConfig config, HStore store) {
this.cellComparator = kvComparator;
this.config = config;
this.blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
if(store != null) {
storeName = store.getName();
} else {
storeName = "";
}
}

@Override
public void loadFiles(List<HStoreFile> storeFiles) {
loadUnclassifiedStoreFiles(storeFiles);
}

@Override
public String getStoreName() {
return storeName;
}

@Override
public Collection<HStoreFile> getStorefiles() {
return state.allFilesCached;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -41,6 +42,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
Expand Down Expand Up @@ -116,6 +118,87 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
// compact-all-things behavior.
Collection<HStoreFile> allFiles = si.getStorefiles();
if (StoreUtils.hasReferences(allFiles)) {
if (config.isPriorityCompactRefsEnabled()) {
// try to only select reference files
LOG.info("There are references in the store {}, compact reference files only. ",
si.getStoreName());
List<HStoreFile> l0References = StoreUtils.filteredReferenceFiles(si.getLevel0Files());
if (!l0References.isEmpty()) {
boolean needSelect = needSelectFiles(l0References);
if (needSelect) {
// need select L0 reference file compaction means L0 is very large.
// if L0 reference is large, then we should compact large stripes first, to make sure
// the stripes will not too large after the large L0 compaction.
StripeCompactionRequest result = selectSingleStripeCompaction(
si, false, false, false);
if (result != null) {
LOG.debug("Performing one whole stripe split compaction after split, {}",
si.getStoreName());
return result;
}
}
List<HStoreFile> toCompactL0Refs = needSelect ?
selectSimpleCompaction(l0References, false, false, true) :
l0References;
assert !toCompactL0Refs.isEmpty() : "To compact reference files should not be empty";
String msg = "";
if (LOG.isDebugEnabled()) {
msg = String.format("Compact L0 references only after split. %d store files, "
+ "%d L0 files, %d reference files length %d, to "
+ "compact %d reference files with length %d, store %s", allFiles.size(),
si.getLevel0Files().size(), l0References.size(), getTotalFileSize(l0References),
toCompactL0Refs.size(), getTotalFileSize(toCompactL0Refs), si.getStoreName());
}
StripeCompactionRequest request;
if (si.getStripeCount() > 0) {
// do L0 reference compaction, will perform boundary compaction
LOG.debug(msg + ". Performing boundary compaction.");
request =
new BoundaryStripeCompactionRequest(toCompactL0Refs, si.getStripeBoundaries());
} else {
// do L0 reference compaction, will perform split compaction
LOG.debug(msg + ". Performing split stripe compaction.");
long targetKvs =
estimateTargetKvs(toCompactL0Refs, config.getInitialCount()).getFirst();
request = new SplitStripeCompactionRequest(toCompactL0Refs, OPEN_KEY, OPEN_KEY, targetKvs);
}
request.getRequest().setAfterSplit(true);
request.getRequest().setIsMajor(false, false);
return request;
}
// select reference files in a single stripe
int priorityStripe = getStripeIndexWithReferences(si);
if (priorityStripe != -1) {
LOG.debug("The stripe {} has reference files, select all files in this stripe to "
+ "compact, store {}", priorityStripe, si.getStoreName());
Collection<HStoreFile> priorityStripeFiles = si.getStripes().get(priorityStripe);
int targetCount = 1;
long targetKvs = Long.MAX_VALUE;
long toCompactSize = getTotalFileSize(priorityStripeFiles);
if (toCompactSize >= config.getSplitSize()) {
Pair<Long, Integer> estimate =
estimateTargetKvs(priorityStripeFiles, config.getSplitCount());
targetCount = estimate.getSecond();
targetKvs = estimate.getFirst();
}
String splitString =
"; the stripe will be split into at most " + targetCount + " stripes with "
+ targetKvs + " target KVs, toCompact files size is " + toCompactSize;
StripeCompactionRequest request =
new SplitStripeCompactionRequest(priorityStripeFiles,
si.getStartRow(priorityStripe),
si.getEndRow(priorityStripe), targetCount, targetKvs);
LOG.debug(
"Priority compact stripe {} all files, selecting {} files, " + " store {}",
priorityStripe, request.getRequest().getFiles().size(),
si.getStoreName() + splitString);
request.getRequest().setAfterSplit(true);
request.getRequest().setIsMajor(false, false);
return request;
}
}

// the priority compact reference files is disabled, so compact all files after split
LOG.debug("There are references in the store; compacting all files");
long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
Expand Down Expand Up @@ -162,6 +245,35 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
}

/**
* Check if the size or count of the participants is too large to select all
* for one compaction request
* @param participants participant store files
* @return True if need to select partial of the participants, or else False
*/
private boolean needSelectFiles(final List<HStoreFile> participants) {
return participants.size() > this.config.getStripeCompactMaxFiles() ||
getTotalFileSize(participants) > comConf.getMaxCompactSize();
}

/**
* Get the index of the stripe who has reference files
* @param si the stripe information provider
* @return the index of a stripe, [0,n-1]
*/
private int getStripeIndexWithReferences(StripeInformationProvider si) {
ArrayList<ImmutableList<HStoreFile>> stripeFiles = si.getStripes();
for (int i = 0; i < stripeFiles.size(); ++i) {
ImmutableList<HStoreFile> oneStripeFiles = stripeFiles.get(i);
if (StoreUtils.hasReferences(oneStripeFiles)) {
LOG.debug("Stripe {} has references, endRow {}, store {}", i, si.getEndRow(i),
si.getStoreName());
return i;
}
}
return -1;
}

public boolean needsCompactions(StripeInformationProvider si, List<HStoreFile> filesCompacting) {
// Approximation on whether we need compaction.
return filesCompacting.isEmpty()
Expand Down Expand Up @@ -559,6 +671,12 @@ public void setMajorRangeFull() {

/** The information about stripes that the policy needs to do its stuff */
public static interface StripeInformationProvider {
/**
* The store name, can be used in log print
* @return
*/
String getStoreName();

public Collection<HStoreFile> getStorefiles();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ private static StripeStoreFileManager createManager(
StripeStoreConfig config = new StripeStoreConfig(
conf, Mockito.mock(StoreConfigInformation.class));
StripeStoreFileManager result = new StripeStoreFileManager(CellComparatorImpl.COMPARATOR, conf,
config);
config, null);
result.loadFiles(sfs);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.PRIORITY_COMPACT_REFERENCE_FILES_ENABLED;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.mockito.ArgumentMatcher;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.ListUtils;

@RunWith(Parameterized.class)
@Category({RegionServerTests.class, MediumTests.class})
Expand Down Expand Up @@ -251,8 +254,63 @@ public void testWithReferences() throws Exception {
assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(), any());
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any());
}

@Test
public void testPriorityReferences() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, true);
StripeCompactionPolicy policy = createPolicy(conf);
StripeCompactor sc = mock(StripeCompactor.class);
HStoreFile ref = createFile();
when(ref.isReference()).thenReturn(true);
StripeInformationProvider si = mock(StripeInformationProvider.class);
List<HStoreFile> mixed = al(ref, createFile());
when(si.getStorefiles()).thenReturn(mixed);
when(si.getLevel0Files()).thenReturn(mixed);
Collection<HStoreFile> refs = al(ref);

assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertTrue(ListUtils.isEqualList(refs, scr.getRequest().getFiles()));
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
// this compaction request has no major range
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), any(), any(), any(), any());
}

@Test
public void testPrioritySelectReferences() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, true);
int compactFileCount = 2;
conf.setInt(MIN_FILES_KEY, compactFileCount);
conf.setInt(MAX_FILES_KEY,compactFileCount);
StripeCompactionPolicy policy = createPolicy(conf);
StripeCompactor sc = mock(StripeCompactor.class);
StripeInformationProvider si = mock(StripeInformationProvider.class);
List<HStoreFile> sfs = mockRefs(10, 10);
when(si.getStorefiles()).thenReturn(sfs);
when(si.getLevel0Files()).thenReturn(sfs);

assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(compactFileCount, scr.getRequest().getFiles().size());
}

private List<HStoreFile> mockRefs(int refCount, int otherCount) throws Exception {
List<HStoreFile> files = new ArrayList<>(refCount + otherCount);
for(int i = 0;i<refCount;i++) {
HStoreFile ref = createFile();
when(ref.isReference()).thenReturn(true);
files.add(ref);
}
for(int i = 0;i<otherCount;i++) {
HStoreFile ref = createFile();
files.add(ref);
}
return files;
}

@Test
Expand Down