Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* Common functionality needed by all versions of {@link HFile} writers.
*/
Expand Down Expand Up @@ -774,6 +776,11 @@ public void beforeShipped() throws IOException {
}
}

@VisibleForTesting
public Cell getLastCell() {
return lastCell;
}

protected void finishFileInfo() throws IOException {
if (lastCell != null) {
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
LOG.debug("Compaction progress: "
+ compactionName
+ " "
+ progress
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ throughputController);
String rate = String.format("%.2f",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (LOG.isDebugEnabled())? Then can skip to generate "rate" String in production environment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have an existed isDebugEnabled before in line#312 :-)

(bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
compactionName, progress, rate, throughputController);
lastMillis = now;
bytesWrittenProgressForLog = 0;
}
Expand All @@ -330,6 +327,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
throw new InterruptedIOException(
"Interrupted while control throughput of compacting " + compactionName);
} finally {
// Clone last cell in the final because writer will append last cell when committing. If
// don't clone here and once the scanner get closed, then the memory of last cell will be
// released. (HBASE-22582)
((ShipperListener) writer).beforeShipped();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below in Compactor, call beforeShipped() before finishing the throughputController. Its ok for any order. Still we can maintain one order.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I will provide a UT to address this bug but am working on other things now... will update this patch later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment here, too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

throughputController.finish(compactionName);
if (!finished && mobFileWriter != null) {
abortWriter(mobFileWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,9 +1003,8 @@ private void resetQueryMatcher(Cell lastTopKey) {
protected void checkScanOrder(Cell prevKV, Cell kv,
CellComparator comparator) throws IOException {
// Check that the heap gives us KVs in an increasing order.
assert prevKV == null || comparator == null
|| comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
+ " followed by a " + "smaller key " + kv + " in cf " + store;
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key "
+ prevKV + " followed by a smaller key " + kv + " in cf " + store;
}

protected boolean seekToNextRow(Cell c) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long now = 0;
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
try {
do {
hasMore = scanner.next(cells, scannerContext);
Expand Down Expand Up @@ -428,7 +429,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
}
// Clone the cells that are in the writer so that they are freed of references,
// if they are holding any.
((ShipperListener)writer).beforeShipped();
((ShipperListener) writer).beforeShipped();
// The SHARED block references, being read for compaction, will be kept in prevBlocks
// list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells
// being returned to client, we will call shipped() which can clear this list. Here by
Expand All @@ -447,13 +448,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
LOG.debug("Compaction progress: "
+ compactionName
+ " "
+ progress
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ throughputController);
String rate = String.format("%.2f",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. if (LOG.isDebugEnabled())?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see line#449.

(bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
compactionName, progress, rate, throughputController);
lastMillis = now;
bytesWrittenProgressForLog = 0;
}
Expand All @@ -462,9 +460,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
} while (hasMore);
} catch (InterruptedException e) {
progress.cancel();
throw new InterruptedIOException("Interrupted while control throughput of compacting "
+ compactionName);
throw new InterruptedIOException(
"Interrupted while control throughput of compacting " + compactionName);
} finally {
// Clone last cell in the final because writer will append last cell when committing. If
// don't clone here and once the scanner get closed, then the memory of last cell will be
// released. (HBASE-22582)
((ShipperListener) writer).beforeShipped();
throughputController.finish(compactionName);
}
progress.complete();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompactorMemLeak {

private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final Configuration CONF = UTIL.getConfiguration();
private static final AtomicBoolean IS_LAST_CELL_ON_HEAP = new AtomicBoolean(false);
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final byte[] VALUE = Bytes.toBytes("value");

@Rule
public TestName name = new TestName();

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactorMemLeak.class);

@BeforeClass
public static void setUp() throws Exception {
IS_LAST_CELL_ON_HEAP.set(false);
// Must use the ByteBuffAllocator here
CONF.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
// Must use OFF-HEAP BucketCache here.
CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.1f);
CONF.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
// 32MB for BucketCache.
CONF.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 32);
// Use the MyCompactor
CONF.set(DEFAULT_COMPACTOR_CLASS_KEY, MyCompactor.class.getName());
UTIL.startMiniCluster();
}

@AfterClass
public static void tearDown() throws Exception {
IS_LAST_CELL_ON_HEAP.set(false);
UTIL.shutdownMiniCluster();
}

private void assertMajorCompactionOK(TableName tableName) {
List<HRegion> regions = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getRegions(tableName);
Assert.assertEquals(regions.size(), 1);
HRegion region = regions.get(0);
Assert.assertEquals(region.getStores().size(), 1);
HStore store = region.getStore(FAMILY);
Assert.assertEquals(store.getStorefilesCount(), 1);
}

@Test
public void testMemLeak() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(name.getMethodName());
Table table = UTIL.createTable(tableName, FAMILY);

// Put and Flush #1
Put put = new Put(Bytes.toBytes("row1")).addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put);
UTIL.getAdmin().flush(tableName);

// Put and Flush #2
put = new Put(Bytes.toBytes("row2")).addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put);
UTIL.getAdmin().flush(tableName);

// Major compact
UTIL.getAdmin().majorCompact(tableName);
Thread.sleep(6000);
assertMajorCompactionOK(tableName);

// The last cell before Compactor#commitWriter must be an heap one.
Assert.assertTrue(IS_LAST_CELL_ON_HEAP.get());
}

public static class MyCompactor extends DefaultCompactor {

public MyCompactor(Configuration conf, HStore store) {
super(conf, store);
}

@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell();
// The cell should be backend with an KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
return super.commitWriter(writer, fd, request);
}
}
}