Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,9 @@ Status WriteMessage(const Buffer& message, const IpcWriteOptions& options,
RETURN_NOT_OK(file->Write(&internal::kIpcContinuationToken, sizeof(int32_t)));
}

// Write the flatbuffer size prefix including padding
int32_t padded_flatbuffer_size = padded_message_length - prefix_size;
// Write the flatbuffer size prefix including padding in little endian
int32_t padded_flatbuffer_size =
BitUtil::ToLittleEndian(padded_message_length - prefix_size);
RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t)));

// Write the flatbuffer
Expand Down Expand Up @@ -577,18 +578,18 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeInitialData(const uint8_t* data, int64_t size) {
return ConsumeInitial(util::SafeLoadAs<int32_t>(data));
return ConsumeInitial(BitUtil::FromLittleEndian(util::SafeLoadAs<int32_t>(data)));
}

Status ConsumeInitialBuffer(const std::shared_ptr<Buffer>& buffer) {
ARROW_ASSIGN_OR_RAISE(auto continuation, ConsumeDataBufferInt32(buffer));
return ConsumeInitial(continuation);
return ConsumeInitial(BitUtil::FromLittleEndian(continuation));
}

Status ConsumeInitialChunks() {
int32_t continuation = 0;
RETURN_NOT_OK(ConsumeDataChunks(sizeof(int32_t), &continuation));
return ConsumeInitial(continuation);
return ConsumeInitial(BitUtil::FromLittleEndian(continuation));
}

Status ConsumeInitial(int32_t continuation) {
Expand Down Expand Up @@ -616,18 +617,19 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeMetadataLengthData(const uint8_t* data, int64_t size) {
return ConsumeMetadataLength(util::SafeLoadAs<int32_t>(data));
return ConsumeMetadataLength(
BitUtil::FromLittleEndian(util::SafeLoadAs<int32_t>(data)));
}

Status ConsumeMetadataLengthBuffer(const std::shared_ptr<Buffer>& buffer) {
ARROW_ASSIGN_OR_RAISE(auto metadata_length, ConsumeDataBufferInt32(buffer));
return ConsumeMetadataLength(metadata_length);
return ConsumeMetadataLength(BitUtil::FromLittleEndian(metadata_length));
}

Status ConsumeMetadataLengthChunks() {
int32_t metadata_length = 0;
RETURN_NOT_OK(ConsumeDataChunks(sizeof(int32_t), &metadata_length));
return ConsumeMetadataLength(metadata_length);
return ConsumeMetadataLength(BitUtil::FromLittleEndian(metadata_length));
}

Status ConsumeMetadataLength(int32_t metadata_length) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ TEST_P(TestMessage, SerializeTo) {
ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length,
output_length);
ASSERT_OK_AND_EQ(output_length, stream->Tell());
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());
// chech whether length is written in little endian
auto buffer_ptr = buffer.get()->data();
ASSERT_EQ(output_length - body_length - prefix_size,
BitUtil::FromLittleEndian(*(uint32_t*)(buffer_ptr + 4)));
};

CheckWithAlignment(8);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
return Status::Invalid("Not an Arrow file");
}

int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
int32_t footer_length =
BitUtil::FromLittleEndian(*reinterpret_cast<const int32_t*>(buffer->data()));

if (footer_length <= 0 || footer_length > footer_offset_ - magic_size * 2 - 4) {
return Status::Invalid("File is smaller than indicated metadata size");
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,8 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
return Status::Invalid("Invalid file footer");
}

// write footer length in little endian
footer_length = BitUtil::ToLittleEndian(footer_length);
RETURN_NOT_OK(Write(&footer_length, sizeof(int32_t)));

// Write magic bytes to end file
Expand Down