Skip to content

Conversation

@nevi-me
Copy link
Collaborator

@nevi-me nevi-me commented Oct 3, 2020

This is on top of @carols10cents' PR (apache#8330). I've fixed some of the failing tests that were ignored.

nevi-me and others added 5 commits September 25, 2020 17:59
**Note**: I started making changes to apache#6785, and ended up deviating a lot, so I opted for making a new draft PR in case my approach is not suitable.
___

This is a draft to implement an arrow writer for parquet. It supports the following (no complete test coverage yet):

* writing primitives except for booleans and binary
* nested structs
* null values (via definition levels)

It does not yet support:

- Boolean arrays (have to be handled differently from numeric values)
- Binary arrays
- Dictionary arrays
- Union arrays (are they even possible?)

I have only added a test by creating a nested schema, which I tested on pyarrow.

```jupyter
# schema of test_complex.parquet

a: int32 not null
b: int32
c: struct<d: double, e: struct<f: float>> not null
  child 0, d: double
  child 1, e: struct<f: float>
      child 0, f: float
```

This PR potentially addresses:

* https://issues.apache.org/jira/browse/ARROW-8289
* https://issues.apache.org/jira/browse/ARROW-8423
* https://issues.apache.org/jira/browse/ARROW-8424
* https://issues.apache.org/jira/browse/ARROW-8425

And I would like to propose either opening new JIRAs for the above incomplete items, or renaming the last 3 above.

___

**Help Needed**

I'm implementing the definition and repetition levels on first principle from an old Parquet blog post from the Twitter engineering blog. It's likely that I'm not getting some concepts correct, so I would appreciate help with:

* Checking if my logic is correct
* Guidance or suggestions on how to more efficiently extract levels from arrays
* Adding tests - I suspect we might need a lot of tests, so far we only test writing 1 batch, so I don't know how paging would work when writing a large enough file

I also don't know if the various encoding levels (dictionary, RLE, etc.) and compression levels are applied automagically, or if that'd be something we need to explicitly enable.

CC @sunchao @sadikovi @andygrove @paddyhoran

Might be of interest to @mcassels @maxburke

Closes apache#7319 from nevi-me/arrow-parquet-writer

Lead-authored-by: Neville Dipale <[email protected]>
Co-authored-by: Max Burke <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
Co-authored-by: Max Burke <[email protected]>
Signed-off-by: Neville Dipale <[email protected]>
This will allow preserving Arrow-specific metadata when writing or reading Parquet files created from C++ or Rust.
If the schema can't be deserialised, the normal Parquet > Arrow schema conversion is performed.

Closes apache#7917 from nevi-me/ARROW-8243

Authored-by: Neville Dipale <[email protected]>
Signed-off-by: Neville Dipale <[email protected]>
…arrow_schema with ipc changes

Note that this PR is deliberately filed against the rust-parquet-arrow-writer branch, not master!!

Hi! 👋 I'm looking to help out with the rust-parquet-arrow-writer branch, and I just pulled it down and it wasn't compiling because in 75f804e, `schema_to_bytes` was changed to take `IpcWriteOptions` and to return `EncodedData`. This updates `encode_arrow_schema` to use those changes, which should get this branch compiling and passing tests again.

I'm kind of guessing which JIRA ticket this should be associated with; honestly I think this commit can just be squashed with apache@8f0ed91 next time this branch gets rebased.

Please let me know if I should change anything, I'm happy to!

Closes apache#8274 from carols10cents/update-with-ipc-changes

Authored-by: Carol (Nichols || Goulding) <[email protected]>
Signed-off-by: Neville Dipale <[email protected]>
Inspired by tests in cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Tests that currently fail are either marked with `#[should_panic]` (if the
reason they fail is because of a panic) or `#[ignore]` (if the reason
they fail is because the values don't match).
@carols10cents
Copy link
Member

I merged this in to integer32llc/roundtrip-tests.

@nevi-me nevi-me deleted the roundtrip-tests branch October 6, 2020 00:42
carols10cents pushed a commit that referenced this pull request Apr 15, 2021
From a deadlocked run...

```
#0  0x00007f8a5d48dccd in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f8a5d486f05 in pthread_mutex_lock () from /lib64/libpthread.so.0
#2  0x00007f8a566e7e89 in arrow::internal::FnOnce<void ()>::FnImpl<arrow::Future<Aws::Utils::Outcome<Aws::S3::Model::ListObjectsV2Result, Aws::S3::S3Error> >::Callback<arrow::fs::(anonymous namespace)::TreeWalker::ListObjectsV2Handler> >::invoke() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
#3  0x00007f8a5650efa0 in arrow::FutureImpl::AddCallback(arrow::internal::FnOnce<void ()>) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
#4  0x00007f8a566e67a9 in arrow::fs::(anonymous namespace)::TreeWalker::ListObjectsV2Handler::SpawnListObjectsV2() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
#5  0x00007f8a566e723f in arrow::fs::(anonymous namespace)::TreeWalker::WalkChild(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
apache#6  0x00007f8a566e827d in arrow::internal::FnOnce<void ()>::FnImpl<arrow::Future<Aws::Utils::Outcome<Aws::S3::Model::ListObjectsV2Result, Aws::S3::S3Error> >::Callback<arrow::fs::(anonymous namespace)::TreeWalker::ListObjectsV2Handler> >::invoke() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
apache#7  0x00007f8a5650efa0 in arrow::FutureImpl::AddCallback(arrow::internal::FnOnce<void ()>) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
apache#8  0x00007f8a566e67a9 in arrow::fs::(anonymous namespace)::TreeWalker::ListObjectsV2Handler::SpawnListObjectsV2() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
apache#9  0x00007f8a566e723f in arrow::fs::(anonymous namespace)::TreeWalker::WalkChild(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
apache#10 0x00007f8a566e74b1 in arrow::fs::(anonymous namespace)::TreeWalker::DoWalk() () from /arrow/r/check/arrow.Rcheck/arrow/libs/arrow.so
```

The callback `ListObjectsV2Handler` is being called recursively and the mutex is non-reentrant thus deadlock.

To fix it I got rid of the mutex on `TreeWalker` by using `arrow::util::internal::TaskGroup` instead of manually tracking the #/status of in-flight requests.

Closes apache#9842 from westonpace/bugfix/arrow-12040

Lead-authored-by: Weston Pace <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
shepmaster pushed a commit that referenced this pull request Mar 3, 2022
Error log of Valgrind failure:
```
[----------] 3 tests from TestArrowReadDeltaEncoding
[ RUN      ] TestArrowReadDeltaEncoding.DeltaBinaryPacked
[       OK ] TestArrowReadDeltaEncoding.DeltaBinaryPacked (812 ms)
[ RUN      ] TestArrowReadDeltaEncoding.DeltaByteArray
==12587== Conditional jump or move depends on uninitialised value(s)
==12587==    at 0x4F12C57: Advance (bit_stream_utils.h:426)
==12587==    by 0x4F12C57: parquet::(anonymous namespace)::DeltaBitPackDecoder<parquet::PhysicalType<(parquet::Type::type)1> >::GetInternal(int*, int) (encoding.cc:2216)
==12587==    by 0x4F13823: Decode (encoding.cc:2091)
==12587==    by 0x4F13823: parquet::(anonymous namespace)::DeltaByteArrayDecoder::SetData(int, unsigned char const*, int) (encoding.cc:2360)
==12587==    by 0x4E89EF5: parquet::(anonymous namespace)::ColumnReaderImplBase<parquet::PhysicalType<(parquet::Type::type)6> >::InitializeDataDecoder(parquet::DataPage const&, long) (column_reader.cc:797)
==12587==    by 0x4E9AE63: ReadNewPage (column_reader.cc:614)
==12587==    by 0x4E9AE63: HasNextInternal (column_reader.cc:576)
==12587==    by 0x4E9AE63: parquet::internal::(anonymous namespace)::TypedRecordReader<parquet::PhysicalType<(parquet::Type::type)6> >::ReadRecords(long) (column_reader.cc:1228)
==12587==    by 0x4DFB19F: parquet::arrow::(anonymous namespace)::LeafReader::LoadBatch(long) (reader.cc:467)
==12587==    by 0x4DF513C: parquet::arrow::ColumnReaderImpl::NextBatch(long, std::shared_ptr<arrow::ChunkedArray>*) (reader.cc:108)
==12587==    by 0x4DFB74D: parquet::arrow::(anonymous namespace)::FileReaderImpl::ReadColumn(int, std::vector<int, std::allocator<int> > const&, parquet::arrow::ColumnReader*, std::shared_ptr<arrow::ChunkedArray>*) (reader.cc:273)
==12587==    by 0x4E11FDA: operator() (reader.cc:1180)
==12587==    by 0x4E11FDA: arrow::Future<std::vector<std::shared_ptr<arrow::ChunkedArray>, std::allocator<arrow::Future> > > arrow::internal::OptionalParallelForAsync<parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::vector<int, std::allocator<int> > const&, std::vector<int, std::allocator<int> > const&, arrow::internal::Executor*)::{lambda(unsigned long, std::shared_ptr<parquet::arrow::ColumnReaderImpl>)#1}&, std::shared_ptr<parquet::arrow::ColumnReaderImpl>, std::shared_ptr<arrow::ChunkedArray> >(bool, std::vector<std::shared_ptr<parquet::arrow::ColumnReaderImpl>, std::allocator<arrow::Future<std::vector<std::shared_ptr<arrow::ChunkedArray>, std::allocator<arrow::Future> > > > >, parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::vector<int, std::allocator<int> > const&, std::vector<int, std::allocator<int> > const&, arrow::internal::Executor*)::{lambda(unsigned long, std::shared_ptr<parquet::arrow::ColumnReaderImpl>)#1}&, arrow::internal::Executor*) (parallel.h:95)
==12587==    by 0x4E126A9: parquet::arrow::(anonymous namespace)::FileReaderImpl::DecodeRowGroups(std::shared_ptr<parquet::arrow::(anonymous namespace)::FileReaderImpl>, std::vector<int, std::allocator<int> > const&, std::vector<int, std::allocator<int> > const&, arrow::internal::Executor*) (reader.cc:1198)
==12587==    by 0x4E12F50: parquet::arrow::(anonymous namespace)::FileReaderImpl::ReadRowGroups(std::vector<int, std::allocator<int> > const&, std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*) (reader.cc:1160)
==12587==    by 0x4DFA2BC: parquet::arrow::(anonymous namespace)::FileReaderImpl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*) (reader.cc:198)
==12587==    by 0x4DFA392: parquet::arrow::(anonymous namespace)::FileReaderImpl::ReadTable(std::shared_ptr<arrow::Table>*) (reader.cc:289)
==12587==    by 0x1DCE62: parquet::arrow::TestArrowReadDeltaEncoding::ReadTableFromParquetFile(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::shared_ptr<arrow::Table>*) (arrow_reader_writer_test.cc:4174)
==12587==    by 0x2266D2: parquet::arrow::TestArrowReadDeltaEncoding_DeltaByteArray_Test::TestBody() (arrow_reader_writer_test.cc:4209)
==12587==    by 0x4AD2C9B: void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (gtest.cc:2607)
==12587==    by 0x4AC9DD1: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (gtest.cc:2643)
==12587==    by 0x4AA4C02: testing::Test::Run() (gtest.cc:2682)
==12587==    by 0x4AA563A: testing::TestInfo::Run() (gtest.cc:2861)
==12587==    by 0x4AA600F: testing::TestSuite::Run() (gtest.cc:3015)
==12587==    by 0x4AB631B: testing::internal::UnitTestImpl::RunAllTests() (gtest.cc:5855)
==12587==    by 0x4AD3CE7: bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) (gtest.cc:2607)
==12587==    by 0x4ACB063: bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) (gtest.cc:2643)
==12587==    by 0x4AB47B6: testing::UnitTest::Run() (gtest.cc:5438)
==12587==    by 0x4218918: RUN_ALL_TESTS() (gtest.h:2490)
==12587==    by 0x421895B: main (gtest_main.cc:52)
```

Closes apache#11725 from pitrou/ARROW-14704-parquet-valgrind

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
shepmaster pushed a commit that referenced this pull request Mar 3, 2022
TODOs:
Convert cheat sheet to PDF and hide slide #1.

Closes apache#12445 from pachadotdev/patch-4

Lead-authored-by: Stephanie Hazlitt <[email protected]>
Co-authored-by: Pachá <[email protected]>
Co-authored-by: Mauricio Vargas <[email protected]>
Co-authored-by: Pachá <[email protected]>
Signed-off-by: Jonathan Keane <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants