@@ -408,7 +408,7 @@ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path bas
}

static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
Map<Path, ParquetMetadata> footers = new HashMap<>();
List<BlockMetaData> blocks = mergedFooters.getBlocks();
for (BlockMetaData block : blocks) {
String path = block.getPath();
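This hunk, like several below, swaps the explicit generic arguments on the right-hand side for the diamond operator. A minimal standalone sketch of the equivalence, not part of this patch, assuming Java 7 or later:

import java.util.HashMap;
import java.util.Map;

class DiamondSketch {
    public static void main(String[] args) {
        // Pre-Java-7 style: type arguments repeated on the right-hand side.
        Map<String, Integer> verbose = new HashMap<String, Integer>();
        // Diamond operator: the compiler infers <String, Integer> from the declared type.
        Map<String, Integer> concise = new HashMap<>();
        verbose.put("rows", 1);
        concise.put("rows", 1);
        System.out.println(verbose.equals(concise)); // true
    }
}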
@@ -452,6 +452,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, Path
* @deprecated will be removed in 2.0.0;
* use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
@Deprecated
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
}
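The @Deprecated annotation is added to match the existing @deprecated Javadoc tag: the tag only documents the deprecation, while the annotation is what makes the compiler warn at call sites. A small sketch with hypothetical oldApi/newApi names, not taken from this codebase:

class DeprecationSketch {
    /**
     * @deprecated will be removed in a future release; use {@link #newApi()} instead.
     */
    @Deprecated
    static void oldApi() { }

    static void newApi() { }

    public static void main(String[] args) {
        oldApi(); // javac warns: oldApi() in DeprecationSketch has been deprecated
        newApi();
    }
}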
@@ -518,7 +519,7 @@ private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptio
LOG.debug("File length {}", fileLen);
int FOOTER_LENGTH_SIZE = 4;
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
throw new RuntimeException(file.toString() + " is not a Parquet file (too small length: " + fileLen + ")");
throw new RuntimeException(file + " is not a Parquet file (too small length: " + fileLen + ")");
}
long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
LOG.debug("reading footer index at {}", footerLengthIndex);
@@ -528,7 +529,7 @@ private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptio
byte[] magic = new byte[MAGIC.length];
f.readFully(magic);
if (!Arrays.equals(MAGIC, magic)) {
throw new RuntimeException(file.toString() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
throw new RuntimeException(file + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
}
long footerIndex = footerLengthIndex - footerLength;
LOG.debug("read footer length: {}, footer index: {}", footerLength, footerIndex);
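Dropping the explicit toString() leans on Java string concatenation, which applies String.valueOf to the operand; one side effect is that a null operand concatenates as "null" instead of throwing a NullPointerException. A tiny standalone sketch, plain JDK, nothing Parquet-specific:

class ConcatSketch {
    public static void main(String[] args) {
        Object file = null;
        // Concatenation uses String.valueOf, so a null operand prints as "null".
        System.out.println(file + " is not a Parquet file");
        // file.toString() + " is not a Parquet file" would throw NullPointerException here.
    }
}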
@@ -727,7 +728,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
}

private static <T> List<T> listWithNulls(int size) {
return Stream.generate(() -> (T) null).limit(size).collect(Collectors.toCollection(ArrayList<T>::new));
return new ArrayList<>(Collections.nCopies(size, null));
}
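listWithNulls now builds its null-prefilled list from Collections.nCopies wrapped in an ArrayList, dropping the stream pipeline and its unchecked cast. A minimal sketch comparing the two approaches, assuming Java 8 or later:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ListWithNullsSketch {
    // Old approach: generate an unbounded stream of nulls and truncate it to size.
    static <T> List<T> viaStream(int size) {
        return Stream.generate(() -> (T) null).limit(size)
                .collect(Collectors.toCollection(ArrayList<T>::new));
    }

    // New approach: copy an immutable size-element view of null into a mutable ArrayList.
    static <T> List<T> viaNCopies(int size) {
        return new ArrayList<>(Collections.nCopies(size, null));
    }

    public static void main(String[] args) {
        System.out.println(viaStream(3));  // [null, null, null]
        System.out.println(viaNCopies(3)); // [null, null, null]
    }
}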

public ParquetMetadata getFooter() {
@@ -750,7 +751,7 @@ public FileMetaData getFileMetaData() {
}

public long getRecordCount() {
long total = 0;
long total = 0L;
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
@@ -761,7 +762,7 @@ public long getFilteredRecordCount() {
if (!options.useColumnIndexFilter()) {
return getRecordCount();
}
long total = 0;
long total = 0L;
for (int i = 0, n = blocks.size(); i < n; ++i) {
total += getRowRanges(i).rowCount();
}
@@ -884,7 +885,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
return readNextRowGroup();
}
BlockMetaData block = blocks.get(currentBlock);
if (block.getRowCount() == 0) {
if (block.getRowCount() == 0L) {
throw new RuntimeException("Illegal row group of 0 rows");
}
ColumnIndexStore ciStore = getColumnIndexStore(currentBlock);
@@ -903,7 +904,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges);
// prepare the list of consecutive parts to read them in one scan
ChunkListBuilder builder = new ChunkListBuilder();
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
List<ConsecutivePartList> allParts = new ArrayList<>();
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
@@ -922,7 +923,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
allParts.add(currentParts);
}
ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
(int) range.getLength());
Math.toIntExact(range.getLength()));
currentParts.addChunk(chunkDescriptor);
builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
}
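Replacing the plain (int) cast with Math.toIntExact turns a silent truncation into a fast failure: toIntExact throws ArithmeticException when the long value does not fit in an int. A minimal standalone sketch:

class ToIntExactSketch {
    public static void main(String[] args) {
        long length = Integer.MAX_VALUE + 1L;  // 2147483648 does not fit in an int
        int truncated = (int) length;          // silently wraps to -2147483648
        System.out.println(truncated);
        int checked = Math.toIntExact(length); // throws ArithmeticException: integer overflow
        System.out.println(checked);           // never reached
    }
}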
@@ -1110,29 +1111,19 @@ private class ChunkData {
private SeekableInputStream f;

void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
ChunkData data = map.get(descriptor);
if (data == null) {
data = new ChunkData();
map.put(descriptor, data);
}
data.buffers.addAll(buffers);

map.computeIfAbsent(descriptor, d -> new ChunkData()).buffers.addAll(buffers);
lastDescriptor = descriptor;
this.f = f;
}

void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
ChunkData data = map.get(descriptor);
if (data == null) {
data = new ChunkData();
map.put(descriptor, data);
}
data.offsetIndex = offsetIndex;
map.computeIfAbsent(descriptor, d -> new ChunkData()).offsetIndex = offsetIndex;
}
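Both add() and setOffsetIndex() collapse the get / null-check / put idiom into Map.computeIfAbsent, which returns the existing value or creates, stores, and returns a new one in a single call. A self-contained sketch of the two idioms, assuming Java 8 or later; Bucket is a hypothetical stand-in for ChunkData:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ComputeIfAbsentSketch {
    // Hypothetical stand-in for ChunkData: a value object built up across calls.
    static class Bucket { final List<String> items = new ArrayList<>(); }

    public static void main(String[] args) {
        Map<String, Bucket> map = new HashMap<>();

        // Old idiom: look up, create and insert on a miss, then mutate.
        Bucket bucket = map.get("chunk-1");
        if (bucket == null) {
            bucket = new Bucket();
            map.put("chunk-1", bucket);
        }
        bucket.items.add("first");

        // New idiom: one call returns the existing or the freshly inserted value.
        map.computeIfAbsent("chunk-1", key -> new Bucket()).items.add("second");

        System.out.println(map.get("chunk-1").items); // [first, second]
    }
}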

List<Chunk> build() {
List<Chunk> chunks = new ArrayList<>();
for (Entry<ChunkDescriptor, ChunkData> entry : map.entrySet()) {
Set<Entry<ChunkDescriptor, ChunkData>> entries = map.entrySet();
List<Chunk> chunks = new ArrayList<>(entries.size());
for (Entry<ChunkDescriptor, ChunkData> entry : entries) {
ChunkDescriptor descriptor = entry.getKey();
ChunkData data = entry.getValue();
if (descriptor.equals(lastDescriptor)) {
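build() now reads map.entrySet() once and pre-sizes the result list to the entry count, so the ArrayList never has to grow while chunks are appended. A trivial sketch of the pre-sizing idea, plain JDK types only:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PresizeSketch {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);

        // Capture the entry set once and size the output list for exactly that many adds.
        Set<Map.Entry<String, Integer>> entries = map.entrySet();
        List<String> rows = new ArrayList<>(entries.size());
        for (Map.Entry<String, Integer> entry : entries) {
            rows.add(entry.getKey() + "=" + entry.getValue());
        }
        System.out.println(rows);
    }
}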
@@ -1187,11 +1178,11 @@ private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) {
* @return the list of pages
*/
public ColumnChunkPageReader readAllPages() throws IOException {
List<DataPage> pagesInChunk = new ArrayList<DataPage>();
List<DataPage> pagesInChunk = new ArrayList<>();
DictionaryPage dictionaryPage = null;
PrimitiveType type = getFileMetaData().getSchema()
.getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
long valuesCountReadSoFar = 0L;
int dataPageCountReadSoFar = 0;
while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
PageHeader pageHeader = readPageHeader();
@@ -1349,11 +1340,13 @@ public BytesInput readAsBytesInput(int size) throws IOException {
int missingBytes = size - available;
LOG.info("completed the column chunk with {} bytes", missingBytes);

List<ByteBuffer> buffers = new ArrayList<>();
buffers.addAll(stream.sliceBuffers(available));
List<ByteBuffer> streamBuffers = stream.sliceBuffers(available);

ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
f.readFully(lastBuffer);

List<ByteBuffer> buffers = new ArrayList<>(streamBuffers.size() + 1);
buffers.addAll(streamBuffers);
buffers.add(lastBuffer);

return BytesInput.from(buffers);
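The rewritten tail of readAsBytesInput slices the already-buffered bytes first, reads the missing tail into its own ByteBuffer, and assembles everything into a list sized for exactly streamBuffers.size() + 1 elements. A simplified, in-memory sketch of that assembly pattern, with no Parquet types involved:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BufferAssemblySketch {
    public static void main(String[] args) {
        // Stand-in for the bytes already buffered from the input stream.
        List<ByteBuffer> streamBuffers = Arrays.asList(
                ByteBuffer.wrap(new byte[] {1, 2}),
                ByteBuffer.wrap(new byte[] {3}));

        // Stand-in for the missing tail read from the underlying file.
        ByteBuffer lastBuffer = ByteBuffer.wrap(new byte[] {4, 5});

        // Size the list for exactly the buffers that will be added, then assemble.
        List<ByteBuffer> buffers = new ArrayList<>(streamBuffers.size() + 1);
        buffers.addAll(streamBuffers);
        buffers.add(lastBuffer);

        int totalBytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
        System.out.println(buffers.size() + " buffers, " + totalBytes + " bytes"); // 3 buffers, 5 bytes
    }
}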
@@ -1418,7 +1411,7 @@ private class ConsecutivePartList {

private final long offset;
private int length;
private final List<ChunkDescriptor> chunks = new ArrayList<ChunkDescriptor>();
private final List<ChunkDescriptor> chunks = new ArrayList<>();

/**
* @param offset where the first chunk starts
@@ -1443,7 +1436,6 @@ public void addChunk(ChunkDescriptor descriptor) {
* @throws IOException if there is an error while reading from the stream
*/
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);

int fullAllocations = length / options.getMaxAllocationSize();
@@ -1468,8 +1460,7 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
for (int i = 0; i < chunks.size(); i++) {
ChunkDescriptor descriptor = chunks.get(i);
for (final ChunkDescriptor descriptor : chunks) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
}
}
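The indexed loop over chunks becomes an enhanced for loop, dropping the manual index and the repeated chunks.get(i) lookup while reading identically. A trivial sketch of the equivalence:

import java.util.Arrays;
import java.util.List;

class ForEachSketch {
    public static void main(String[] args) {
        List<String> chunks = Arrays.asList("a", "b", "c");

        // Old style: explicit index, element fetched by position on each pass.
        for (int i = 0; i < chunks.size(); i++) {
            System.out.println(chunks.get(i));
        }

        // New style: the iterator is managed by the loop, no index bookkeeping.
        for (final String chunk : chunks) {
            System.out.println(chunk);
        }
    }
}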