diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index abd53759d83..aa23e5b7c18 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -30,4 +30,6 @@ pip install -e $arrow_dir/dev/archery archery integration --with-all --run-flight \ --gold-dirs=$gold_dir/0.14.1 \ --gold-dirs=$gold_dir/0.17.1 \ + --gold-dirs=$gold_dir/1.0.0-bigendian \ + --gold-dirs=$gold_dir/1.0.0-littleendian \ --gold-dirs=$gold_dir/2.0.0-compression \ diff --git a/cpp/src/arrow/array/array_test.cc b/cpp/src/arrow/array/array_test.cc index 89087ee318c..a97bf134604 100644 --- a/cpp/src/arrow/array/array_test.cc +++ b/cpp/src/arrow/array/array_test.cc @@ -45,6 +45,7 @@ #include "arrow/result.h" #include "arrow/scalar.h" #include "arrow/status.h" +#include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_common.h" #include "arrow/testing/gtest_compat.h" #include "arrow/testing/gtest_util.h" @@ -2598,4 +2599,350 @@ TEST(TestRechunkArraysConsistently, Plain) { } } +// ---------------------------------------------------------------------- +// Test SwapEndianArrayData + +/// \brief Assert that an ArrayData equals the expected one after endian swapping.
+/// +/// \param[in] target ArrayData to be converted and tested +/// \param[in] expected result ArrayData +void AssertArrayDataEqualsWithSwapEndian(const std::shared_ptr& target, + const std::shared_ptr& expected) { + auto swap_array = MakeArray(*::arrow::internal::SwapEndianArrayData(target)); + auto expected_array = MakeArray(expected); + ASSERT_ARRAYS_EQUAL(*swap_array, *expected_array); + ASSERT_OK(swap_array->ValidateFull()); +} + +TEST(TestSwapEndianArrayData, PrimitiveType) { + auto null_buffer = Buffer::FromString("\xff"); + auto data_int_buffer = Buffer::FromString("01234567"); + + auto data = ArrayData::Make(null(), 0, {nullptr}, 0); + auto expected_data = data; + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + data = ArrayData::Make(boolean(), 8, {null_buffer, data_int_buffer}, 0); + expected_data = data; + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + data = ArrayData::Make(int8(), 8, {null_buffer, data_int_buffer}, 0); + expected_data = data; + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + data = ArrayData::Make(uint16(), 4, {null_buffer, data_int_buffer}, 0); + auto data_int16_buffer = Buffer::FromString("10325476"); + expected_data = ArrayData::Make(uint16(), 4, {null_buffer, data_int16_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + data = ArrayData::Make(int32(), 2, {null_buffer, data_int_buffer}, 0); + auto data_int32_buffer = Buffer::FromString("32107654"); + expected_data = ArrayData::Make(int32(), 2, {null_buffer, data_int32_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + data = ArrayData::Make(uint64(), 1, {null_buffer, data_int_buffer}, 0); + auto data_int64_buffer = Buffer::FromString("76543210"); + expected_data = ArrayData::Make(uint64(), 1, {null_buffer, data_int64_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + auto data_16byte_buffer = Buffer::FromString("0123456789abcdef"); + data = 
ArrayData::Make(decimal128(38, 10), 1, {null_buffer, data_16byte_buffer}); + auto data_decimal128_buffer = Buffer::FromString("fedcba9876543210"); + expected_data = + ArrayData::Make(decimal128(38, 10), 1, {null_buffer, data_decimal128_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + auto data_32byte_buffer = Buffer::FromString("0123456789abcdef123456789ABCDEF0"); + data = ArrayData::Make(decimal256(76, 20), 1, {null_buffer, data_32byte_buffer}); + auto data_decimal256_buffer = Buffer::FromString("0FEDCBA987654321fedcba9876543210"); + expected_data = + ArrayData::Make(decimal256(76, 20), 1, {null_buffer, data_decimal256_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + auto data_float_buffer = Buffer::FromString("01200560"); + data = ArrayData::Make(float32(), 2, {null_buffer, data_float_buffer}, 0); + auto data_float32_buffer = Buffer::FromString("02100650"); + expected_data = ArrayData::Make(float32(), 2, {null_buffer, data_float32_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + data = ArrayData::Make(float64(), 1, {null_buffer, data_float_buffer}); + auto data_float64_buffer = Buffer::FromString("06500210"); + expected_data = ArrayData::Make(float64(), 1, {null_buffer, data_float64_buffer}, 0); + AssertArrayDataEqualsWithSwapEndian(data, expected_data); + + // With offset > 0 + data = + ArrayData::Make(int64(), 1, {null_buffer, data_int_buffer}, kUnknownNullCount, 1); + ASSERT_RAISES(Invalid, ::arrow::internal::SwapEndianArrayData(data)); +} + +std::shared_ptr ReplaceBuffers(const std::shared_ptr& data, + const int32_t buffer_index, + const std::vector& buffer_data) { + const auto test_data = data->Copy(); + test_data->buffers[buffer_index] = + std::make_shared(buffer_data.data(), buffer_data.size()); + return test_data; +} + +std::shared_ptr ReplaceBuffersInChild(const std::shared_ptr& data, + const int32_t child_index, + const std::vector& child_data) { + const auto test_data = 
data->Copy(); + // assume updating only buffer[1] in child_data + auto child_array_data = test_data->child_data[child_index]->Copy(); + child_array_data->buffers[1] = + std::make_shared(child_data.data(), child_data.size()); + test_data->child_data[child_index] = child_array_data; + return test_data; +} + +std::shared_ptr ReplaceBuffersInDictionary( + const std::shared_ptr& data, const int32_t buffer_index, + const std::vector& buffer_data) { + const auto test_data = data->Copy(); + auto dict_array_data = test_data->dictionary->Copy(); + dict_array_data->buffers[buffer_index] = + std::make_shared(buffer_data.data(), buffer_data.size()); + test_data->dictionary = dict_array_data; + return test_data; +} + +TEST(TestSwapEndianArrayData, BinaryType) { + auto array = ArrayFromJSON(binary(), R"(["0123", null, "45"])"); + const std::vector offset1 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6}; +#else + {0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6, 0, 0, 0}; +#endif + auto expected_data = array->data(); + auto test_data = ReplaceBuffers(expected_data, 1, offset1); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); + + array = ArrayFromJSON(large_binary(), R"(["01234", null, "567"])"); + const std::vector offset2 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, + 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 8}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, + 5, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0}; +#endif + expected_data = array->data(); + test_data = ReplaceBuffers(expected_data, 1, offset2); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); + + array = ArrayFromJSON(fixed_size_binary(3), R"(["012", null, "345"])"); + expected_data = array->data(); + AssertArrayDataEqualsWithSwapEndian(expected_data, expected_data); +} + +TEST(TestSwapEndianArrayData, StringType) { + auto array = ArrayFromJSON(utf8(), R"(["ABCD", null, "EF"])"); + const std::vector offset1 = +#if 
ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6}; +#else + {0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6, 0, 0, 0}; +#endif + auto expected_data = array->data(); + auto test_data = ReplaceBuffers(expected_data, 1, offset1); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); + + array = ArrayFromJSON(large_utf8(), R"(["ABCDE", null, "FGH"])"); + const std::vector offset2 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, + 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 8}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, + 5, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0}; +#endif + expected_data = array->data(); + test_data = ReplaceBuffers(expected_data, 1, offset2); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); +} + +TEST(TestSwapEndianArrayData, ListType) { + auto type1 = std::make_shared(int32()); + auto array = ArrayFromJSON(type1, "[[0, 1, 2, 3], null, [4, 5]]"); + const std::vector offset1 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6}; +#else + {0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6, 0, 0, 0}; +#endif + const std::vector data1 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, 5}; +#else + {0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0, 0}; +#endif + auto expected_data = array->data(); + auto test_data = ReplaceBuffers(expected_data, 1, offset1); + test_data = ReplaceBuffersInChild(test_data, 0, data1); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); + + auto type2 = std::make_shared(int64()); + array = ArrayFromJSON(type2, "[[0, 1, 2], null, [3]]"); + const std::vector offset2 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, + 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, + 3, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0}; +#endif + const std::vector data2 = +#if 
ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, + 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0}; +#endif + expected_data = array->data(); + test_data = ReplaceBuffers(expected_data, 1, offset2); + test_data = ReplaceBuffersInChild(test_data, 0, data2); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); + + auto type3 = std::make_shared(int32(), 2); + array = ArrayFromJSON(type3, "[[0, 1], null, [2, 3]]"); + expected_data = array->data(); + const std::vector data3 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3}; +#else + {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0}; +#endif + test_data = ReplaceBuffersInChild(expected_data, 0, data3); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); +} + +TEST(TestSwapEndianArrayData, DictionaryType) { + auto type = dictionary(int32(), int16()); + auto dict = ArrayFromJSON(int16(), "[4, 5, 6, 7]"); + DictionaryArray array(type, ArrayFromJSON(int32(), "[0, 2, 3]"), dict); + auto expected_data = array.data(); + const std::vector data1 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3}; +#else + {0, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0}; +#endif + const std::vector data2 = +#if ARROW_LITTLE_ENDIAN + {0, 4, 0, 5, 0, 6, 0, 7}; +#else + {4, 0, 5, 0, 6, 0, 7, 0}; +#endif + auto test_data = ReplaceBuffers(expected_data, 1, data1); + test_data = ReplaceBuffersInDictionary(test_data, 1, data2); + // dictionary must be explicitly swapped + test_data->dictionary = *::arrow::internal::SwapEndianArrayData(test_data->dictionary); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); +} + +TEST(TestSwapEndianArrayData, StructType) { + auto array = ArrayFromJSON(struct_({field("a", int32()), field("b", utf8())}), + R"([{"a": 4, "b": null}, {"a": null, "b": "foo"}])"); + auto 
expected_data = array->data(); + const std::vector data1 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 4, 0, 0, 0, 0}; +#else + {4, 0, 0, 0, 0, 0, 0, 0}; +#endif + const std::vector data2 = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0}; +#endif + auto test_data = ReplaceBuffersInChild(expected_data, 0, data1); + test_data = ReplaceBuffersInChild(test_data, 1, data2); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); +} + +TEST(TestSwapEndianArrayData, UnionType) { + auto expected_i8 = ArrayFromJSON(int8(), "[127, null, null, null, null]"); + auto expected_str = ArrayFromJSON(utf8(), R"([null, "abcd", null, null, ""])"); + auto expected_i32 = ArrayFromJSON(int32(), "[null, null, 1, 2, null]"); + std::vector expected_types_vector; + expected_types_vector.push_back(Type::INT8); + expected_types_vector.insert(expected_types_vector.end(), 2, Type::STRING); + expected_types_vector.insert(expected_types_vector.end(), 2, Type::INT32); + std::shared_ptr expected_types; + ArrayFromVector(expected_types_vector, &expected_types); + auto arr1 = SparseUnionArray::Make( + *expected_types, {expected_i8, expected_str, expected_i32}, {"i8", "str", "i32"}, + {Type::INT8, Type::STRING, Type::INT32}); + auto expected_data = (*arr1)->data(); + const std::vector data1a = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 4}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0}; +#endif + const std::vector data1b = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0}; +#endif + auto test_data = ReplaceBuffersInChild(expected_data, 1, data1a); + test_data = ReplaceBuffersInChild(test_data, 2, data1b); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); + + expected_i8 = ArrayFromJSON(int8(), "[33, 10, -10]"); + expected_str = 
ArrayFromJSON(utf8(), R"(["abc", "", "def"])"); + expected_i32 = ArrayFromJSON(int32(), "[1, -259, 2]"); + auto expected_offsets = ArrayFromJSON(int32(), "[0, 0, 0, 1, 1, 1, 2, 2, 2]"); + auto arr2 = DenseUnionArray::Make( + *expected_types, *expected_offsets, {expected_i8, expected_str, expected_i32}, + {"i8", "str", "i32"}, {Type::INT8, Type::STRING, Type::INT32}); + expected_data = (*arr2)->data(); + const std::vector data2a = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, + 0, 1, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2}; +#else + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, + 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0}; +#endif + const std::vector data2b = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 3, 0, 0, 0, 6}; +#else + {0, 0, 0, 0, 3, 0, 0, 0, 3, 0, 0, 0, 6, 0, 0, 0}; +#endif + const std::vector data2c = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 1, 255, 255, 254, 253, 0, 0, 0, 2}; +#else + {1, 0, 0, 0, 253, 254, 255, 255, 2, 0, 0, 0}; +#endif + test_data = ReplaceBuffers(expected_data, 2, data2a); + test_data = ReplaceBuffersInChild(test_data, 1, data2b); + test_data = ReplaceBuffersInChild(test_data, 2, data2c); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); +} + +TEST(TestSwapEndianArrayData, ExtensionType) { + auto array_int16 = ArrayFromJSON(int16(), "[0, 1, 2, 3]"); + auto ext_data = array_int16->data()->Copy(); + ext_data->type = std::make_shared(); + auto array = MakeArray(ext_data); + auto expected_data = array->data(); + const std::vector data = +#if ARROW_LITTLE_ENDIAN + {0, 0, 0, 1, 0, 2, 0, 3}; +#else + {0, 0, 1, 0, 2, 0, 3, 0}; +#endif + auto test_data = ReplaceBuffers(expected_data, 1, data); + AssertArrayDataEqualsWithSwapEndian(test_data, expected_data); +} + } // namespace arrow diff --git a/cpp/src/arrow/array/array_view_test.cc b/cpp/src/arrow/array/array_view_test.cc index e73bbda7abc..07dc3014e40 100644 --- 
a/cpp/src/arrow/array/array_view_test.cc +++ b/cpp/src/arrow/array/array_view_test.cc @@ -29,7 +29,7 @@ #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" -#include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" namespace arrow { diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index 0d498931d42..297745a2b17 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -41,6 +41,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/decimal.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/visitor_inline.h" @@ -51,7 +52,7 @@ using internal::checked_cast; // ---------------------------------------------------------------------- // Loading from ArrayData -namespace internal { +namespace { class ArrayDataWrapper { public: @@ -74,11 +75,209 @@ class ArrayDataWrapper { std::shared_ptr* out_; }; +class ArrayDataEndianSwapper { + public: + ArrayDataEndianSwapper(const std::shared_ptr& data, int64_t length) + : data_(data), length_(length) { + out_ = data->Copy(); + } + + Status SwapType(const DataType& type) { + RETURN_NOT_OK(VisitTypeInline(type, this)); + RETURN_NOT_OK(SwapChildren(type.fields())); + if (internal::HasValidityBitmap(type.id())) { + // Copy null bitmap + out_->buffers[0] = data_->buffers[0]; + } + return Status::OK(); + } + + Status SwapChildren(const FieldVector& child_fields) { + for (size_t i = 0; i < child_fields.size(); i++) { + ARROW_ASSIGN_OR_RAISE(out_->child_data[i], + internal::SwapEndianArrayData(data_->child_data[i])); + } + return Status::OK(); + } + + template + Result> ByteSwapBuffer( + const std::shared_ptr& in_buffer) { + if (sizeof(T) == 1) { + // if data size is 1, element is not swapped. 
We can use the original buffer + return in_buffer; + } + auto in_data = reinterpret_cast(in_buffer->data()); + ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size())); + auto out_data = reinterpret_cast(out_buffer->mutable_data()); + int64_t length = in_buffer->size() / sizeof(T); + for (int64_t i = 0; i < length; i++) { + out_data[i] = BitUtil::ByteSwap(in_data[i]); + } + return std::move(out_buffer); + } + + template + Status SwapOffsets(int index) { + if (data_->buffers[index] == nullptr || data_->buffers[index]->size() == 0) { + out_->buffers[index] = data_->buffers[index]; + return Status::OK(); + } + // Except for unions, the offsets buffer has one more element than data->length + ARROW_ASSIGN_OR_RAISE(out_->buffers[index], + ByteSwapBuffer(data_->buffers[index])); + return Status::OK(); + } + + template + enable_if_t::value && + !std::is_base_of::value && + !std::is_base_of::value, + Status> + Visit(const T& type) { + using value_type = typename T::c_type; + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], + ByteSwapBuffer(data_->buffers[1])); + return Status::OK(); + } + + Status Visit(const Decimal128Type& type) { + auto data = reinterpret_cast(data_->buffers[1]->data()); + ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size())); + auto new_data = reinterpret_cast(new_buffer->mutable_data()); + int64_t length = length_; + length = data_->buffers[1]->size() / (sizeof(uint64_t) * 2); + for (int64_t i = 0; i < length; i++) { + uint64_t tmp; + auto idx = i * 2; +#if ARROW_LITTLE_ENDIAN + tmp = BitUtil::FromBigEndian(data[idx]); + new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]); + new_data[idx + 1] = tmp; +#else + tmp = BitUtil::FromLittleEndian(data[idx]); + new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]); + new_data[idx + 1] = tmp; +#endif + } + out_->buffers[1] = std::move(new_buffer); + return Status::OK(); + } + + Status Visit(const Decimal256Type& type) { + auto data = reinterpret_cast(data_->buffers[1]->data());
ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size())); + auto new_data = reinterpret_cast(new_buffer->mutable_data()); + int64_t length = length_; + length = data_->buffers[1]->size() / (sizeof(uint64_t) * 4); + for (int64_t i = 0; i < length; i++) { + uint64_t tmp0, tmp1, tmp2; + auto idx = i * 4; +#if ARROW_LITTLE_ENDIAN + tmp0 = BitUtil::FromBigEndian(data[idx]); + tmp1 = BitUtil::FromBigEndian(data[idx + 1]); + tmp2 = BitUtil::FromBigEndian(data[idx + 2]); + new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]); + new_data[idx + 1] = tmp2; + new_data[idx + 2] = tmp1; + new_data[idx + 3] = tmp0; +#else + tmp0 = BitUtil::FromLittleEndian(data[idx]); + tmp1 = BitUtil::FromLittleEndian(data[idx + 1]); + tmp2 = BitUtil::FromLittleEndian(data[idx + 2]); + new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]); + new_data[idx + 1] = tmp2; + new_data[idx + 2] = tmp1; + new_data[idx + 3] = tmp0; +#endif + } + out_->buffers[1] = std::move(new_buffer); + return Status::OK(); + } + + Status Visit(const DayTimeIntervalType& type) { + ARROW_ASSIGN_OR_RAISE(out_->buffers[1], ByteSwapBuffer(data_->buffers[1])); + return Status::OK(); + } + + Status Visit(const NullType& type) { return Status::OK(); } + Status Visit(const BooleanType& type) { return Status::OK(); } + Status Visit(const Int8Type& type) { return Status::OK(); } + Status Visit(const UInt8Type& type) { return Status::OK(); } + Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); } + Status Visit(const FixedSizeListType& type) { return Status::OK(); } + Status Visit(const StructType& type) { return Status::OK(); } + Status Visit(const UnionType& type) { + out_->buffers[1] = data_->buffers[1]; + if (type.mode() == UnionMode::DENSE) { + RETURN_NOT_OK(SwapOffsets(2)); + } + return Status::OK(); + } + + template + enable_if_t::value || std::is_same::value, + Status> + Visit(const T& type) { + RETURN_NOT_OK(SwapOffsets(1)); + out_->buffers[2] = data_->buffers[2]; + return 
Status::OK(); + } + + template + enable_if_t::value || + std::is_same::value, + Status> + Visit(const T& type) { + RETURN_NOT_OK(SwapOffsets(1)); + out_->buffers[2] = data_->buffers[2]; + return Status::OK(); + } + + Status Visit(const ListType& type) { + RETURN_NOT_OK(SwapOffsets(1)); + return Status::OK(); + } + Status Visit(const LargeListType& type) { + RETURN_NOT_OK(SwapOffsets(1)); + return Status::OK(); + } + + Status Visit(const DictionaryType& type) { + // dictionary was already swapped in ReadDictionary() in ipc/reader.cc + RETURN_NOT_OK(SwapType(*type.index_type())); + return Status::OK(); + } + + Status Visit(const ExtensionType& type) { + RETURN_NOT_OK(SwapType(*type.storage_type())); + return Status::OK(); + } + + const std::shared_ptr& data_; + int64_t length_; + std::shared_ptr out_; +}; + +} // namespace + +namespace internal { + +Result> SwapEndianArrayData( + const std::shared_ptr& data) { + if (data->offset != 0) { + return Status::Invalid("Unsupported data format: data.offset != 0"); + } + ArrayDataEndianSwapper swapper(data, data->length); + RETURN_NOT_OK(swapper.SwapType(*data->type)); + return std::move(swapper.out_); +} + } // namespace internal std::shared_ptr MakeArray(const std::shared_ptr& data) { std::shared_ptr out; - internal::ArrayDataWrapper wrapper_visitor(data, &out); + ArrayDataWrapper wrapper_visitor(data, &out); DCHECK_OK(VisitTypeInline(*data->type, &wrapper_visitor)); DCHECK(out); return out; diff --git a/cpp/src/arrow/array/util.h b/cpp/src/arrow/array/util.h index b400255c18e..3ef4e08828f 100644 --- a/cpp/src/arrow/array/util.h +++ b/cpp/src/arrow/array/util.h @@ -56,6 +56,17 @@ Result> MakeArrayFromScalar( namespace internal { +/// \brief Swap endian of each element in a generic ArrayData +/// +/// As dictionaries are often shared between different arrays, dictionaries +/// are not swapped by this function and should be handled separately. 
+/// +/// \param[in] data the array contents +/// \return the resulting ArrayData whose elements were swapped +ARROW_EXPORT +Result> SwapEndianArrayData( + const std::shared_ptr& data); + /// Given a number of ArrayVectors, treat each ArrayVector as the /// chunks of a chunked array. Then rechunk each ArrayVector such that /// all ArrayVectors are chunked identically. It is mandatory that diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index fc11f126e72..317fd01f17c 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -33,6 +33,7 @@ #include "arrow/memory_pool.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" +#include "arrow/util/endian.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" diff --git a/cpp/src/arrow/chunked_array_test.cc b/cpp/src/arrow/chunked_array_test.cc index 3144f5786d7..c5907549fe4 100644 --- a/cpp/src/arrow/chunked_array_test.cc +++ b/cpp/src/arrow/chunked_array_test.cc @@ -27,7 +27,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/type.h" -#include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/key_value_metadata.h" namespace arrow { diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 6569e71b454..906cb00ef07 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -32,6 +32,7 @@ #include "arrow/ipc/options.h" #include "arrow/ipc/util.h" #include "arrow/status.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/ubsan.h" diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index f818aebab24..6a1f5ffe0eb 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1335,7 +1335,11 @@ Status GetSchema(const void* opaque_schema, DictionaryMemo* dictionary_memo, 
std::shared_ptr metadata; RETURN_NOT_OK(internal::GetKeyValueMetadata(schema->custom_metadata(), &metadata)); - *out = ::arrow::schema(std::move(fields), metadata); + // set endianness using the value in the flatbuf schema + auto endianness = schema->endianness() == flatbuf::Endianness::Little + ? Endianness::Little + : Endianness::Big; + *out = ::arrow::schema(std::move(fields), endianness, metadata); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index aa939e24378..2e0f800b5ad 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -137,6 +137,18 @@ struct ARROW_EXPORT IpcReadOptions { /// like decompression bool use_threads = true; + /// \brief EXPERIMENTAL: Convert incoming data to platform-native endianness + /// + /// If the endianness of the received schema is not equal to platform-native + /// endianness, then all buffers with endian-sensitive data will be byte-swapped. + /// This includes the value buffers of numeric types, temporal types, decimal + /// types, as well as the offset buffers of variable-sized binary and list-like + /// types. + /// + /// Endianness conversion is achieved by the RecordBatchFileReader, + /// RecordBatchStreamReader and StreamDecoder classes.
+ bool ensure_native_endian = true; + static IpcReadOptions Defaults(); }; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 82fb4c743a4..7e39ee1c484 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -46,6 +46,7 @@ #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" +#include "arrow/util/endian.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/parallel.h" @@ -108,6 +109,30 @@ Status InvalidMessageType(MessageType expected, MessageType actual) { // ---------------------------------------------------------------------- // Record batch read path +/// \brief Structure to keep common arguments to be passed +struct IpcReadContext { + IpcReadContext(DictionaryMemo* memo, const IpcReadOptions& option, bool swap, + MetadataVersion version = MetadataVersion::V5, + Compression::type kind = Compression::UNCOMPRESSED) + : dictionary_memo(memo), + options(option), + metadata_version(version), + compression(kind), + swap_endian(swap) {} + + DictionaryMemo* dictionary_memo; + + const IpcReadOptions& options; + + MetadataVersion metadata_version; + + Compression::type compression; + + /// \brief LoadRecordBatch() or LoadRecordBatchSubset() swaps endianness of elements + /// if this flag is true + const bool swap_endian; +}; + /// The field_index and buffer_index are incremented based on how much of the /// batch is "consumed" (through nested data reconstruction, for example) class ArrayLoader { @@ -439,10 +464,9 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op Result> LoadRecordBatchSubset( const flatbuf::RecordBatch* metadata, const std::shared_ptr& schema, - const std::vector* inclusion_mask, const DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, MetadataVersion metadata_version, - Compression::type compression, io::RandomAccessFile* file) { - ArrayLoader loader(metadata, 
metadata_version, options, file); + const std::vector* inclusion_mask, const IpcReadContext& context, + io::RandomAccessFile* file) { + ArrayLoader loader(metadata, context.metadata_version, context.options, file); ArrayDataVector columns(schema->num_fields()); ArrayDataVector filtered_columns; @@ -472,7 +496,8 @@ Result> LoadRecordBatchSubset( // Dictionary resolution needs to happen on the unfiltered columns, // because fields are mapped structurally (by path in the original schema). - RETURN_NOT_OK(ResolveDictionaries(columns, *dictionary_memo, options.memory_pool)); + RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo, + context.options.memory_pool)); if (inclusion_mask) { filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata()); @@ -481,25 +506,30 @@ Result> LoadRecordBatchSubset( filtered_schema = schema; filtered_columns = std::move(columns); } - if (compression != Compression::UNCOMPRESSED) { - RETURN_NOT_OK(DecompressBuffers(compression, options, &filtered_columns)); + if (context.compression != Compression::UNCOMPRESSED) { + RETURN_NOT_OK( + DecompressBuffers(context.compression, context.options, &filtered_columns)); } + // swap endian in a set of ArrayData if necessary (swap_endian == true) + if (context.swap_endian) { + for (int i = 0; i < static_cast(filtered_columns.size()); ++i) { + ARROW_ASSIGN_OR_RAISE(filtered_columns[i], + arrow::internal::SwapEndianArrayData(filtered_columns[i])); + } + } return RecordBatch::Make(filtered_schema, metadata->length(), std::move(filtered_columns)); } Result> LoadRecordBatch( const flatbuf::RecordBatch* metadata, const std::shared_ptr& schema, - const std::vector& inclusion_mask, const DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, MetadataVersion metadata_version, - Compression::type compression, io::RandomAccessFile* file) { + const std::vector& inclusion_mask, const IpcReadContext& context, + io::RandomAccessFile* file) { if (inclusion_mask.size() > 0) 
{ - return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, dictionary_memo, - options, metadata_version, compression, file); + return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, context, file); } else { - return LoadRecordBatchSubset(metadata, schema, nullptr, dictionary_memo, options, - metadata_version, compression, file); + return LoadRecordBatchSubset(metadata, schema, /*inclusion_mask=*/nullptr, context, file); } } @@ -577,8 +607,8 @@ Result> ReadRecordBatch( Result> ReadRecordBatchInternal( const Buffer& metadata, const std::shared_ptr& schema, - const std::vector& inclusion_mask, const DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, io::RandomAccessFile* file) { + const std::vector& inclusion_mask, IpcReadContext& context, + io::RandomAccessFile* file) { const flatbuf::Message* message = nullptr; RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message)); auto batch = message->header_as_RecordBatch(); @@ -589,15 +619,15 @@ Result> ReadRecordBatchInternal( Compression::type compression; RETURN_NOT_OK(GetCompression(batch, &compression)); - if (compression == Compression::UNCOMPRESSED && + if (context.compression == Compression::UNCOMPRESSED && message->version() == flatbuf::MetadataVersion::V4) { // Possibly obtain codec information from experimental serialization format // in 0.17.x RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); } - return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options, - internal::GetMetadataVersion(message->version()), compression, - file); + context.compression = compression; + context.metadata_version = internal::GetMetadataVersion(message->version()); + return LoadRecordBatch(batch, schema, inclusion_mask, context, file); } // If we are selecting only certain fields, populate an inclusion mask for fast lookups.
@@ -630,7 +660,8 @@ Status GetInclusionMaskAndOutSchema(const std::shared_ptr& full_schema, included_fields.push_back(full_schema->field(i)); } - *out_schema = schema(std::move(included_fields), full_schema->metadata()); + *out_schema = schema(std::move(included_fields), full_schema->endianness(), + full_schema->metadata()); return Status::OK(); } @@ -638,25 +669,32 @@ Status UnpackSchemaMessage(const void* opaque_schema, const IpcReadOptions& opti DictionaryMemo* dictionary_memo, std::shared_ptr* schema, std::shared_ptr* out_schema, - std::vector* field_inclusion_mask) { + std::vector* field_inclusion_mask, bool* swap_endian) { RETURN_NOT_OK(internal::GetSchema(opaque_schema, dictionary_memo, schema)); // If we are selecting only certain fields, populate the inclusion mask now // for fast lookups - return GetInclusionMaskAndOutSchema(*schema, options.included_fields, - field_inclusion_mask, out_schema); + RETURN_NOT_OK(GetInclusionMaskAndOutSchema(*schema, options.included_fields, + field_inclusion_mask, out_schema)); + *swap_endian = options.ensure_native_endian && !out_schema->get()->is_native_endian(); + if (*swap_endian) { + // create a new schema with native endianness before swapping endian in ArrayData + *schema = schema->get()->WithEndianness(Endianness::Native); + *out_schema = out_schema->get()->WithEndianness(Endianness::Native); + } + return Status::OK(); } Status UnpackSchemaMessage(const Message& message, const IpcReadOptions& options, DictionaryMemo* dictionary_memo, std::shared_ptr* schema, std::shared_ptr* out_schema, - std::vector* field_inclusion_mask) { + std::vector* field_inclusion_mask, bool* swap_endian) { CHECK_MESSAGE_TYPE(MessageType::SCHEMA, message.type()); CHECK_HAS_NO_BODY(message); return UnpackSchemaMessage(message.header(), options, dictionary_memo, schema, - out_schema, field_inclusion_mask); + out_schema, field_inclusion_mask, swap_endian); } Result> ReadRecordBatch( @@ -666,15 +704,14 @@ Result> ReadRecordBatch( std::shared_ptr 
out_schema; // Empty means do not use std::vector inclusion_mask; - RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, options.included_fields, + IpcReadContext context(const_cast(dictionary_memo), options, false); + RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields, &inclusion_mask, &out_schema)); - return ReadRecordBatchInternal(metadata, schema, inclusion_mask, dictionary_memo, - options, file); + return ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file); } -Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, DictionaryKind* kind, - io::RandomAccessFile* file) { +Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context, + DictionaryKind* kind, io::RandomAccessFile* file) { const flatbuf::Message* message = nullptr; RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message)); const auto dictionary_batch = message->header_as_DictionaryBatch(); @@ -701,42 +738,46 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, // Look up the dictionary value type, which must have been added to the // DictionaryMemo already prior to invoking this function - ARROW_ASSIGN_OR_RAISE(auto value_type, dictionary_memo->GetDictionaryType(id)); + ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id)); // Load the dictionary data from the dictionary batch ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()), - options, file); - const auto dict_data = std::make_shared(); + context.options, file); + auto dict_data = std::make_shared(); const Field dummy_field("", value_type); RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get())); if (compression != Compression::UNCOMPRESSED) { ArrayDataVector dict_fields{dict_data}; - RETURN_NOT_OK(DecompressBuffers(compression, options, &dict_fields)); + RETURN_NOT_OK(DecompressBuffers(compression, 
context.options, &dict_fields)); + } + + // swap endian in dict_data if necessary (swap_endian == true) + if (context.swap_endian) { + ARROW_ASSIGN_OR_RAISE(dict_data, ::arrow::internal::SwapEndianArrayData(dict_data)); } if (dictionary_batch->isDelta()) { if (kind != nullptr) { *kind = DictionaryKind::Delta; } - return dictionary_memo->AddDictionaryDelta(id, dict_data); + return context.dictionary_memo->AddDictionaryDelta(id, dict_data); } ARROW_ASSIGN_OR_RAISE(bool inserted, - dictionary_memo->AddOrReplaceDictionary(id, dict_data)); + context.dictionary_memo->AddOrReplaceDictionary(id, dict_data)); if (kind != nullptr) { *kind = inserted ? DictionaryKind::New : DictionaryKind::Replacement; } return Status::OK(); } -Status ReadDictionary(const Message& message, DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, DictionaryKind* kind) { +Status ReadDictionary(const Message& message, const IpcReadContext& context, + DictionaryKind* kind) { // Only invoke this method if we already know we have a dictionary message DCHECK_EQ(message.type(), MessageType::DICTIONARY_BATCH); CHECK_HAS_BODY(message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body())); - return ReadDictionary(*message.metadata(), dictionary_memo, options, kind, - reader.get()); + return ReadDictionary(*message.metadata(), context, kind, reader.get()); } // ---------------------------------------------------------------------- @@ -755,8 +796,10 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { return Status::Invalid("Tried reading schema message, was null or length 0"); } - return UnpackSchemaMessage(*message, options, &dictionary_memo_, &schema_, - &out_schema_, &field_inclusion_mask_); + RETURN_NOT_OK(UnpackSchemaMessage(*message, options, &dictionary_memo_, &schema_, + &out_schema_, &field_inclusion_mask_, + &swap_endian_)); + return Status::OK(); } Status ReadNext(std::shared_ptr* batch) override { @@ -788,8 +831,9 @@ class 
RecordBatchStreamReaderImpl : public RecordBatchStreamReader { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); return ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, - &dictionary_memo_, options_, reader.get()) + context, reader.get()) .Value(batch); } @@ -819,8 +863,8 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { // Read dictionary from dictionary batch Status ReadDictionary(const Message& message) { DictionaryKind kind; - RETURN_NOT_OK( - ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_, &kind)); + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); + RETURN_NOT_OK(::arrow::ipc::ReadDictionary(message, context, &kind)); switch (kind) { case DictionaryKind::New: break; @@ -886,6 +930,8 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { DictionaryMemo dictionary_memo_; std::shared_ptr schema_, out_schema_; + + bool swap_endian_; }; // ---------------------------------------------------------------------- @@ -941,10 +987,10 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); - ARROW_ASSIGN_OR_RAISE( - auto batch, - ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, - &dictionary_memo_, options_, reader.get())); + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); + ARROW_ASSIGN_OR_RAISE(auto batch, ReadRecordBatchInternal( + *message->metadata(), schema_, + field_inclusion_mask_, context, reader.get())); ++stats_.num_record_batches; return batch; } @@ -964,7 +1010,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { // Get the schema and record any observed dictionaries RETURN_NOT_OK(UnpackSchemaMessage(footer_->schema(), options, &dictionary_memo_, - &schema_, 
&out_schema_, &field_inclusion_mask_)); + &schema_, &out_schema_, &field_inclusion_mask_, + &swap_endian_)); ++stats_.num_messages; return Status::OK(); } @@ -1008,8 +1055,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); DictionaryKind kind; - RETURN_NOT_OK(ReadDictionary(*message->metadata(), &dictionary_memo_, options_, - &kind, reader.get())); + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); + RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get())); ++stats_.num_dictionary_batches; if (kind != DictionaryKind::New) { return Status::Invalid( @@ -1097,6 +1144,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { std::shared_ptr out_schema_; ReadStats stats_; + + bool swap_endian_; }; Result> RecordBatchFileReader::Open( @@ -1192,7 +1241,8 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { private: Status OnSchemaMessageDecoded(std::unique_ptr message) { RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, &schema_, - &out_schema_, &field_inclusion_mask_)); + &out_schema_, &field_inclusion_mask_, + &swap_endian_)); n_required_dictionaries_ = dictionary_memo_.fields().num_fields(); if (n_required_dictionaries_ == 0) { @@ -1220,15 +1270,17 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { } Status OnRecordBatchMessageDecoded(std::unique_ptr message) { + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); if (message->type() == MessageType::DICTIONARY_BATCH) { return ReadDictionary(*message); } else { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); ARROW_ASSIGN_OR_RAISE( auto batch, ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, - &dictionary_memo_, options_, 
reader.get())); + context, reader.get())); ++stats_.num_record_batches; return listener_->OnRecordBatchDecoded(std::move(batch)); } @@ -1237,8 +1289,8 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { // Read dictionary from dictionary batch Status ReadDictionary(const Message& message) { DictionaryKind kind; - RETURN_NOT_OK( - ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_, &kind)); + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); + RETURN_NOT_OK(::arrow::ipc::ReadDictionary(message, context, &kind)); ++stats_.num_dictionary_batches; switch (kind) { case DictionaryKind::New: @@ -1262,6 +1314,7 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { DictionaryMemo dictionary_memo_; std::shared_ptr schema_, out_schema_; ReadStats stats_; + bool swap_endian_; }; StreamDecoder::StreamDecoder(std::shared_ptr listener, IpcReadOptions options) { diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index ac866daa8d2..c14ff5ec9bc 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -49,6 +49,7 @@ #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" +#include "arrow/util/endian.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/make_unique.h" diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 12d3951865f..9192c325bbf 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -1298,29 +1298,56 @@ void PrintTo(const FieldRef& ref, std::ostream* os) { *os << ref.ToString(); } // ---------------------------------------------------------------------- // Schema implementation +std::string EndiannessToString(Endianness endianness) { + switch (endianness) { + case Endianness::Little: + return "little"; + case Endianness::Big: + return "big"; + default: + DCHECK(false) << "invalid endianness"; + return "???"; + } +} + class 
Schema::Impl { public: - Impl(std::vector> fields, + Impl(std::vector> fields, Endianness endianness, std::shared_ptr metadata) : fields_(std::move(fields)), + endianness_(endianness), name_to_index_(CreateNameToIndexMap(fields_)), metadata_(std::move(metadata)) {} std::vector> fields_; + Endianness endianness_; std::unordered_multimap name_to_index_; std::shared_ptr metadata_; }; +Schema::Schema(std::vector> fields, Endianness endianness, + std::shared_ptr metadata) + : detail::Fingerprintable(), + impl_(new Impl(std::move(fields), endianness, std::move(metadata))) {} + Schema::Schema(std::vector> fields, std::shared_ptr metadata) : detail::Fingerprintable(), - impl_(new Impl(std::move(fields), std::move(metadata))) {} + impl_(new Impl(std::move(fields), Endianness::Native, std::move(metadata))) {} Schema::Schema(const Schema& schema) : detail::Fingerprintable(), impl_(new Impl(*schema.impl_)) {} Schema::~Schema() = default; +std::shared_ptr Schema::WithEndianness(Endianness endianness) const { + return std::make_shared(impl_->fields_, endianness, impl_->metadata_); +} + +Endianness Schema::endianness() const { return impl_->endianness_; } + +bool Schema::is_native_endian() const { return impl_->endianness_ == Endianness::Native; } + int Schema::num_fields() const { return static_cast(impl_->fields_.size()); } const std::shared_ptr& Schema::field(int i) const { @@ -1338,6 +1365,11 @@ bool Schema::Equals(const Schema& other, bool check_metadata) const { return true; } + // checks endianness equality + if (endianness() != other.endianness()) { + return false; + } + // checks field equality if (num_fields() != other.num_fields()) { return false; @@ -1482,6 +1514,10 @@ std::string Schema::ToString(bool show_metadata) const { ++i; } + if (impl_->endianness_ != Endianness::Native) { + buffer << "\n-- endianness: " << EndiannessToString(impl_->endianness_) << " --"; + } + if (show_metadata && HasMetadata()) { buffer << impl_->metadata_->ToString(); } @@ -1661,6 +1697,12 
@@ std::shared_ptr schema(std::vector> fields, return std::make_shared(std::move(fields), std::move(metadata)); } +std::shared_ptr schema(std::vector> fields, + Endianness endianness, + std::shared_ptr metadata) { + return std::make_shared(std::move(fields), endianness, std::move(metadata)); +} + Result> UnifySchemas( const std::vector>& schemas, const Field::MergeOptions field_merge_options) { @@ -1819,6 +1861,7 @@ std::string Schema::ComputeFingerprint() const { } ss << field_fingerprint << ";"; } + ss << (endianness() == Endianness::Little ? "L" : "B"); ss << "}"; return ss.str(); } diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 56718b7c512..0672354ab6c 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -30,6 +30,7 @@ #include "arrow/result.h" #include "arrow/type_fwd.h" // IWYU pragma: export #include "arrow/util/checked_cast.h" +#include "arrow/util/endian.h" #include "arrow/util/macros.h" #include "arrow/util/variant.h" #include "arrow/util/visibility.h" @@ -1604,6 +1605,16 @@ class ARROW_EXPORT FieldRef { // ---------------------------------------------------------------------- // Schema +enum class Endianness { + Little = 0, + Big = 1, +#if ARROW_LITTLE_ENDIAN + Native = Little +#else + Native = Big +#endif +}; + /// \class Schema /// \brief Sequence of arrow::Field objects describing the columns of a record /// batch or table data structure @@ -1611,6 +1622,9 @@ class ARROW_EXPORT Schema : public detail::Fingerprintable, public util::EqualityComparable, public util::ToStringOstreamable { public: + explicit Schema(std::vector> fields, Endianness endianness, + std::shared_ptr metadata = NULLPTR); + explicit Schema(std::vector> fields, std::shared_ptr metadata = NULLPTR); @@ -1622,6 +1636,17 @@ class ARROW_EXPORT Schema : public detail::Fingerprintable, bool Equals(const Schema& other, bool check_metadata = false) const; bool Equals(const std::shared_ptr& other, bool check_metadata = false) const; + /// \brief Set endianness in 
the schema + /// + /// \return new Schema + std::shared_ptr WithEndianness(Endianness endianness) const; + + /// \brief Return endianness in the schema + Endianness endianness() const; + + /// \brief Indicate if endianness is equal to platform-native endianness + bool is_native_endian() const; + /// \brief Return the number of fields (columns) in the schema int num_fields() const; @@ -1690,6 +1715,9 @@ class ARROW_EXPORT Schema : public detail::Fingerprintable, std::unique_ptr impl_; }; +ARROW_EXPORT +std::string EndiannessToString(Endianness endianness); + // ---------------------------------------------------------------------- /// \brief Convenience class to incrementally construct/merge schemas. diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index f1000d1fe7f..14329675c8f 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -52,6 +52,7 @@ class DataType; class Field; class FieldRef; class KeyValueMetadata; +enum class Endianness; class Schema; using DataTypeVector = std::vector>; @@ -635,6 +636,17 @@ std::shared_ptr schema( std::vector> fields, std::shared_ptr metadata = NULLPTR); +/// \brief Create a Schema instance +/// +/// \param fields the schema's fields +/// \param endianness the endianness of the data +/// \param metadata any custom key-value metadata, default null +/// \return schema shared_ptr to Schema +ARROW_EXPORT +std::shared_ptr schema( + std::vector> fields, Endianness endianness, + std::shared_ptr metadata = NULLPTR); + /// @} /// Return the process-wide default memory pool. 
diff --git a/cpp/src/arrow/type_test.cc b/cpp/src/arrow/type_test.cc index 81a0315d6d1..da93e32936c 100644 --- a/cpp/src/arrow/type_test.cc +++ b/cpp/src/arrow/type_test.cc @@ -475,6 +475,31 @@ TEST_F(TestSchema, Basics) { ASSERT_EQ(schema->fingerprint(), schema2->fingerprint()); ASSERT_NE(schema->fingerprint(), schema3->fingerprint()); + + auto schema4 = ::arrow::schema({f0}, Endianness::Little); + auto schema5 = ::arrow::schema({f0}, Endianness::Little); + auto schema6 = ::arrow::schema({f0}, Endianness::Big); + auto schema7 = ::arrow::schema({f0}); + + AssertSchemaEqual(schema4, schema5); + AssertSchemaNotEqual(schema4, schema6); +#if ARROW_LITTLE_ENDIAN + AssertSchemaEqual(schema4, schema7); + AssertSchemaNotEqual(schema6, schema7); +#else + AssertSchemaNotEqual(schema4, schema6); + AssertSchemaEqual(schema6, schema7); +#endif + + ASSERT_EQ(schema4->fingerprint(), schema5->fingerprint()); + ASSERT_NE(schema4->fingerprint(), schema6->fingerprint()); +#if ARROW_LITTLE_ENDIAN + ASSERT_EQ(schema4->fingerprint(), schema7->fingerprint()); + ASSERT_NE(schema6->fingerprint(), schema7->fingerprint()); +#else + ASSERT_NE(schema4->fingerprint(), schema7->fingerprint()); + ASSERT_EQ(schema6->fingerprint(), schema7->fingerprint()); +#endif } TEST_F(TestSchema, ToString) { @@ -495,14 +520,38 @@ f3: list)"; ASSERT_EQ(expected, result); result = schema->ToString(/*print_metadata=*/true); + std::string expected_with_metadata = expected + R"( +-- metadata -- +foo: bar)"; + + ASSERT_EQ(expected_with_metadata, result); + + // With swapped endianness +#if ARROW_LITTLE_ENDIAN + schema = schema->WithEndianness(Endianness::Big); + expected = R"(f0: int32 +f1: uint8 not null +f2: string +f3: list +-- endianness: big --)"; +#else + schema = schema->WithEndianness(Endianness::Little); expected = R"(f0: int32 f1: uint8 not null f2: string f3: list +-- endianness: little --)"; +#endif + + result = schema->ToString(); + ASSERT_EQ(expected, result); + + result = 
schema->ToString(/*print_metadata=*/true); + expected_with_metadata = expected + R"( -- metadata -- foo: bar)"; - ASSERT_EQ(expected, result); + ASSERT_EQ(expected_with_metadata, result); } TEST_F(TestSchema, GetFieldByName) { diff --git a/cpp/src/arrow/util/basic_decimal.cc b/cpp/src/arrow/util/basic_decimal.cc index 78d5b15d1c0..d9d6f4f42fa 100644 --- a/cpp/src/arrow/util/basic_decimal.cc +++ b/cpp/src/arrow/util/basic_decimal.cc @@ -28,6 +28,7 @@ #include #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/int128_internal.h" #include "arrow/util/int_util_internal.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/util/bit_block_counter.h b/cpp/src/arrow/util/bit_block_counter.h index 0b6199cf15e..803b825e1b2 100644 --- a/cpp/src/arrow/util/bit_block_counter.h +++ b/cpp/src/arrow/util/bit_block_counter.h @@ -25,6 +25,7 @@ #include "arrow/buffer.h" #include "arrow/status.h" #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/macros.h" #include "arrow/util/ubsan.h" #include "arrow/util/visibility.h" diff --git a/cpp/src/arrow/util/bit_run_reader.h b/cpp/src/arrow/util/bit_run_reader.h index 39ff049428d..b24632a6e5e 100644 --- a/cpp/src/arrow/util/bit_run_reader.h +++ b/cpp/src/arrow/util/bit_run_reader.h @@ -24,6 +24,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_reader.h" +#include "arrow/util/endian.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h index 74f7e61e9cc..01845791faa 100644 --- a/cpp/src/arrow/util/bit_util.h +++ b/cpp/src/arrow/util/bit_util.h @@ -17,42 +17,14 @@ #pragma once -#ifdef _WIN32 -#define ARROW_LITTLE_ENDIAN 1 -#else -#if defined(__APPLE__) || defined(__FreeBSD__) -#include // IWYU pragma: keep -#else -#include // IWYU pragma: keep -#endif -# -#ifndef __BYTE_ORDER__ -#error "__BYTE_ORDER__ not defined" -#endif -# -#ifndef __ORDER_LITTLE_ENDIAN__ 
-#error "__ORDER_LITTLE_ENDIAN__ not defined" -#endif -# -#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ -#define ARROW_LITTLE_ENDIAN 1 -#else -#define ARROW_LITTLE_ENDIAN 0 -#endif -#endif - #if defined(_MSC_VER) #include // IWYU pragma: keep #include #pragma intrinsic(_BitScanReverse) #pragma intrinsic(_BitScanForward) -#define ARROW_BYTE_SWAP64 _byteswap_uint64 -#define ARROW_BYTE_SWAP32 _byteswap_ulong #define ARROW_POPCOUNT64 __popcnt64 #define ARROW_POPCOUNT32 __popcnt #else -#define ARROW_BYTE_SWAP64 __builtin_bswap64 -#define ARROW_BYTE_SWAP32 __builtin_bswap32 #define ARROW_POPCOUNT64 __builtin_popcountll #define ARROW_POPCOUNT32 __builtin_popcount #endif @@ -61,7 +33,6 @@ #include #include "arrow/util/macros.h" -#include "arrow/util/type_traits.h" #include "arrow/util/visibility.h" namespace arrow { @@ -301,116 +272,6 @@ static inline int Log2(uint64_t x) { return NumRequiredBits(x - 1); } -// -// Byte-swap 16-bit, 32-bit and 64-bit values -// - -// Swap the byte order (i.e. endianness) -static inline int64_t ByteSwap(int64_t value) { return ARROW_BYTE_SWAP64(value); } -static inline uint64_t ByteSwap(uint64_t value) { - return static_cast(ARROW_BYTE_SWAP64(value)); -} -static inline int32_t ByteSwap(int32_t value) { return ARROW_BYTE_SWAP32(value); } -static inline uint32_t ByteSwap(uint32_t value) { - return static_cast(ARROW_BYTE_SWAP32(value)); -} -static inline int16_t ByteSwap(int16_t value) { - constexpr auto m = static_cast(0xff); - return static_cast(((value >> 8) & m) | ((value & m) << 8)); -} -static inline uint16_t ByteSwap(uint16_t value) { - return static_cast(ByteSwap(static_cast(value))); -} -static inline uint8_t ByteSwap(uint8_t value) { return value; } - -// Write the swapped bytes into dst. Src and dst cannot overlap. 
-static inline void ByteSwap(void* dst, const void* src, int len) { - switch (len) { - case 1: - *reinterpret_cast(dst) = *reinterpret_cast(src); - return; - case 2: - *reinterpret_cast(dst) = ByteSwap(*reinterpret_cast(src)); - return; - case 4: - *reinterpret_cast(dst) = ByteSwap(*reinterpret_cast(src)); - return; - case 8: - *reinterpret_cast(dst) = ByteSwap(*reinterpret_cast(src)); - return; - default: - break; - } - - auto d = reinterpret_cast(dst); - auto s = reinterpret_cast(src); - for (int i = 0; i < len; ++i) { - d[i] = s[len - i - 1]; - } -} - -// Convert to little/big endian format from the machine's native endian format. -#if ARROW_LITTLE_ENDIAN -template > -static inline T ToBigEndian(T value) { - return ByteSwap(value); -} - -template > -static inline T ToLittleEndian(T value) { - return value; -} -#else -template > -static inline T ToBigEndian(T value) { - return value; -} - -template > -static inline T ToLittleEndian(T value) { - return ByteSwap(value); -} -#endif - -// Convert from big/little endian format to the machine's native endian format. -#if ARROW_LITTLE_ENDIAN -template > -static inline T FromBigEndian(T value) { - return ByteSwap(value); -} - -template > -static inline T FromLittleEndian(T value) { - return value; -} -#else -template > -static inline T FromBigEndian(T value) { - return value; -} - -template > -static inline T FromLittleEndian(T value) { - return ByteSwap(value); -} -#endif - // // Utilities for reading and writing individual bits by their index // in a memory area. 
diff --git a/cpp/src/arrow/util/bit_util_test.cc b/cpp/src/arrow/util/bit_util_test.cc index c71abde9409..e5a5e4c39be 100644 --- a/cpp/src/arrow/util/bit_util_test.cc +++ b/cpp/src/arrow/util/bit_util_test.cc @@ -44,7 +44,6 @@ #include "arrow/type_fwd.h" #include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_stream_utils.h" -#include "arrow/util/bit_util.h" #include "arrow/util/bitmap.h" #include "arrow/util/bitmap_generate.h" #include "arrow/util/bitmap_ops.h" @@ -52,6 +51,7 @@ #include "arrow/util/bitmap_visit.h" #include "arrow/util/bitmap_writer.h" #include "arrow/util/bitset_stack.h" +#include "arrow/util/endian.h" namespace arrow { @@ -1786,6 +1786,20 @@ TEST(BitUtil, ByteSwap) { EXPECT_EQ(BitUtil::ByteSwap(static_cast(0)), 0); EXPECT_EQ(BitUtil::ByteSwap(static_cast(0x1122)), 0x2211); + + EXPECT_EQ(BitUtil::ByteSwap(static_cast(0)), 0); + EXPECT_EQ(BitUtil::ByteSwap(static_cast(0x11)), 0x11); + + EXPECT_EQ(BitUtil::ByteSwap(static_cast(0)), 0); + EXPECT_EQ(BitUtil::ByteSwap(static_cast(0x11)), 0x11); + + EXPECT_EQ(BitUtil::ByteSwap(static_cast(0)), 0); + uint32_t srci32 = 0xaabbccdd, expectedi32 = 0xddccbbaa; + EXPECT_EQ(BitUtil::ByteSwap(*reinterpret_cast(&srci32)), + *reinterpret_cast(&expectedi32)); + uint64_t srci64 = 0xaabb11223344ccdd, expectedi64 = 0xddcc44332211bbaa; + EXPECT_EQ(BitUtil::ByteSwap(*reinterpret_cast(&srci64)), + *reinterpret_cast(&expectedi64)); } TEST(BitUtil, Log2) { diff --git a/cpp/src/arrow/util/bitmap.h b/cpp/src/arrow/util/bitmap.h index c26d75f0b53..8562c55e3d5 100644 --- a/cpp/src/arrow/util/bitmap.h +++ b/cpp/src/arrow/util/bitmap.h @@ -30,6 +30,7 @@ #include "arrow/buffer.h" #include "arrow/util/bit_util.h" #include "arrow/util/compare.h" +#include "arrow/util/endian.h" #include "arrow/util/functional.h" #include "arrow/util/string_builder.h" #include "arrow/util/string_view.h" diff --git a/cpp/src/arrow/util/bitmap_ops.cc b/cpp/src/arrow/util/bitmap_ops.cc index 9f1c63653d6..1f9cf19bbd0 100644 --- 
a/cpp/src/arrow/util/bitmap_ops.cc +++ b/cpp/src/arrow/util/bitmap_ops.cc @@ -28,6 +28,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_reader.h" #include "arrow/util/bitmap_writer.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/ubsan.h" diff --git a/cpp/src/arrow/util/bitmap_reader.h b/cpp/src/arrow/util/bitmap_reader.h index e1412ac8d70..cf4f5e7db8b 100644 --- a/cpp/src/arrow/util/bitmap_reader.h +++ b/cpp/src/arrow/util/bitmap_reader.h @@ -22,6 +22,7 @@ #include "arrow/buffer.h" #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/macros.h" namespace arrow { diff --git a/cpp/src/arrow/util/bitmap_writer.h b/cpp/src/arrow/util/bitmap_writer.h index 7cb2fc6a98f..d4f02f37a41 100644 --- a/cpp/src/arrow/util/bitmap_writer.h +++ b/cpp/src/arrow/util/bitmap_writer.h @@ -21,6 +21,7 @@ #include #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/macros.h" namespace arrow { diff --git a/cpp/src/arrow/util/bpacking.h b/cpp/src/arrow/util/bpacking.h index 71714c4c7d8..e5a4dbbed89 100644 --- a/cpp/src/arrow/util/bpacking.h +++ b/cpp/src/arrow/util/bpacking.h @@ -17,6 +17,7 @@ #pragma once +#include "arrow/util/endian.h" #include "arrow/util/visibility.h" #include diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc index bb0295e6858..9314dfd7faf 100644 --- a/cpp/src/arrow/util/compression_lz4.cc +++ b/cpp/src/arrow/util/compression_lz4.cc @@ -27,6 +27,7 @@ #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/ubsan.h" diff --git a/cpp/src/arrow/util/decimal.cc b/cpp/src/arrow/util/decimal.cc index dcb2023616a..c683e198cd6 100644 --- a/cpp/src/arrow/util/decimal.cc +++ b/cpp/src/arrow/util/decimal.cc @@ -29,8 +29,8 @@ #include #include "arrow/status.h" -#include 
"arrow/util/bit_util.h" #include "arrow/util/decimal.h" +#include "arrow/util/endian.h" #include "arrow/util/formatting.h" #include "arrow/util/int128_internal.h" #include "arrow/util/int_util_internal.h" diff --git a/cpp/src/arrow/util/decimal_test.cc b/cpp/src/arrow/util/decimal_test.cc index 40ae49da2ce..0bc838d0c29 100644 --- a/cpp/src/arrow/util/decimal_test.cc +++ b/cpp/src/arrow/util/decimal_test.cc @@ -32,6 +32,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/util/decimal.h" +#include "arrow/util/endian.h" #include "arrow/util/int128_internal.h" #include "arrow/util/macros.h" diff --git a/cpp/src/arrow/util/endian.h b/cpp/src/arrow/util/endian.h new file mode 100644 index 00000000000..81577e9091f --- /dev/null +++ b/cpp/src/arrow/util/endian.h @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#pragma once
+
+#ifdef _WIN32
+#define ARROW_LITTLE_ENDIAN 1
+#else
+#if defined(__APPLE__) || defined(__FreeBSD__)
+#include <machine/endian.h>  // IWYU pragma: keep
+#else
+#include <endian.h>  // IWYU pragma: keep
+#endif
+#
+#ifndef __BYTE_ORDER__
+#error "__BYTE_ORDER__ not defined"
+#endif
+#
+#ifndef __ORDER_LITTLE_ENDIAN__
+#error "__ORDER_LITTLE_ENDIAN__ not defined"
+#endif
+#
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+#define ARROW_LITTLE_ENDIAN 1
+#else
+#define ARROW_LITTLE_ENDIAN 0
+#endif
+#endif
+
+#if defined(_MSC_VER)
+#include <intrin.h>  // IWYU pragma: keep
+#define ARROW_BYTE_SWAP64 _byteswap_uint64
+#define ARROW_BYTE_SWAP32 _byteswap_ulong
+#else
+#define ARROW_BYTE_SWAP64 __builtin_bswap64
+#define ARROW_BYTE_SWAP32 __builtin_bswap32
+#endif
+
+#include "arrow/util/type_traits.h"
+#include "arrow/util/ubsan.h"
+
+namespace arrow {
+namespace BitUtil {
+
+//
+// Byte-swap 8-bit, 16-bit, 32-bit and 64-bit values (integer and floating point)
+//
+
+// Swap the byte order (i.e. endianness); the 8-bit overloads return value unchanged
+static inline int64_t ByteSwap(int64_t value) { return ARROW_BYTE_SWAP64(value); }
+static inline uint64_t ByteSwap(uint64_t value) {
+  return static_cast<uint64_t>(ARROW_BYTE_SWAP64(value));
+}
+static inline int32_t ByteSwap(int32_t value) { return ARROW_BYTE_SWAP32(value); }
+static inline uint32_t ByteSwap(uint32_t value) {
+  return static_cast<uint32_t>(ARROW_BYTE_SWAP32(value));
+}
+static inline int16_t ByteSwap(int16_t value) {
+  constexpr auto m = static_cast<int16_t>(0xff);
+  return static_cast<int16_t>(((value >> 8) & m) | ((value & m) << 8));
+}
+static inline uint16_t ByteSwap(uint16_t value) {
+  return static_cast<uint16_t>(ByteSwap(static_cast<int16_t>(value)));
+}
+static inline uint8_t ByteSwap(uint8_t value) { return value; }
+static inline int8_t ByteSwap(int8_t value) { return value; }
+static inline double ByteSwap(double value) {
+  const uint64_t swapped = ARROW_BYTE_SWAP64(util::SafeCopy<uint64_t>(value));
+  return util::SafeCopy<double>(swapped);
+}
+static inline float ByteSwap(float value) {
+  const uint32_t swapped = ARROW_BYTE_SWAP32(util::SafeCopy<uint32_t>(value));
+  return util::SafeCopy<float>(swapped);
+}
+
+// Write the len bytes of src into dst in reversed order. Src and dst cannot overlap.
+static inline void ByteSwap(void* dst, const void* src, int len) {
+  switch (len) {
+    case 1:
+      *reinterpret_cast<int8_t*>(dst) = *reinterpret_cast<const int8_t*>(src);
+      return;
+    case 2:
+      *reinterpret_cast<int16_t*>(dst) = ByteSwap(*reinterpret_cast<const int16_t*>(src));
+      return;
+    case 4:
+      *reinterpret_cast<int32_t*>(dst) = ByteSwap(*reinterpret_cast<const int32_t*>(src));
+      return;
+    case 8:
+      *reinterpret_cast<int64_t*>(dst) = ByteSwap(*reinterpret_cast<const int64_t*>(src));
+      return;
+    default:
+      break;
+  }
+
+  auto d = reinterpret_cast<uint8_t*>(dst);
+  auto s = reinterpret_cast<const uint8_t*>(src);
+  for (int i = 0; i < len; ++i) {
+    d[i] = s[len - i - 1];
+  }
+}
+
+// Convert to little/big endian format from the machine's native endian format.
+#if ARROW_LITTLE_ENDIAN
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T ToBigEndian(T value) {
+  return ByteSwap(value);
+}
+
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T ToLittleEndian(T value) {
+  return value;
+}
+#else
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T ToBigEndian(T value) {
+  return value;
+}
+
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T ToLittleEndian(T value) {
+  return ByteSwap(value);
+}
+#endif
+
+// Convert from big/little endian format to the machine's native endian format.
+#if ARROW_LITTLE_ENDIAN
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T FromBigEndian(T value) {
+  return ByteSwap(value);
+}
+
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T FromLittleEndian(T value) {
+  return value;
+}
+#else
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T FromBigEndian(T value) {
+  return value;
+}
+
+template <typename T, typename = internal::EnableIfIsOneOf<
+                          T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
+                          uint8_t, int8_t, float, double>>
+static inline T FromLittleEndian(T value) {
+  return ByteSwap(value);
+}
+#endif
+
+}  // namespace BitUtil
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 28c273fea99..f55ac88fb91 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -39,6 +39,7 @@ #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_builders.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/ubsan.h" diff --git a/cpp/src/gandiva/selection_vector.cc b/cpp/src/gandiva/selection_vector.cc index 47e45d3359b..a30bba6864e 100644 --- a/cpp/src/gandiva/selection_vector.cc +++ b/cpp/src/gandiva/selection_vector.cc @@ -23,6 +23,7 @@ #include #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "gandiva/selection_vector_impl.h" diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index d7cbfdf1f9e..360078f254c 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -40,6 +40,7 @@ #include "arrow/util/base64.h" #include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/endian.h" #include "arrow/util/int_util_internal.h" #include "arrow/util/logging.h" #include "arrow/util/string_view.h" diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 3ca5a80f675..48219ce2f7d 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -38,6 +38,7 @@ #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" 
+#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding.h" #include "arrow/visitor_inline.h" diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index e9fce9de838..02e81becd47 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -33,6 +33,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_writer.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/endian.h" #include "parquet/encoding.h" #include "parquet/platform.h" diff --git a/cpp/src/parquet/level_comparison_inc.h b/cpp/src/parquet/level_comparison_inc.h index f4cf7ab48e7..e21c3e5824d 100644 --- a/cpp/src/parquet/level_comparison_inc.h +++ b/cpp/src/parquet/level_comparison_inc.h @@ -17,6 +17,7 @@ #pragma once #include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "parquet/level_comparison.h" // Used to make sure ODR rule isn't violated. diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h index d4d68457a13..e45a288e8c0 100644 --- a/cpp/src/parquet/level_conversion.h +++ b/cpp/src/parquet/level_conversion.h @@ -19,6 +19,7 @@ #include +#include "arrow/util/endian.h" #include "parquet/platform.h" #include "parquet/schema.h" diff --git a/cpp/src/parquet/types_test.cc b/cpp/src/parquet/types_test.cc index 13c3ffab729..e0ca7d63566 100644 --- a/cpp/src/parquet/types_test.cc +++ b/cpp/src/parquet/types_test.cc @@ -19,7 +19,7 @@ #include -#include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "parquet/types.h" namespace parquet { diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index f119f8f81a9..002f4e9991f 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -22,7 +22,7 @@ #include #include "arrow/status.h" -#include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "plasma/common.h" diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py index 2fe23a3b200..95394cdd37d 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -128,6 +128,11 @@ def _gold_tests(self, gold_dir): skip = set() if name == 'union' and prefix == '0.17.1': skip.add("Java") + if prefix == '1.0.0-bigendian' or prefix == '1.0.0-littleendian': + skip.add("Go") + skip.add("Java") + skip.add("JS") + skip.add("Rust") if prefix == '2.0.0-compression': skip.add("Go") skip.add("Java") diff --git a/dev/archery/generate_files_for_endian_test.sh b/dev/archery/generate_files_for_endian_test.sh new file mode 100755 index 00000000000..54019ea570e --- /dev/null +++ b/dev/archery/generate_files_for_endian_test.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This script generates json and arrow files of each type (e.g. 
primitive) for integration endian test
+# Usage: generate_files_for_endian_test.sh
+#        ARROW_CPP_EXE_PATH : where Arrow C++ binaries can be found
+#        TMP_DIR            : where files will be generated
+
+set -e  # abort on the first failing command
+
+: "${ARROW_CPP_EXE_PATH:=/arrow/cpp/build/debug/}"  # default only when unset or empty
+: "${TMP_DIR:=/tmp/arrow}"
+
+json_dir="$TMP_DIR/arrow.$$"  # per-run directory, suffixed with this shell's PID
+mkdir -p "$json_dir"
+
+archery integration --stop-on-error --with-cpp=1 --tempdir="$json_dir"
+
+for f in "$json_dir"/*.json ; do
+    "$ARROW_CPP_EXE_PATH"/arrow-json-integration-test -mode JSON_TO_ARROW -json "$f" -arrow "${f%.*}.arrow_file" -integration true ;
+done
+for f in "$json_dir"/*.arrow_file ; do
+    "$ARROW_CPP_EXE_PATH"/arrow-file-to-stream "$f" > "${f%.*}.stream";
+done
+for f in "$json_dir"/*.json ; do
+    gzip "$f" ;
+done
+echo "The files are under $json_dir"
diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 4101c36ef8f..bb96dce7993 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -120,7 +120,7 @@ to numeric type which can accommodate any value from either input. .. _common-numeric-type: Common numeric type -~~~~~~~~~~~~~~~~~~~ +------------------- The common numeric type of a set of input numeric types is the smallest numeric type which can accommodate any value of any input. If any input is a floating @@ -482,14 +482,14 @@ These functions trim off characters on both sides (trim), or the left (ltrim) or +--------------------------+------------+-------------------------+---------------------+----------------------------------------+---------+ * \(1) Only characters specified in :member:`TrimOptions::characters` will be -trimmed off. Both the input string as the `characters` argument are interepreted -as ASCII characters. + trimmed off. Both the input string and the `characters` argument are + interpreted as ASCII characters. * \(2) Only trim off ASCII whitespace characters (``'\t'``, ``'\n'``, ``'\v'``, -``'\f'``, ``'\r'`` and ``' '``). + ``'\f'``, ``'\r'`` and ``' '``). 
* \(3) Only characters specified in :member:`TrimOptions::characters` will be -trimmed off. + trimmed off. * \(4) Only trim off Unicode whitespace characters. diff --git a/docs/source/status.rst b/docs/source/status.rst index d3bb8216f5d..92c813a8541 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -128,6 +128,8 @@ IPC Format +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Buffer compression | ✓ | | | | | | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ +| Endianness conversion | ✓ (2) | | | | | | | ++-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Custom schema metadata | ✓ | ✓ | | | | ✓ | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -135,6 +137,8 @@ Notes: * \(1) Delta dictionaries not supported on nested dictionaries +* \(2) Data with non-native endianness can be byte-swapped automatically when reading. + .. seealso:: The :ref:`format-ipc` specification.