Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
Expand Down Expand Up @@ -84,17 +83,21 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
// make this writer with tags always because of possible new cells with tags.
// make this writer with tags always because of possible new cells
// with tags.
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
shouldDropBehind);
shouldDropBehind);
}
};

public DefaultMobStoreCompactor(Configuration conf, HStore store) {
super(conf, store);
// The mob cells reside in the mob-enabled column family which is held by HMobStore.
// During the compaction, the compactor reads the cells from the mob files and
// probably creates new mob files. All of these operations are included in HMobStore,
// The mob cells reside in the mob-enabled column family which is held by
// HMobStore.
// During the compaction, the compactor reads the cells from the mob
// files and
// probably creates new mob files. All of these operations are included
// in HMobStore,
// so we need to cast the Store to HMobStore.
if (!(store instanceof HMobStore)) {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
Expand All @@ -104,8 +107,8 @@ public DefaultMobStoreCompactor(Configuration conf, HStore store) {
}

@Override
public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
User user) throws IOException {
public List<Path> compact(CompactionRequestImpl request,
ThroughputController throughputController, User user) throws IOException {
return compact(request, scannerFactory, writerFactory, throughputController, user);
}

Expand All @@ -114,22 +117,27 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
* This is for when the mob threshold size has changed or if the mob
* column family mode has been toggled via an alter table statement.
* Compacts the files by the following rules.
* 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
* 1. If the Put cell has a mob reference tag, the cell's value is the path
* of the mob file.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
* If the value size of a cell is larger than the threshold, this cell is
* regarded as a mob,
* directly copy the (with mob tag) cell into the new store file.
* </li>
* <li>
* Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
* Otherwise, retrieve the mob cell from the mob file, and write a copy of
* the cell into
* the new store file.
* </li>
* </ol>
* 2. If the Put cell doesn't have a reference tag.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
* write this cell to a mob file, and write the path of this mob file to the store file.
* If the value size of a cell is larger than the threshold, this cell is
* regarded as a mob,
* write this cell to a mob file, and write the path of this mob file to
* the store file.
* </li>
* <li>
* Otherwise, directly write this cell into the store file.
Expand All @@ -138,29 +146,38 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
* 3. Decide how to write a Delete cell.
* <ol>
* <li>
* If a Delete cell does not have a mob reference tag, which means this delete marker has not
* been written to the mob del file, write this cell to the mob del file, and write this cell
* If a Delete cell does not have a mob reference tag, which means this
* delete marker has not
* been written to the mob del file, write this cell to the mob del file,
* and write this cell
* with a ref tag to a store file.
* </li>
* <li>
* Otherwise, directly write it to a store file.
* </li>
* </ol>
* After the major compaction on the normal hfiles, we have a guarantee that we have purged all
* deleted or old version mob refs, and the delete markers are written to a del file with the
* suffix _del. Because of this, it is safe to use the del file in the mob compaction.
* The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
* mob files. When the small mob files are merged into bigger ones, the del file is added into
* After the major compaction on the normal hfiles, we have a guarantee
* that we have purged all
* deleted or old version mob refs, and the delete markers are written to a
* del file with the
* suffix _del. Because of this, it is safe to use the del file in the mob
* compaction.
* The mob compaction doesn't take place in the normal hfiles, it occurs
* directly in the
* mob files. When the small mob files are merged into bigger ones, the del
* file is added into
* the scanner to filter the deleted cells.
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which
* is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @return Whether compaction ended; false if it was interrupted for any reason.
* @return Whether compaction ended; false if it was interrupted for any
* reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
Expand Down Expand Up @@ -191,27 +208,31 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
try {
try {
// If the mob file writer could not be created, directly write the cell to the store file.
mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
compactionCompression, store.getRegionInfo().getStartKey(), true);
// If the mob file writer could not be created, directly write the
// cell to the store file.
mobFileWriter = mobStore
.createWriterInTmp(fd.latestPutTs, fd.maxKeyCount, compactionCompression,
store.getRegionInfo().getStartKey(), true);
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
} catch (IOException e) {
LOG.warn("Failed to create mob writer, "
+ "we will continue the compaction by writing MOB cells directly in store files", e);
+ "we will continue the compaction by writing MOB cells " + "directly in store "
+ "files", e);
}
if (major) {
try {
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
delFileWriter = mobStore
.createDelFileWriterInTmp(fd.latestPutTs, fd.maxKeyCount, compactionCompression,
store.getRegionInfo().getStartKey());
} catch (IOException e) {
LOG.warn(
"Failed to create del writer, "
+ "we will continue the compaction by writing delete markers directly in store files",
e);
"Failed to create del writer, " + "we will continue the compaction by writing delete "
+ "markers directly in store files", e);
}
}
do {
Expand All @@ -232,19 +253,24 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
deleteMarkersCount++;
}
} else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
// If the mob file writer is null or the kv type is not put, directly write the cell
// If the mob file writer is null or the kv type is not put,
// directly write the cell
// to the store file.
writer.append(c);
} else if (MobUtils.isMobReferenceCell(c)) {
if (MobUtils.hasValidMobRefCellValue(c)) {
int size = MobUtils.getMobValueLength(c);
if (size > mobSizeThreshold) {
// If the value size is larger than the threshold, it's regarded as a mob. Since
// its value is already in the mob file, directly write this cell to the store file
// If the value size is larger than the threshold, it's
// regarded as a mob. Since
// its value is already in the mob file, directly write this
// cell to the store file
writer.append(c);
} else {
// If the value is not larger than the threshold, it's not regarded as a mob. Retrieve
// the mob cell from the mob file, and write it back to the store file.
// If the value is not larger than the threshold, it's not
// regarded as a mob. Retrieve
// the mob cell from the mob file, and write it back to the
// store file.
Cell mobCell = mobStore.resolve(c, false);
if (mobCell.getValueLength() != 0) {
// put the mob data back to the store file
Expand All @@ -253,29 +279,35 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
cellsCountCompactedFromMob++;
cellsSizeCompactedFromMob += mobCell.getValueLength();
} else {
// If the value of the retrieved mob cell is empty, there might be issues when retrieving,
// directly write the cell to the store file, and leave it to be handled by the
// If the value of the retrieved mob cell is empty, there might be issues
// when retrieving,
// directly write the cell to the store file, and leave it
// to be handled by the
// next compaction.
writer.append(c);
}
}
} else {
LOG.warn("The value format of the KeyValue " + c
+ " is wrong, its length is less than " + Bytes.SIZEOF_INT);
LOG.warn(
"The value format of the KeyValue " + c + " is wrong, its length is less than "
+ Bytes.SIZEOF_INT);
writer.append(c);
}
} else if (c.getValueLength() <= mobSizeThreshold) {
// If the value size of a cell is not larger than the threshold, directly write to store file
// If the value size of a cell is not larger than the threshold,
// directly write to store file
writer.append(c);
} else {
// If the value size of a cell is larger than the threshold, it's regarded as a mob,
// write this cell to a mob file, and write the path to the store file.
// If the value size of a cell is larger than the threshold, it's
// regarded as a mob,
// write this cell to a mob file, and write the path to the store
// file.
mobCells++;
// append the original keyValue in the mob file.
mobFileWriter.append(c);
Cell reference = MobUtils.createMobRefCell(c, fileName,
this.mobStore.getRefCellTags());
// write the cell whose value is the path of a mob file to the store file.
Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
// write the cell whose value is the path of a mob file to the
// store file.
writer.append(reference);
cellsCountCompactedToMob++;
cellsSizeCompactedToMob += c.getValueLength();
Expand All @@ -300,7 +332,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
}
}
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
((ShipperListener)writer).beforeShipped();
((ShipperListener) writer).beforeShipped();
kvs.shipped();
bytesWrittenProgressForShippedCall = 0;
}
Expand All @@ -309,13 +341,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
LOG.debug("Compaction progress: "
+ compactionName
+ " "
+ progress
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ throughputController);
LOG.debug("Compaction progress: " + compactionName + " " + progress + String
.format(", rate=%.2f kB/sec",
(bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0))
+ ", throughputController is " + throughputController);
lastMillis = now;
bytesWrittenProgressForLog = 0;
}
Expand Down
Loading