ARROW-5618: [WIP] [C++] [Parquet] Check valid mask before converting Int96 timestamps #4607

Closed · wants to merge 1 commit
91 changes: 91 additions & 0 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1417,6 +1417,97 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
}

TEST(TestArrowReadWrite, UseDeprecatedInt96WithValidMask) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
using ::arrow::schema;

int N = LARGE_SIZE;
std::vector<std::vector<bool>> masks = {{}, {}, {}, {}, {}, {}, {}, {}};
std::vector<int64_t> v_ms, v_ns;

int64_t instant = INT64_C(1262304000000); // 2010-01-01T00:00:00 milliseconds offset
for (int i = 0; i < N; ++i) {
masks[0].push_back(true); // all valid
masks[1].push_back(false); // all invalid
masks[2].push_back(i % 2 != 1); // alternating valid
masks[3].push_back(i % 4 != 1); // majority valid
masks[4].push_back(i % 11 != 1); // majority valid
masks[5].push_back(i % 11 == 1); // majority invalid
masks[6].push_back(i % 131 != 1); // almost all valid
masks[7].push_back(i % 131 == 1); // almost all invalid

v_ms.push_back(instant);
v_ns.push_back(instant * INT64_C(1000000));
instant += 60000;
}

std::vector<std::shared_ptr<WriterProperties>> properties;
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_1_0)
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_1_0)
->encoding(Encoding::PLAIN)
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->encoding(Encoding::PLAIN)
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_1_0)
->encoding(Encoding::RLE)
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->encoding(Encoding::RLE)
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_1_0)
->disable_dictionary()
->build());
properties.push_back(::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->disable_dictionary()
->build());

auto arrow_properties =
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();

struct Int96TimestampTableArguments {
const std::shared_ptr<WriterProperties>& parquet_properties;
const std::vector<bool>& mask;
};

std::vector<Int96TimestampTableArguments> cases;

for (size_t p = 0; p < properties.size(); ++p) {
for (size_t m = 0; m < masks.size(); ++m) {
cases.push_back({properties[p], masks[m]});
}
}

auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
auto t_ns = ::arrow::timestamp(TimeUnit::NANO);

for (const Int96TimestampTableArguments& c : cases) {
std::shared_ptr<Array> a_ms, a_ns;
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, c.mask, v_ms, &a_ms);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, c.mask, v_ns, &a_ns);
auto input_schema = schema({field("f_ms", t_ms)});
auto input_table =
Table::Make(input_schema, {std::make_shared<Column>("f_ms", a_ms)});
auto expected_schema = schema({field("f_ms", t_ns)});
auto expected_table =
Table::Make(expected_schema, {std::make_shared<Column>("f_ms", a_ns)});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(
input_table, expected_table, c.parquet_properties, arrow_properties));
}
}

TEST(TestArrowReadWrite, CoerceTimestamps) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
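The ArrayFromVector<::arrow::TimestampType, int64_t>(type, mask, values, &out) calls in the test above come from Arrow's gtest utilities; the mask vector marks which slots are valid. Outside the test suite, a comparable nullable timestamp array can be built with the regular builder API, roughly as in the following sketch (the MakeMaskedTimestamps name and the error handling are illustrative, not part of this patch):

// Sketch: build a nullable timestamp array equivalent to what the test's
// ArrayFromVector helper produces from (type, mask, values).
#include <arrow/api.h>

#include <cstdint>
#include <memory>
#include <vector>

std::shared_ptr<arrow::Array> MakeMaskedTimestamps(
    const std::shared_ptr<arrow::DataType>& type,  // e.g. arrow::timestamp(arrow::TimeUnit::MILLI)
    const std::vector<bool>& mask,                 // true == valid slot
    const std::vector<int64_t>& values) {
  arrow::TimestampBuilder builder(type, arrow::default_memory_pool());
  for (size_t i = 0; i < values.size(); ++i) {
    // Invalid slots become nulls in the validity bitmap rather than stored values.
    if (mask[i]) {
      if (!builder.Append(values[i]).ok()) return nullptr;
    } else {
      if (!builder.AppendNull().ok()) return nullptr;
    }
  }
  std::shared_ptr<arrow::Array> out;
  if (!builder.Finish(&out).ok()) return nullptr;
  return out;
}

For example, with the test's data, MakeMaskedTimestamps(t_ms, masks[2], v_ms) would yield an array in which every other slot is null, matching the "alternating valid" case.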
20 changes: 17 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
@@ -1143,15 +1143,29 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
     RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
 
     auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
-    for (int64_t i = 0; i < length; i++) {
-      *data_ptr++ = Int96GetNanoSeconds(values[i]);
-    }
 
     if (reader->nullable_values()) {
       std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
+      const uint8_t* is_valid_bits = is_valid->data();
+      ::arrow::internal::BitmapReader valid_bits_reader(is_valid_bits, 0,
+                                                        length);  // FIXME: bits offset?
+      int64_t null_count = 0;
+      for (int64_t i = 0; i < length; i++) {
+        if (valid_bits_reader.IsSet()) {
+          *data_ptr++ = Int96GetNanoSeconds(values[i]);
+        } else {
+          *data_ptr++ = 0;
+          null_count += 1;
+        }
+        valid_bits_reader.Next();
+      }
+      DCHECK_EQ(reader->null_count(), null_count);
       *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
                                               reader->null_count());
     } else {
+      for (int64_t i = 0; i < length; i++) {
+        *data_ptr++ = Int96GetNanoSeconds(values[i]);
+      }
       *out = std::make_shared<TimestampArray>(type, length, data);
     }
 
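For context on the conversion guarded above: Parquet's deprecated Int96 timestamp packs two little-endian 32-bit words of nanoseconds-within-day followed by a 32-bit Julian day number, and Int96GetNanoSeconds() turns that into nanoseconds since the Unix epoch. A rough standalone sketch of that decoding follows (the function name and layout handling are illustrative and assume a little-endian host; this is not the exact helper from the parquet headers):

// Sketch of the Int96 -> nanoseconds-since-epoch decoding that
// Int96GetNanoSeconds() is expected to perform (illustrative only).
#include <cstdint>
#include <cstring>

constexpr int64_t kJulianDayOfUnixEpoch = INT64_C(2440588);  // Julian day for 1970-01-01
constexpr int64_t kNanosPerDay = INT64_C(86400) * INT64_C(1000000000);

int64_t Int96ToUnixNanos(const uint32_t (&int96)[3]) {
  // Words 0 and 1 hold the nanoseconds elapsed within the day (little-endian int64).
  int64_t nanos_in_day;
  std::memcpy(&nanos_in_day, &int96[0], sizeof(nanos_in_day));
  // Word 2 holds the Julian day number.
  const int64_t days_since_epoch =
      static_cast<int64_t>(int96[2]) - kJulianDayOfUnixEpoch;
  return days_since_epoch * kNanosPerDay + nanos_in_day;
}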