-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-18028. improve S3 read speed using prefetching & caching #3736
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
c899bf6
6f2ed5e
c496863
206333a
1b07890
b7c9dc4
a7308a5
3ad6764
da5b712
29a482c
3ce0f6b
ea7e19c
6d9b189
dc239ab
03f068e
6cfbc6b
2809d40
0c361c5
4392e97
bdd40fd
8760671
7f80c5f
7cc1453
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.common; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
|
|
||
| /** | ||
| * Provides functionality necessary for caching blocks of data read from FileSystem. | ||
| */ | ||
| public interface BlockCache extends Closeable { | ||
|
|
||
| /** | ||
| * Indicates whether the given block is in this cache. | ||
| * | ||
| * @param blockNumber the id of the given block. | ||
| * @return true if the given block is in this cache, false otherwise. | ||
| */ | ||
| boolean containsBlock(Integer blockNumber); | ||
steveloughran marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Gets the blocks in this cache. | ||
| * | ||
| * @return the blocks in this cache. | ||
| */ | ||
| Iterable<Integer> blocks(); | ||
|
|
||
| /** | ||
| * Gets the number of blocks in this cache. | ||
| * | ||
| * @return the number of blocks in this cache. | ||
| */ | ||
| int size(); | ||
|
|
||
| /** | ||
| * Gets the block having the given {@code blockNumber}. | ||
| * | ||
| * @param blockNumber the id of the desired block. | ||
| * @param buffer contents of the desired block are copied to this buffer. | ||
| * @throws IOException if there is an error reading the given block. | ||
| */ | ||
| void get(Integer blockNumber, ByteBuffer buffer) throws IOException; | ||
|
|
||
| /** | ||
| * Puts the given block in this cache. | ||
| * | ||
| * @param blockNumber the id of the given block. | ||
| * @param buffer contents of the given block to be added to this cache. | ||
| * @throws IOException if there is an error writing the given block. | ||
| */ | ||
| void put(Integer blockNumber, ByteBuffer buffer) throws IOException; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.common; | ||
|
|
||
| /** | ||
| * Holds information about blocks of data in an S3 file. | ||
steveloughran marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| */ | ||
| public class BlockData { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest to add more detailed description of the values, for example, NOT_READY, what's the meaning of ready? the meaning of queued, what's the diff between ready and cached?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added more details |
||
| // State of each block of data. | ||
| enum State { | ||
| // Data is not yet ready to be read. | ||
| NOT_READY, | ||
|
|
||
| // A read of this block has been queued. | ||
| QUEUED, | ||
|
|
||
| // This block is ready to be read. | ||
| READY, | ||
|
|
||
| // This block has been cached. | ||
| CACHED | ||
| } | ||
|
|
||
| // State of all blocks in an S3 file. | ||
| private State[] state; | ||
|
|
||
| // The size of an S3 file. | ||
| private final long fileSize; | ||
|
|
||
| // The S3 file is divided into blocks of this size. | ||
| private final int blockSize; | ||
|
|
||
| // The S3 file has these many blocks. | ||
| private final int numBlocks; | ||
|
|
||
| /** | ||
| * Constructs an instance of {@link BlockData}. | ||
| * | ||
| * @param fileSize the size of an S3 file. | ||
| * @param blockSize the S3 file is divided into blocks of this size. | ||
| */ | ||
| public BlockData(long fileSize, int blockSize) { | ||
| Validate.checkNotNegative(fileSize, "fileSize"); | ||
| if (fileSize == 0) { | ||
| Validate.checkNotNegative(blockSize, "blockSize"); | ||
| } else { | ||
| Validate.checkPositiveInteger(blockSize, "blockSize"); | ||
| } | ||
|
|
||
| this.fileSize = fileSize; | ||
| this.blockSize = blockSize; | ||
| this.numBlocks = | ||
| (fileSize == 0) ? 0 : ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0 ? 1 : 0); | ||
| this.state = new State[this.numBlocks]; | ||
| for (int b = 0; b < this.numBlocks; b++) { | ||
| this.setState(b, State.NOT_READY); | ||
| } | ||
| } | ||
|
|
||
| public int getBlockSize() { | ||
| return this.blockSize; | ||
| } | ||
|
|
||
| public long getFileSize() { | ||
| return this.fileSize; | ||
| } | ||
|
|
||
| public int getNumBlocks() { | ||
| return this.numBlocks; | ||
| } | ||
|
|
||
| public boolean isLastBlock(int blockNumber) { | ||
| if (this.fileSize == 0) { | ||
| return false; | ||
| } | ||
|
|
||
| throwIfInvalidBlockNumber(blockNumber); | ||
|
|
||
| return blockNumber == (this.numBlocks - 1); | ||
| } | ||
|
|
||
| public int getBlockNumber(long offset) { | ||
| throwIfInvalidOffset(offset); | ||
|
|
||
| return (int) (offset / this.blockSize); | ||
| } | ||
|
|
||
| public int getSize(int blockNumber) { | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (this.fileSize == 0) { | ||
| return 0; | ||
| } | ||
|
|
||
| if (this.isLastBlock(blockNumber)) { | ||
| return (int) (this.fileSize - (((long) this.blockSize) * (this.numBlocks - 1))); | ||
| } else { | ||
| return this.blockSize; | ||
| } | ||
| } | ||
|
|
||
| public boolean isValidOffset(long offset) { | ||
| return (offset >= 0) && (offset < this.fileSize); | ||
| } | ||
|
|
||
| public long getStartOffset(int blockNumber) { | ||
| throwIfInvalidBlockNumber(blockNumber); | ||
|
|
||
| return blockNumber * (long) this.blockSize; | ||
| } | ||
|
|
||
| public int getRelativeOffset(int blockNumber, long offset) { | ||
| throwIfInvalidOffset(offset); | ||
|
|
||
| return (int) (offset - this.getStartOffset(blockNumber)); | ||
| } | ||
|
|
||
| public State getState(int blockNumber) { | ||
| throwIfInvalidBlockNumber(blockNumber); | ||
|
|
||
| return this.state[blockNumber]; | ||
| } | ||
|
|
||
| public State setState(int blockNumber, State blockState) { | ||
| throwIfInvalidBlockNumber(blockNumber); | ||
|
|
||
| this.state[blockNumber] = blockState; | ||
| return blockState; | ||
| } | ||
|
|
||
| // Debug helper. | ||
| public String getStateString() { | ||
| StringBuilder sb = new StringBuilder(); | ||
| int blockNumber = 0; | ||
| while (blockNumber < this.numBlocks) { | ||
| State tstate = this.getState(blockNumber); | ||
| int endBlockNumber = blockNumber; | ||
| while ((endBlockNumber < this.numBlocks) && (this.getState(endBlockNumber) == tstate)) { | ||
| endBlockNumber++; | ||
| } | ||
| sb.append(String.format("[%03d ~ %03d] %s%n", blockNumber, endBlockNumber - 1, tstate)); | ||
| blockNumber = endBlockNumber; | ||
| } | ||
| return sb.toString(); | ||
| } | ||
|
|
||
| private void throwIfInvalidBlockNumber(int blockNumber) { | ||
| Validate.checkWithinRange(blockNumber, "blockNumber", 0, this.numBlocks - 1); | ||
| } | ||
|
|
||
| private void throwIfInvalidOffset(long offset) { | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Validate.checkWithinRange(offset, "offset", 0, this.fileSize - 1); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.common; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
|
|
||
| /** | ||
| * Provides read access to the underlying file one block at a time. | ||
| * | ||
| * This class is the simplest form of a {@code BlockManager} that does | ||
| * perform prefetching or caching. | ||
| */ | ||
| public abstract class BlockManager implements Closeable { | ||
|
|
||
| // Information about each block of the underlying file. | ||
| private BlockData blockData; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final |
||
|
|
||
| /** | ||
| * Constructs an instance of {@code BlockManager}. | ||
| * | ||
| * @param blockData information about each block of the underlying file. | ||
| */ | ||
| public BlockManager(BlockData blockData) { | ||
| Validate.checkNotNull(blockData, "blockData"); | ||
|
|
||
| this.blockData = blockData; | ||
| } | ||
|
|
||
| /** | ||
| * Gets block data information. | ||
| * | ||
| * @return instance of {@code BlockData}. | ||
| */ | ||
| public BlockData getBlockData() { | ||
| return this.blockData; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the block having the given {@code blockNumber}. | ||
| * | ||
| * The entire block is read into memory and returned as a {@code BufferData}. | ||
| * The blocks are treated as a limited resource and must be released when | ||
| * one is done reading them. | ||
| * | ||
| * @param blockNumber the number of the block to be read and returned. | ||
| * @return {@code BufferData} having data from the given block. | ||
| * @throws IOException if there an error reading the given block. | ||
| */ | ||
| public BufferData get(int blockNumber) throws IOException { | ||
| Validate.checkNotNegative(blockNumber, "blockNumber"); | ||
|
|
||
| int size = this.blockData.getSize(blockNumber); | ||
| ByteBuffer buffer = ByteBuffer.allocate(size); | ||
| long startOffset = this.blockData.getStartOffset(blockNumber); | ||
| this.read(buffer, startOffset, size); | ||
| buffer.flip(); | ||
| return new BufferData(blockNumber, buffer); | ||
| } | ||
|
|
||
| /** | ||
| * Reads into the given {@code buffer} {@code size} bytes from the underlying file | ||
| * starting at {@code startOffset}. | ||
| * | ||
| * @param buffer the buffer to read data in to. | ||
| * @param startOffset the offset at which reading starts. | ||
| * @param size the number bytes to read. | ||
| * @return number of bytes read. | ||
| * @throws IOException if there an error reading the given block. | ||
| */ | ||
| public abstract int read(ByteBuffer buffer, long startOffset, int size) throws IOException; | ||
|
|
||
| /** | ||
| * Releases resources allocated to the given block. | ||
| * | ||
| * @param data the {@code BufferData} to release. | ||
| */ | ||
| public void release(BufferData data) { | ||
| Validate.checkNotNull(data, "data"); | ||
|
|
||
| // Do nothing because we allocate a new buffer each time. | ||
| } | ||
|
|
||
| /** | ||
| * Requests optional prefetching of the given block. | ||
| * | ||
| * @param blockNumber the id of the block to prefetch. | ||
| */ | ||
| public void requestPrefetch(int blockNumber) { | ||
| Validate.checkNotNegative(blockNumber, "blockNumber"); | ||
|
|
||
| // Do nothing because we do not support prefetches. | ||
| } | ||
|
|
||
| /** | ||
| * Requests cancellation of any previously issued prefetch requests. | ||
| */ | ||
| public void cancelPrefetches() { | ||
| // Do nothing because we do not support prefetches. | ||
| } | ||
|
|
||
| /** | ||
| * Requests that the given block should be copied to the cache. Optional operation. | ||
| * | ||
| * @param data the {@code BufferData} instance to optionally cache. | ||
| */ | ||
| public void requestCaching(BufferData data) { | ||
| // Do nothing because we do not support caching. | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no