Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
Expand Down Expand Up @@ -242,6 +244,17 @@ public int compare(BlockCacheKey a, BlockCacheKey b) {
/** In-memory bucket size */
private float memoryFactor;

private String ioEngineName;
private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

/**
* Use one of the {@link java.security.MessageDigest} class's digest (hash) algorithms to check
* persistent file integrity; the default algorithm is MD5.
*/
private String algorithm;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Javadoc for this new member: what is this algorithm for?


public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
Expand All @@ -252,8 +265,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
throws FileNotFoundException, IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
throws IOException {
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
Expand All @@ -275,6 +287,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
", memoryFactor: " + memoryFactor);

this.cacheCapacity = capacity;
this.ioEngineName = ioEngineName;
this.persistencePath = persistencePath;
this.blockSize = blockSize;
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
Expand All @@ -288,14 +301,15 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
ioEngine = getIOEngineFromName();
if (ioEngine.isPersistent() && persistencePath != null) {
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
throw new RuntimeException(cnfe);
}
}
Expand Down Expand Up @@ -359,24 +373,22 @@ public String getIoEngine() {

/**
* Get the IOEngine from the IO engine name
* @param ioEngineName
* @param capacity
* @return the IOEngine
* @throws IOException
*/
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
private IOEngine getIOEngineFromName()
throws IOException {
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
// In order to make the usage simple, we only need the prefix 'files:' in
// document whether one or multiple file(s), but also support 'file:' for
// the compatibility
String[] filePaths =
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
return new FileIOEngine(capacity, filePaths);
return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths);
} else if (ioEngineName.startsWith("offheap"))
return new ByteBufferIOEngine(capacity, true);
return new ByteBufferIOEngine(cacheCapacity, true);
else if (ioEngineName.startsWith("heap"))
return new ByteBufferIOEngine(capacity, false);
return new ByteBufferIOEngine(cacheCapacity, false);
else
throw new IllegalArgumentException(
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
Expand Down Expand Up @@ -1021,41 +1033,48 @@ static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry>

private void persistToFile() throws IOException {
assert !cacheEnabled;
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
try (ObjectOutputStream oos = new ObjectOutputStream(
new FileOutputStream(persistencePath, false))){
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
fos = new FileOutputStream(persistencePath, false);
oos = new ObjectOutputStream(fos);
if (ioEngine instanceof PersistentIOEngine) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are in persistToFile(), it means the engine is a PersistentIOEngine. Maybe an assert and a direct cast would be a better way than an if check.

oos.write(ProtobufUtil.PB_MAGIC);
byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum();
oos.writeInt(checksum.length);
oos.write(checksum);
}
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
} finally {
if (oos != null) oos.close();
if (fos != null) fos.close();
} catch (NoSuchAlgorithmException e) {
LOG.error("No such algorithm : " + algorithm + "! Failed to persist data on exit",e);
}
}

@SuppressWarnings("unchecked")
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
private void retrieveFromFile(int[] bucketSizes) throws IOException,
ClassNotFoundException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
FileInputStream fis = null;
ObjectInputStream ois = null;
try {
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
fis = new FileInputStream(persistencePath);
ois = new ObjectInputStream(fis);
// for backward compatibility
if (ioEngine instanceof PersistentIOEngine &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment above. See the line above:
if (!ioEngine.isPersistent()) throw new IOException().
Right after that line you can do the typecast.

!((PersistentIOEngine) ioEngine).isOldVersion()) {
byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
ois.read(PBMagic);
int length = ois.readInt();
byte[] persistenceChecksum = new byte[length];
ois.read(persistenceChecksum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we are reading the persisted checksum twice in this flow: once at FileIOE creation time, as part of the verify call, and again here. Here we read it only as a way to skip over it. So why can't we do it only here? We have verifyFileIntegrity() in the PersistentIOEngine interface, and we can call that from here. The current flow looks a bit odd. The old-version check can also be done here, based on whether the PB magic matches.
The isOldVersion() API itself is not needed in FileIOE. We process the persisted meta info here and, based on that, recreate the backingMap etc. here in the BucketCache. So finding out whether a checksum was also persisted, and verifying it if so, can all be done here. I mean the actual verify implementation can be in FileIOE, but the call to it should be made from here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have thought about your suggestion. If we want to read it only once, we should pass the ObjectInputStream object "ois" to the verifyFileIntegrity() method. If it is an old-version persistent file, the ois object would need to be reset, but the reset() method is not supported. We could recreate the ObjectInputStream without using a try-with-resources statement, but this may be a bit unsightly...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to close and reopen the file for the read. My biggest worry is where we do this verification. See my comment below. Right now we are doing it during the creation of the FileIOE. If the verification fails, we are not allowing the FileIOE to be created and do its future work. My point is this: we should create the FileIOE. After that, the BucketCache tries to retrieve the already-cached data from the persistent store, and for that it recreates the cache metadata. That is the step at which we should be doing the verification. First see whether the checksum for verification is already present, and if so, verify. If the verification is OK, then try to recreate the cache metadata. Otherwise, just forget about the existing persisted cache data and maybe do the necessary cleanup. All of this is the work of the BucketCache. It can ask the FileIOE to do the actual verification, but it should be initiated by the BucketCache. Is my point clear now? Sorry for not explaining it in detail in the first place.
Yes, maybe we need to close the file and reopen it in the case of an old-style file with no PB magic. Or else treat what was read as 4 bytes, read the next 4 bytes, and assemble the 8-byte long number. But that may have challenges with respect to the platform. I don't think it is an issue to just reopen the file if there is no PB magic. Comments? @Reidddddd

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already got your point. In fact, if the verification fails, the FileIOE can still be created. If verification fails, we throw an IOException, then catch the IOException and do some cleanup, but the creation of the FileIOE continues. Below is the code for the catch:
catch (IOException ioex) { LOG.error("File verification failed because of ", ioex); // delete cache files and backingMap persistent file. deleteCacheDataFile(); new File(persistentPath).delete(); }
However, I totally agree with what you said; I will modify it right away.

}
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
Expand All @@ -1078,9 +1097,8 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAlloc
bucketAllocator = allocator;
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
blockNumber.set(backingMap.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Is it related to this jira directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a successful retrieve from the file, the "Block Count" in the Web UI would be 0 if blockNumber were not updated, even though the cache actually contains blocks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So this is an existing bug. It's a one-line change. Still, maybe it could be done as a separate bug JIRA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

} finally {
if (ois != null) ois.close();
if (fis != null) fis.close();
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
Expand Down Expand Up @@ -1597,4 +1615,9 @@ float getMultiFactor() {
/**
 * Returns the current value of the {@code memoryFactor} field.
 *
 * @return the value of {@code memoryFactor}
 */
float getMemoryFactor() {
return memoryFactor;
}

@VisibleForTesting
public UniqueIndexMap<Integer> getDeserialiserMap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

return deserialiserMap;
}
}
Loading