diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 2cc8b0d05f4fd..ad4d323c01d04 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1417,6 +1417,102 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
 }
 
+// Builds a millisecond timestamp column under eight different validity masks and
+// passes it through CheckConfiguredRoundtrip with deprecated INT96 timestamps
+// enabled, for eight writer configurations; the expected table is in nanoseconds.
+TEST(TestArrowReadWrite, UseDeprecatedInt96WithValidMask) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  int N = LARGE_SIZE;
+  std::vector<std::vector<bool>> masks = {{}, {}, {}, {}, {}, {}, {}, {}};
+  std::vector<int64_t> v_ms, v_ns;
+
+  int64_t instant = INT64_C(1262304000000);  // 2010-01-01T00:00:00 milliseconds offset
+  for (int i = 0; i < N; ++i) {
+    masks[0].push_back(true);           // all valid
+    masks[1].push_back(false);          // all invalid
+    masks[2].push_back(i % 2 != 1);     // alternating valid
+    masks[3].push_back(i % 4 != 1);     // majority valid
+    masks[4].push_back(i % 11 != 1);    // majority valid
+    masks[5].push_back(i % 11 == 1);    // majority invalid
+    masks[6].push_back(i % 131 != 1);   // almost all valid
+    masks[7].push_back(i % 131 == 1);   // almost all invalid
+
+    v_ms.push_back(instant);
+    v_ns.push_back(instant * INT64_C(1000000));
+    instant += 60000;  // next value is one minute (60000 ms) later
+  }
+
+  std::vector<std::shared_ptr<::parquet::WriterProperties>> properties;
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_1_0)
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_2_0)
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_1_0)
+                           ->encoding(Encoding::PLAIN)
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_2_0)
+                           ->encoding(Encoding::PLAIN)
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_1_0)
+                           ->encoding(Encoding::RLE)
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_2_0)
+                           ->encoding(Encoding::RLE)
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_1_0)
+                           ->disable_dictionary()
+                           ->build());
+  properties.push_back(::parquet::WriterProperties::Builder()
+                           .version(ParquetVersion::PARQUET_2_0)
+                           ->disable_dictionary()
+                           ->build());
+
+  auto arrow_properties =
+      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();
+
+  // Holds references into `properties` and `masks`; neither vector is modified
+  // after `cases` is filled, so the references remain valid for the whole test.
+  struct Int96TimestampTableArguments {
+    const std::shared_ptr<::parquet::WriterProperties>& parquet_properties;
+    const std::vector<bool>& mask;
+  };
+
+  std::vector<Int96TimestampTableArguments> cases;
+
+  for (size_t p = 0; p < properties.size(); ++p) {
+    for (size_t m = 0; m < masks.size(); ++m) {
+      cases.push_back({properties[p], masks[m]});
+    }
+  }
+
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  for (const Int96TimestampTableArguments& c : cases) {
+    std::shared_ptr<::arrow::Array> a_ms, a_ns;
+    ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, c.mask, v_ms, &a_ms);
+    ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, c.mask, v_ns, &a_ns);
+    auto input_schema = schema({field("f_ms", t_ms)});
+    auto input_table =
+        Table::Make(input_schema, {std::make_shared<::arrow::Column>("f_ms", a_ms)});
+    auto expected_schema = schema({field("f_ms", t_ns)});
+    auto expected_table =
+        Table::Make(expected_schema, {std::make_shared<::arrow::Column>("f_ms", a_ns)});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(
+        input_table, expected_table, c.parquet_properties, arrow_properties));
+  }
+}
+
 TEST(TestArrowReadWrite, CoerceTimestamps) {
   using ::arrow::ArrayFromVector;
   using ::arrow::field;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index f757b5ff847e8..4dfd17097d84e 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1143,15 +1143,29 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data)); auto data_ptr = reinterpret_cast(data->mutable_data()); - for (int64_t i = 0; i < length; i++) { - *data_ptr++ = Int96GetNanoSeconds(values[i]); - } if (reader->nullable_values()) { std::shared_ptr is_valid = reader->ReleaseIsValid(); + const uint8_t* is_valid_bits = is_valid->data(); + ::arrow::internal::BitmapReader valid_bits_reader(is_valid_bits, 0, + length); // FIXME: bits offset? + int64_t null_count = 0; + for (int64_t i = 0; i < length; i++) { + if (valid_bits_reader.IsSet()) { + *data_ptr++ = Int96GetNanoSeconds(values[i]); + } else { + *data_ptr++ = 0; + null_count += 1; + } + valid_bits_reader.Next(); + } + DCHECK_EQ(reader->null_count(), null_count); *out = std::make_shared(type, length, data, is_valid, reader->null_count()); } else { + for (int64_t i = 0; i < length; i++) { + *data_ptr++ = Int96GetNanoSeconds(values[i]); + } *out = std::make_shared(type, length, data); }