Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,15 +489,24 @@ namespace orc {
throw std::range_error("key not found");
}

void ReaderImpl::getRowIndexStatistics(
uint64_t stripeOffset, const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const {
void ReaderImpl::getRowIndexStatistics(const proto::StripeInformation& stripeInfo,
uint64_t stripeIndex, const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const {
int num_streams = currentStripeFooter.streams_size();
uint64_t offset = stripeOffset;
uint64_t offset = stripeInfo.offset();
uint64_t indexEnd = stripeInfo.offset() + stripeInfo.indexlength();
for (int i = 0; i < num_streams; i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
uint64_t length = static_cast<uint64_t>(stream.length());
if (static_cast<StreamKind>(stream.kind()) == StreamKind::StreamKind_ROW_INDEX) {
if (offset + length > indexEnd) {
std::stringstream msg;
msg << "Malformed RowIndex stream meta in stripe " << stripeIndex
<< ": streamOffset=" << offset << ", streamLength=" << length
<< ", stripeOffset=" << stripeInfo.offset() << ", stripeIndexLength="
<< stripeInfo.indexlength();
throw ParseError(msg.str());
}
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents->compression,
std::unique_ptr<SeekableInputStream>
Expand Down Expand Up @@ -554,7 +563,7 @@ namespace orc {
proto::StripeFooter currentStripeFooter =
getStripeFooter(currentStripeInfo, *contents.get());

getRowIndexStatistics(currentStripeInfo.offset(), currentStripeFooter, &indexStats);
getRowIndexStatistics(currentStripeInfo, stripeIndex, currentStripeFooter, &indexStats);

const Timezone& writerTZ =
currentStripeFooter.has_writertimezone() ?
Expand Down Expand Up @@ -586,8 +595,15 @@ namespace orc {

void ReaderImpl::readMetadata() const {
uint64_t metadataSize = contents->postscript->metadatalength();
uint64_t metadataStart = fileLength - metadataSize
- contents->postscript->footerlength() - postscriptLength - 1;
uint64_t footerLength = contents->postscript->footerlength();
if (fileLength < metadataSize + footerLength + postscriptLength + 1) {
std::stringstream msg;
msg << "Invalid Metadata length: fileLength=" << fileLength
<< ", metadataLength=" << metadataSize << ", footerLength=" << footerLength
<< ", postscriptLength=" << postscriptLength;
throw ParseError(msg.str());
}
uint64_t metadataStart = fileLength - metadataSize - footerLength - postscriptLength - 1;
if (metadataSize != 0) {
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents->compression,
Expand Down Expand Up @@ -798,13 +814,24 @@ namespace orc {
void RowReaderImpl::startNextStripe() {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
uint64_t fileLength = contents->stream->getLength();
if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
currentStripeInfo.datalength() + currentStripeInfo.footerlength() >= fileLength) {
std::stringstream msg;
msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
<< fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
<< currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
<< ", footerLength=" << currentStripeInfo.footerlength() << ")";
throw ParseError(msg.str());
}
currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();
const Timezone& writerTimezone =
currentStripeFooter.has_writertimezone() ?
getTimezoneByName(currentStripeFooter.writertimezone()) :
localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripeFooter,
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo,
currentStripeFooter,
currentStripeInfo.offset(),
*(contents->stream.get()),
writerTimezone);
Expand Down Expand Up @@ -889,6 +916,12 @@ namespace orc {

std::unique_ptr<proto::PostScript> postscript =
std::unique_ptr<proto::PostScript>(new proto::PostScript());
if (readSize < 1 + postscriptSize) {
std::stringstream msg;
msg << "Invalid ORC postscript length: " << postscriptSize << ", file length = "
<< stream->getLength();
throw ParseError(msg.str());
}
if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize,
static_cast<int>(postscriptSize))) {
throw ParseError("Failed to parse the postscript from " +
Expand Down Expand Up @@ -997,6 +1030,11 @@ namespace orc {
buffer.get(), postscriptLength));
uint64_t footerSize = contents->postscript->footerlength();
uint64_t tailSize = 1 + postscriptLength + footerSize;
if (tailSize >= fileLength) {
std::stringstream msg;
msg << "Invalid ORC tailSize=" << tailSize << ", fileLength=" << fileLength;
throw ParseError(msg.str());
}
uint64_t footerOffset;

if (tailSize > readSize) {
Expand Down
2 changes: 1 addition & 1 deletion c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ namespace orc {
// internal methods
void readMetadata() const;
void checkOrcVersion();
void getRowIndexStatistics(uint64_t stripeOffset,
void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;

Expand Down
18 changes: 15 additions & 3 deletions c++/src/StripeStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@

namespace orc {

StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader,
StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t _index,
const proto::StripeInformation& _stripeInfo,
const proto::StripeFooter& _footer,
uint64_t _stripeStart,
InputStream& _input,
const Timezone& _writerTimezone
): reader(_reader),
stripeInfo(_stripeInfo),
footer(_footer),
stripeIndex(_index),
stripeStart(_stripeStart),
input(_input),
writerTimezone(_writerTimezone) {
Expand Down Expand Up @@ -77,14 +80,23 @@ namespace orc {
proto::Stream_Kind kind,
bool shouldStream) const {
uint64_t offset = stripeStart;
uint64_t dataEnd = stripeInfo.offset() + stripeInfo.indexlength() + stripeInfo.datalength();
MemoryPool *pool = reader.getFileContents().pool;
for(int i = 0; i < footer.streams_size(); ++i) {
const proto::Stream& stream = footer.streams(i);
if (stream.has_kind() &&
stream.kind() == kind &&
stream.column() == static_cast<uint64_t>(columnId)) {
uint64_t myBlock = shouldStream ? input.getNaturalReadSize():
stream.length();
uint64_t streamLength = stream.length();
uint64_t myBlock = shouldStream ? input.getNaturalReadSize(): streamLength;
if (offset + streamLength > dataEnd) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex
<< ": streamOffset=" << offset << ", streamLength=" << streamLength
<< ", stripeOffset=" << stripeInfo.offset() << ", stripeIndexLength="
<< stripeInfo.indexlength() << ", stripeDataLength=" << stripeInfo.datalength();
throw ParseError(msg.str());
}
return createDecompressor(reader.getCompression(),
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream
Expand Down
5 changes: 4 additions & 1 deletion c++/src/StripeStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ namespace orc {
class StripeStreamsImpl: public StripeStreams {
private:
const RowReaderImpl& reader;
const proto::StripeInformation& stripeInfo;
const proto::StripeFooter& footer;
const uint64_t stripeIndex;
const uint64_t stripeStart;
InputStream& input;
const Timezone& writerTimezone;

public:
StripeStreamsImpl(const RowReaderImpl& reader,
StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
const proto::StripeInformation& stripeInfo,
const proto::StripeFooter& footer,
uint64_t stripeStart,
InputStream& input,
Expand Down