@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -31,7 +30,9 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -49,10 +50,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Function;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Collections2;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
* Utility class to handle the removal of HFiles (or the respective {@link HStoreFile StoreFiles})
@@ -158,15 +157,15 @@ public boolean accept(Path file) {
}

// convert the files in the region to a File
toArchive.addAll(Lists.transform(Arrays.asList(storeDirs), getAsFile));
Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
LOG.debug("Archiving " + toArchive);
List<File> failedArchive = resolveAndArchive(fs, regionArchiveDir, toArchive,
EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException("Failed to archive/delete all the files for region:"
+ regionDir.getName() + " into " + regionArchiveDir
+ ". Something is probably awry on the filesystem.",
Collections2.transform(failedArchive, FUNC_FILE_TO_PATH));
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:" + regionDir.getName() + " into " +
regionArchiveDir + ". Something is probably awry on the filesystem.",
failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));
}
// if that was successful, then we delete the region
return deleteRegionWithoutArchiving(fs, regionDir);
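
Note on the Lists.transform → Stream change above: Guava's Lists.transform returns a lazy view that re-applies the function on every access, while the new Stream pipeline applies getAsFile once per store directory and adds the results to toArchive in encounter order. A minimal, self-contained sketch of the new pattern (the names below are placeholders, not the HBase types):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class StreamTransformSketch {
  public static void main(String[] args) {
    String[] storeDirs = { "cf1", "cf2", "cf3" };   // stands in for the FileStatus[] of store dirs
    List<String> toArchive = new ArrayList<>();
    // Same shape as Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add):
    // the mapping runs once per element, eagerly, and encounter order is preserved.
    Stream.of(storeDirs).map(dir -> "file:" + dir).forEachOrdered(toArchive::add);
    System.out.println(toArchive);                  // [file:cf1, file:cf2, file:cf3]
  }
}
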
@@ -269,7 +268,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf,
}

FileStatusConverter getAsFile = new FileStatusConverter(fs);
Collection<File> toArchive = Lists.transform(Arrays.asList(storeFiles), getAsFile);
Collection<File> toArchive = Stream.of(storeFiles).map(getAsFile).collect(Collectors.toList());
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);

// do the actual archive
Expand All @@ -279,7 +278,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf,
throw new FailedArchiveException("Failed to archive/delete all the files for region:"
+ Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family)
+ " into " + storeArchiveDir + ". Something is probably awry on the filesystem.",
Collections2.transform(failedArchive, FUNC_FILE_TO_PATH));
failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));
}
}

@@ -328,17 +327,18 @@ public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionIn

// Wrap the storefile into a File
StoreToFile getStorePath = new StoreToFile(fs);
Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
Collection<File> storeFiles =
compactedFiles.stream().map(getStorePath).collect(Collectors.toList());

// do the actual archive
List<File> failedArchive = resolveAndArchive(fs, storeArchiveDir, storeFiles,
EnvironmentEdgeManager.currentTime());
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());

if (!failedArchive.isEmpty()){
throw new FailedArchiveException("Failed to archive/delete all the files for region:"
+ Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
+ " into " + storeArchiveDir + ". Something is probably awry on the filesystem.",
Collections2.transform(failedArchive, FUNC_FILE_TO_PATH));
failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));
}
}

@@ -698,8 +698,10 @@ public String getName() {

@Override
public Collection<File> getChildren() throws IOException {
if (fs.isFile(file)) return Collections.emptyList();
return Collections2.transform(Arrays.asList(fs.listStatus(file)), getAsFile);
if (fs.isFile(file)) {
return Collections.emptyList();
}
return Stream.of(fs.listStatus(file)).map(getAsFile).collect(Collectors.toList());
}

@Override
@@ -23,14 +23,12 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
@@ -101,12 +99,9 @@ public SpaceQuotaSnapshot getTargetState(
public Iterable<Entry<RegionInfo, Long>> filterBySubject(String namespace) {
rlock.lock();
try {
return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<RegionInfo,Long>>() {
@Override
public boolean apply(Entry<RegionInfo,Long> input) {
return namespace.equals(input.getKey().getTable().getNamespaceAsString());
}
});
return regionUsage.entrySet().stream()
.filter(entry -> namespace.equals(entry.getKey().getTable().getNamespaceAsString()))
.collect(Collectors.toList());
} finally {
rlock.unlock();
}
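
Note on the filterBySubject change above: Iterables.filter returned a lazy view, so callers could end up walking regionUsage after the read lock in the finally block had been released; collecting the stream eagerly snapshots the matching entries while the lock is still held. A minimal sketch of the same pattern with placeholder types (not the HBase quota classes):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class FilterUnderLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<String, Long> regionUsage = new HashMap<>();   // stands in for Map<RegionInfo, Long>

  public List<Entry<String, Long>> filterByNamespace(String namespace) {
    lock.readLock().lock();
    try {
      // Collecting eagerly snapshots the matches before the lock is released in finally.
      return regionUsage.entrySet().stream()
          .filter(e -> e.getKey().startsWith(namespace + ":"))
          .collect(Collectors.toList());
    } finally {
      lock.readLock().unlock();
    }
  }
}
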
@@ -23,7 +23,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.TableName;
@@ -37,9 +37,9 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
@@ -144,15 +144,11 @@ long getSnapshotSizesForTable(TableName tn) throws IOException {
}

@Override
public Iterable<Entry<RegionInfo,Long>> filterBySubject(TableName table) {
public Iterable<Entry<RegionInfo, Long>> filterBySubject(TableName table) {
rlock.lock();
try {
return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<RegionInfo,Long>>() {
@Override
public boolean apply(Entry<RegionInfo,Long> input) {
return table.equals(input.getKey().getTable());
}
});
return regionUsage.entrySet().stream()
.filter(entry -> table.equals(entry.getKey().getTable())).collect(Collectors.toList());
} finally {
rlock.unlock();
}
@@ -17,12 +17,9 @@
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hbase.thirdparty.com.google.common.base.Function;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;

import java.util.Comparator;

import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -37,32 +34,23 @@ final class StoreFileComparators {
* ordering, then bulkLoadTime. If there are ties, the path name is used as a tie-breaker.
*/
public static final Comparator<HStoreFile> SEQ_ID =
Ordering.compound(ImmutableList.of(Ordering.natural().onResultOf(new GetSeqId()),
Ordering.natural().onResultOf(new GetFileSize()).reverse(),
Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetPathName())));
Comparator.comparingLong(HStoreFile::getMaxSequenceId)
.thenComparing(Comparator.comparingLong(new GetFileSize()).reversed())
.thenComparingLong(new GetBulkTime()).thenComparing(new GetPathName());

/**
* Comparator for time-aware compaction. SeqId is still the first ordering criterion to maintain
* MVCC.
*/
public static final Comparator<HStoreFile> SEQ_ID_MAX_TIMESTAMP =
Ordering.compound(ImmutableList.of(Ordering.natural().onResultOf(new GetSeqId()),
Ordering.natural().onResultOf(new GetMaxTimestamp()),
Ordering.natural().onResultOf(new GetFileSize()).reverse(),
Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetPathName())));
Comparator.comparingLong(HStoreFile::getMaxSequenceId).thenComparingLong(new GetMaxTimestamp())
.thenComparing(Comparator.comparingLong(new GetFileSize()).reversed())
.thenComparingLong(new GetBulkTime()).thenComparing(new GetPathName());

private static class GetSeqId implements Function<HStoreFile, Long> {
@Override
public Long apply(HStoreFile sf) {
return sf.getMaxSequenceId();
}
}
private static class GetFileSize implements ToLongFunction<HStoreFile> {

private static class GetFileSize implements Function<HStoreFile, Long> {
@Override
public Long apply(HStoreFile sf) {
public long applyAsLong(HStoreFile sf) {
if (sf.getReader() != null) {
return sf.getReader().length();
} else {
@@ -73,23 +61,26 @@ public Long apply(HStoreFile sf) {
}
}

private static class GetBulkTime implements Function<HStoreFile, Long> {
private static class GetBulkTime implements ToLongFunction<HStoreFile> {

@Override
public Long apply(HStoreFile sf) {
public long applyAsLong(HStoreFile sf) {
return sf.getBulkLoadTimestamp().orElse(Long.MAX_VALUE);
}
}

private static class GetPathName implements Function<HStoreFile, String> {

@Override
public String apply(HStoreFile sf) {
return sf.getPath().getName();
}
}

private static class GetMaxTimestamp implements Function<HStoreFile, Long> {
private static class GetMaxTimestamp implements ToLongFunction<HStoreFile> {

@Override
public Long apply(HStoreFile sf) {
public long applyAsLong(HStoreFile sf) {
return sf.getMaximumTimestamp().orElse(Long.MAX_VALUE);
}
}
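
Note on the comparator rewrite above: the Guava Ordering.compound chain is replaced by JDK Comparator composition, and the key extractors now implement ToLongFunction so the long keys are compared without boxing. A self-contained sketch of the same composition style, primary key ascending, secondary key descending, with a final tie-breaker (Item is an illustrative stand-in for HStoreFile):

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ComparatorCompositionSketch {
  static final class Item {
    final long seqId; final long size; final String name;
    Item(long seqId, long size, String name) { this.seqId = seqId; this.size = size; this.name = name; }
    @Override public String toString() { return name; }
  }

  // Primary: seqId ascending; secondary: size descending; tie-breaker: name.
  static final Comparator<Item> ORDER =
      Comparator.comparingLong((Item i) -> i.seqId)
          .thenComparing(Comparator.comparingLong((Item i) -> i.size).reversed())
          .thenComparing((Item i) -> i.name);

  public static void main(String[] args) {
    List<Item> items = Arrays.asList(
        new Item(2, 100, "b"), new Item(1, 50, "a"), new Item(2, 200, "c"));
    items.sort(ORDER);
    System.out.println(items);   // [a, c, b]: lower seqId first, then larger size first
  }
}
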
@@ -117,7 +117,7 @@ protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile>
candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck)
throws IOException {
if (!tryingMajor) {
candidateSelection = filterBulk(candidateSelection);
filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
candidateSelection = checkMinFilesCriteria(candidateSelection,
comConf.getMinFilesToCompact());
@@ -16,17 +16,15 @@
import java.util.List;
import java.util.OptionalInt;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Collections2;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
@@ -199,16 +197,9 @@ protected ArrayList<HStoreFile> skipLargeFiles(ArrayList<HStoreFile> candidates,

/**
* @param candidates pre-filtrate
* @return filtered subset exclude all bulk load files if configured
*/
protected ArrayList<HStoreFile> filterBulk(ArrayList<HStoreFile> candidates) {
candidates.removeAll(Collections2.filter(candidates, new Predicate<HStoreFile>() {
@Override
public boolean apply(HStoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
return candidates;
protected void filterBulk(ArrayList<HStoreFile> candidates) {
candidates.removeIf(HStoreFile::excludeFromMinorCompaction);
}
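
Note on the filterBulk change above: the method now filters the candidate list in place with removeIf instead of building a filtered copy via Collections2.filter and removing it with removeAll, which is why createCompactionRequest (earlier in this diff) no longer reassigns candidateSelection. A minimal sketch of the in-place pattern with placeholder types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoveIfSketch {
  static final class Candidate {                      // stands in for HStoreFile
    final boolean excluded;
    Candidate(boolean excluded) { this.excluded = excluded; }
    boolean excludeFromMinorCompaction() { return excluded; }
  }

  public static void main(String[] args) {
    List<Candidate> candidates =
        new ArrayList<>(Arrays.asList(new Candidate(false), new Candidate(true)));
    // Mutates the list directly; no filtered copy, no return value to reassign.
    candidates.removeIf(Candidate::excludeFromMinorCompaction);
    System.out.println(candidates.size());            // 1
  }
}
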

/**
@@ -69,6 +69,11 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
// just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in
// the guava Predicate.
if (file == null) {
return false;
}
String hfile = file.getPath().getName();
boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
if (LOG.isDebugEnabled()) {
@@ -78,6 +78,11 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
// just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in
// the guava Predicate.
if (file == null) {
return false;
}
String wal = file.getPath().getName();
boolean logInReplicationQueue = wals.contains(wal);
if (logInReplicationQueue) {
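
Note on the two cleaner hunks above: the patch keeps Guava's Iterables.filter and only adds an explicit null guard, because the apply parameter is declared @Nullable on the (shaded) Guava Predicate and FindBugs otherwise flags the dereference. Purely as an illustration, an equivalent JDK-stream version would not carry that nullable contract, at the cost of materializing the result instead of returning a lazy view (the names below are placeholders, not the HBase cleaner API):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DeletableFilesSketch {
  // Deletable = not currently referenced by a replication queue.
  public static List<String> getDeletableFiles(List<String> files, Set<String> refsInQueue) {
    return files.stream()
        .filter(name -> name != null && !refsInQueue.contains(name))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Set<String> refs = new HashSet<>(Arrays.asList("hfile-1"));
    System.out.println(getDeletableFiles(Arrays.asList("hfile-1", "hfile-2"), refs));   // [hfile-2]
  }
}
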