@@ -421,7 +421,7 @@ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path bas
}

static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
Map<Path, ParquetMetadata> footers = new HashMap<>();
List<BlockMetaData> blocks = mergedFooters.getBlocks();
for (BlockMetaData block : blocks) {
String path = block.getPath();
@@ -465,6 +465,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, Path
* @deprecated will be removed in 2.0.0;
* use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
@Deprecated
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
}
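
The Javadoc @deprecated tag above only documents the removal plan; it is the @Deprecated annotation that makes the compiler warn callers. A minimal sketch of the pairing, using made-up method names rather than the reader API:

public class DeprecationDemo {
  /**
   * @deprecated will be removed in a future release; use {@link #readNew()}
   */
  @Deprecated
  static String readOld() {
    return readNew();
  }

  static String readNew() {
    return "footer";
  }

  public static void main(String[] args) {
    // Compiles, but the call site gets a deprecation warning from javac.
    System.out.println(readOld());
  }
}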
@@ -796,7 +797,7 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
}

private static <T> List<T> listWithNulls(int size) {
return Stream.generate(() -> (T) null).limit(size).collect(Collectors.toCollection(ArrayList<T>::new));
return new ArrayList<>(Collections.nCopies(size, null));
}

public ParquetMetadata getFooter() {
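
The nCopies idiom above builds the null-filled list without a Stream pipeline: Collections.nCopies returns an immutable size-n view of the same null reference, and copying it into an ArrayList sizes the backing array once. A standalone sketch with an illustrative element type:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NullListDemo {
  public static void main(String[] args) {
    // Mutable list of three null slots, allocated in one shot.
    List<String> slots = new ArrayList<>(Collections.nCopies(3, (String) null));
    slots.set(1, "filled");
    System.out.println(slots); // prints [null, filled, null]
  }
}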
@@ -819,7 +820,7 @@ public FileMetaData getFileMetaData() {
}

public long getRecordCount() {
long total = 0;
long total = 0L;
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
@@ -830,7 +831,7 @@ public long getFilteredRecordCount() {
if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
return getRecordCount();
}
long total = 0;
long total = 0L;
for (int i = 0, n = blocks.size(); i < n; ++i) {
total += getRowRanges(i).rowCount();
}
@@ -1015,7 +1016,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
return readNextRowGroup();
}
BlockMetaData block = blocks.get(currentBlock);
if (block.getRowCount() == 0) {
if (block.getRowCount() == 0L) {
throw new RuntimeException("Illegal row group of 0 rows");
}
RowRanges rowRanges = getRowRanges(currentBlock);
@@ -1046,7 +1047,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc
ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
// prepare the list of consecutive parts to read them in one scan
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
List<ConsecutivePartList> allParts = new ArrayList<>();
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
@@ -1065,7 +1066,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc
allParts.add(currentParts);
}
ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
(int) range.getLength());
Math.toIntExact(range.getLength()));
currentParts.addChunk(chunkDescriptor);
builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
}
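
Math.toIntExact behaves like the old (int) cast for any length that fits in an int, but throws instead of silently truncating when it does not. A small sketch of the difference, with made-up values:

public class ToIntExactDemo {
  public static void main(String[] args) {
    long fits = 42L;
    long tooBig = Integer.MAX_VALUE + 1L; // 2147483648 does not fit in an int

    System.out.println((int) fits);            // 42
    System.out.println(Math.toIntExact(fits)); // 42

    System.out.println((int) tooBig);          // -2147483648, silent truncation
    try {
      Math.toIntExact(tooBig);                 // overflow is reported, not hidden
    } catch (ArithmeticException e) {
      System.out.println("overflow detected: " + e.getMessage());
    }
  }
}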
@@ -1401,29 +1402,19 @@ public ChunkListBuilder(long rowCount) {
}

void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
ChunkData data = map.get(descriptor);
if (data == null) {
data = new ChunkData();
map.put(descriptor, data);
}
data.buffers.addAll(buffers);

map.computeIfAbsent(descriptor, d -> new ChunkData()).buffers.addAll(buffers);
lastDescriptor = descriptor;
this.f = f;
}

void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
ChunkData data = map.get(descriptor);
if (data == null) {
data = new ChunkData();
map.put(descriptor, data);
}
data.offsetIndex = offsetIndex;
map.computeIfAbsent(descriptor, d -> new ChunkData()).offsetIndex = offsetIndex;
}

List<Chunk> build() {
List<Chunk> chunks = new ArrayList<>();
for (Entry<ChunkDescriptor, ChunkData> entry : map.entrySet()) {
Set<Entry<ChunkDescriptor, ChunkData>> entries = map.entrySet();
List<Chunk> chunks = new ArrayList<>(entries.size());
for (Entry<ChunkDescriptor, ChunkData> entry : entries) {
ChunkDescriptor descriptor = entry.getKey();
ChunkData data = entry.getValue();
if (descriptor.equals(lastDescriptor)) {
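
Map.computeIfAbsent, used in the add and setOffsetIndex methods above, collapses the get / null-check / put sequence into a single call that creates the value on a miss and returns the mapping either way. A stripped-down sketch of the pattern with a toy value class standing in for ChunkData:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo {
  static class ChunkData {
    final List<String> buffers = new ArrayList<>();
  }

  public static void main(String[] args) {
    Map<String, ChunkData> map = new HashMap<>();

    // Old shape: look up, create on miss, put back, then mutate.
    ChunkData data = map.get("col-a");
    if (data == null) {
      data = new ChunkData();
      map.put("col-a", data);
    }
    data.buffers.add("buffer-1");

    // New shape: one call creates the entry if needed and returns it.
    map.computeIfAbsent("col-b", k -> new ChunkData()).buffers.add("buffer-2");

    System.out.println(map.get("col-a").buffers); // [buffer-1]
    System.out.println(map.get("col-b").buffers); // [buffer-2]
  }
}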
@@ -1489,11 +1480,11 @@ public ColumnChunkPageReader readAllPages() throws IOException {

public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor,
byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal) throws IOException {
List<DataPage> pagesInChunk = new ArrayList<DataPage>();
List<DataPage> pagesInChunk = new ArrayList<>();
DictionaryPage dictionaryPage = null;
PrimitiveType type = getFileMetaData().getSchema()
.getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
long valuesCountReadSoFar = 0L;
int dataPageCountReadSoFar = 0;
byte[] dataPageHeaderAAD = null;
if (null != headerBlockDecryptor) {
@@ -1674,11 +1665,13 @@ public BytesInput readAsBytesInput(int size) throws IOException {
int missingBytes = size - available;
LOG.info("completed the column chunk with {} bytes", missingBytes);

List<ByteBuffer> buffers = new ArrayList<>();
buffers.addAll(stream.sliceBuffers(available));
List<ByteBuffer> streamBuffers = stream.sliceBuffers(available);

ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
f.readFully(lastBuffer);

List<ByteBuffer> buffers = new ArrayList<>(streamBuffers.size() + 1);
buffers.addAll(streamBuffers);
buffers.add(lastBuffer);

return BytesInput.from(buffers);
@@ -1743,7 +1736,7 @@ private class ConsecutivePartList {

private final long offset;
private int length;
private final List<ChunkDescriptor> chunks = new ArrayList<ChunkDescriptor>();
private final List<ChunkDescriptor> chunks = new ArrayList<>();

/**
* @param offset where the first chunk starts
@@ -1768,7 +1761,6 @@ public void addChunk(ChunkDescriptor descriptor) {
* @throws IOException if there is an error while reading from the stream
*/
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);

int fullAllocations = length / options.getMaxAllocationSize();
@@ -1793,8 +1785,7 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
for (int i = 0; i < chunks.size(); i++) {
ChunkDescriptor descriptor = chunks.get(i);
for (final ChunkDescriptor descriptor : chunks) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
}
}