BucketCache.java
@@ -69,6 +69,8 @@
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -242,6 +244,16 @@ public int compare(BlockCacheKey a, BlockCacheKey b) {
/** In-memory bucket size */
private float memoryFactor;

private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check
   * persistent file integrity; the default algorithm is MD5.
   */
private String algorithm;
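Since the configured name is passed straight to MessageDigest.getInstance(...), any digest algorithm the JVM provides can be used. A minimal sketch of overriding the default (the SHA-256 value is an illustrative choice, not part of this patch):

    Configuration conf = HBaseConfiguration.create();
    // any algorithm name accepted by MessageDigest.getInstance(...) works here
    conf.set("hbase.bucketcache.persistent.file.integrity.check.algorithm", "SHA-256");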

public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
@@ -252,8 +264,9 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
-      throws FileNotFoundException, IOException {
-    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
throws IOException {
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
@@ -295,7 +308,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
throw new RuntimeException(cnfe);
}
}
@@ -1021,41 +1034,69 @@ static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry>

private void persistToFile() throws IOException {
assert !cacheEnabled;
-    FileOutputStream fos = null;
-    ObjectOutputStream oos = null;
-    try {
try (ObjectOutputStream oos = new ObjectOutputStream(
        new FileOutputStream(persistencePath, false))) {
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
-      fos = new FileOutputStream(persistencePath, false);
-      oos = new ObjectOutputStream(fos);
byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(algorithm);
if (checksum != null) {
oos.write(ProtobufUtil.PB_MAGIC);
oos.writeInt(checksum.length);
oos.write(checksum);
}
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
-    } finally {
-      if (oos != null) oos.close();
-      if (fos != null) fos.close();
}
}
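For reference, the persistence-file layout this method writes (and retrieveFromFile below reads back) is, informally:

    // layout of the persistence file written above:
    //   PB_MAGIC ("PBUF", 4 bytes)               -- only when a checksum was computed
    //   checksum length (int) + checksum bytes   -- only when a checksum was computed
    //   cacheCapacity (long)
    //   ioEngine class name (UTF), backingMap class name (UTF)
    //   deserialiserMap, backingMap (Java-serialized objects)
    // When calculateChecksum returns null, the magic/checksum header is skipped,
    // which is the same shape as a file written by an older version.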

@SuppressWarnings("unchecked")
-  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
private void retrieveFromFile(int[] bucketSizes) throws IOException,
ClassNotFoundException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
-    FileInputStream fis = null;
ObjectInputStream ois = null;
try {
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
-      fis = new FileInputStream(persistencePath);
-      ois = new ObjectInputStream(fis);
ois = new ObjectInputStream(new FileInputStream(persistencePath));
int pblen = ProtobufUtil.lengthOfPBMagic();
byte[] pbuf = new byte[pblen];
int read = ois.read(pbuf);
if (read != pblen) {
LOG.warn("Can't restore from file because of incorrect number of bytes read while " +
"checking for protobuf magic number. Requested=" + pblen + ", but received= " +
read + ".");
return;
}
if (Bytes.equals(ProtobufUtil.PB_MAGIC, pbuf)) {
int length = ois.readInt();
byte[] persistentChecksum = new byte[length];
int readLen = ois.read(persistentChecksum);
if (readLen != length) {
LOG.warn("Can't restore from file because of incorrect number of bytes read while " +
"checking for persistent checksum. Requested=" + length + ", but received=" +
readLen + ". ");
return;
}
if (!((PersistentIOEngine) ioEngine).verifyFileIntegrity(
persistentChecksum, algorithm)) {
LOG.warn("Can't restore from file because of verification failed.");
Contributor: Separate the if branch? readLen != length doesn't mean "Can't restore from file because of verification failed".

return;
Contributor: Can we add an INFO log here? Or do we already have a log saying the verification failed in FileIOE?

Contributor Author: We have a log if verification fails in FileIOE.

}
} else {
        // The persistent file may be from an old version that does not support
        // verification, so reopen the ObjectInputStream and read it from the beginning.
ois.close();
ois = new ObjectInputStream(new FileInputStream(persistencePath));
}
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
@@ -1079,8 +1120,9 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
} finally {
-      if (ois != null) ois.close();
-      if (fis != null) fis.close();
if (ois != null) {
ois.close();
}
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
@@ -1597,4 +1639,9 @@ float getMultiFactor() {
float getMemoryFactor() {
return memoryFactor;
}

@VisibleForTesting
public UniqueIndexMap<Integer> getDeserialiserMap() {
return deserialiserMap;
}
}
FileIOEngine.java
@@ -25,22 +25,28 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

/**
* IO engine that stores data to a file on the local file system.
*/
@InterfaceAudience.Private
-public class FileIOEngine implements IOEngine {
public class FileIOEngine implements PersistentIOEngine {
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
public static final String FILE_DELIMITER = ",";
private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});

private final String[] filePaths;
private final FileChannel[] fileChannels;
private final RandomAccessFile[] rafs;
@@ -68,15 +74,20 @@ public FileIOEngine(long capacity, String... filePaths) throws IOException {
        // The next setLength() call will throw an exception; this message is logged
        // just to record the detailed reason for the failure.
String msg = "Only " + StringUtils.byteDesc(totalSpace)
+ " total space under " + filePath + ", not enough for requested "
+ StringUtils.byteDesc(sizePerFile);
+ " total space under " + filePath + ", not enough for requested "
+ StringUtils.byteDesc(sizePerFile);
LOG.warn(msg);
}
-      rafs[i].setLength(sizePerFile);
      File file = new File(filePath);
      // setLength() changes the file's last modified time, so without this
      // check the wrong time would be used when calculating the checksum.
      if (file.length() != sizePerFile) {
        rafs[i].setLength(sizePerFile);
      }
Contributor: why this change?

Contributor Author: The setLength() method will change the file's last modified time. So without this change, the wrong time would be used to calculate the checksum.

Contributor: Please add a comment block // to clarify the purpose.

Contributor: Ya, some fat comments here would be nice. Got why you have this check now. I can think of a case though: say we have a file-based cache with one file, and the size was 10 GB. Now a restart of the RS is happening, and the cache is persisted. Before the restart, the size is increased to 20 GB. There is no truncate, and ideally the cache gets rebuilt; the only thing is that after the restart the cache capacity is increased. But as per the code, the length is changed here, and so is the last modified time, which will fail the verify phase. Is it something to be considered? I don't want much complex handling for this, and it might not be a common case for a persisted cache. The worst that happens is we are not able to retrieve the persisted cache. But welcoming thinking/suggestions.

Contributor Author: If I understand you correctly, you mean changing the bucket cache size before restarting the RS. Actually, the retrieveFromFile() method checks the bucket cache size:
if (capacitySize != cacheCapacity) throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) + ", expected: " + StringUtils.byteDesc(cacheCapacity));
So after changing the bucket cache size, it will no longer retrieve from the file.

Contributor: Yaya, that was my Q. Oh great, I did not read the whole code; then it's perfect. Please add enough comments around this to explain why the check and setLength are so important. Great.

fileChannels[i] = rafs[i].getChannel();
channelLocks[i] = new ReentrantLock();
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+ ", on the path:" + filePath);
+ ", on the path: " + filePath);
} catch (IOException fex) {
LOG.error("Failed allocating cache on " + filePath, fex);
shutdown();
@@ -85,6 +96,18 @@ public FileIOEngine(long capacity, String... filePaths) throws IOException {
}
}

@Override
public boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm) {
byte[] calculateChecksum = calculateChecksum(algorithm);
if (!Bytes.equals(persistentChecksum, calculateChecksum)) {
LOG.error("Mismatch of checksum! The persistent checksum is " +
Bytes.toString(persistentChecksum) + ", but the calculate checksum is " +
Bytes.toString(calculateChecksum));
return false;
}
return true;
}

@Override
public String toString() {
return "ioengine=" + this.getClass().getSimpleName() + ", paths="
@@ -267,6 +290,61 @@ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
}
}

@Override
public byte[] calculateChecksum(String algorithm) {
if (filePaths == null) {
return null;
}
try {
StringBuilder sb = new StringBuilder();
for (String filePath : filePaths){
File file = new File(filePath);
sb.append(filePath);
sb.append(getFileSize(filePath));
sb.append(file.lastModified());
}
MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
messageDigest.update(Bytes.toBytes(sb.toString()));
return messageDigest.digest();
} catch (IOException ioex) {
LOG.error("Calculating checksum failed.", ioex);
return null;
} catch (NoSuchAlgorithmException e) {
LOG.error("No such algorithm : " + algorithm + "!");
return null;
}
}
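The digest input is just each file's path, size, and last-modified time concatenated into one string. A minimal standalone sketch of the same construction (names are illustrative; it uses File.length() instead of shelling out to du):

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    // Digest over (path + size + mtime) for each cache file, mirroring
    // calculateChecksum above. If any of these attributes changes between
    // persist and restore, the digests differ and the persisted cache is dropped.
    static byte[] checksumFor(String algorithm, String... paths) throws Exception {
      StringBuilder sb = new StringBuilder();
      for (String path : paths) {
        File f = new File(path);
        sb.append(path).append(f.length()).append(f.lastModified());
      }
      return MessageDigest.getInstance(algorithm).digest(
          sb.toString().getBytes(StandardCharsets.UTF_8));
    }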

  /**
   * Use the Linux 'du' command to get a file's real size on disk; unlike
   * File.length(), du reports actual disk usage, so a sparse pre-allocated
   * cache file is measured by its true footprint.
   * @param filePath the file
   * @return the file's real size
   * @throws IOException if something goes wrong, e.g. the file does not exist
   */
private static long getFileSize(String filePath) throws IOException {
DU.setExecCommand(filePath);
DU.execute();
return Long.parseLong(DU.getOutput().split("\t")[0]);
}

private static class DuFileCommand extends Shell.ShellCommandExecutor {
private String[] execCommand;

DuFileCommand(String[] execString) {
super(execString);
execCommand = execString;
}

void setExecCommand(String filePath) {
this.execCommand[1] = filePath;
}

@Override
public String[] getExecString() {
return this.execCommand;
}
}

private static interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException;
PersistentIOEngine.java (new file)
@@ -0,0 +1,44 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * A class implementing the PersistentIOEngine interface supports persistence and
 * file integrity verification for {@link BucketCache}
 */
@InterfaceAudience.Private
public interface PersistentIOEngine extends IOEngine {

  /**
   * Use a {@link java.security.MessageDigest} digest algorithm to calculate a checksum;
   * the default algorithm is MD5
   * @param algorithm which algorithm to use to calculate the checksum
   * @return the checksum bytes, or null if the checksum could not be calculated
   */
byte[] calculateChecksum(String algorithm);

  /**
   * Verify the cache files' integrity
   * @param persistentChecksum the checksum persisted at shutdown
   * @param algorithm which algorithm to use to calculate the checksum
   * @return true if verification succeeds
   */
boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm);
}
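A minimal sketch of the intended call pattern, as wired up in BucketCache above (variable names are illustrative):

    PersistentIOEngine engine = ...;        // e.g. a FileIOEngine
    // at persist time:
    byte[] checksum = engine.calculateChecksum(algorithm);
    // write PB_MAGIC, the checksum length and bytes, then the cache metadata

    // at restore time, after reading the persisted checksum back:
    if (!engine.verifyFileIntegrity(persistedChecksum, algorithm)) {
      // checksum mismatch: discard the persisted cache and start cold
    }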