Threaded CSV Parser is not thread-safe #505

Closed
hcho3 opened this issue Feb 21, 2019 · 3 comments

hcho3 commented Feb 21, 2019

Minimal example:

// example.cc
#include <dmlc/registry.h>
#include <dmlc/data.h>
#include <cstdint>

int main(void) {
  std::unique_ptr<dmlc::Parser<uint32_t, float>> parser(
    dmlc::Parser<uint32_t, float>::Create("data.txt", 0, 1, "csv"));
  parser->BeforeFirst();
  while (parser->Next()) {
    const auto& value = parser->Value();
    LOG(INFO) << "value.size = " << value.size;
  }

  return 0;
}

Content of data.txt: see the attached file data.txt.

Compile the example with

g++ -g -O0 -fsanitize=thread -o example example.cc ./dmlc-core/lib/libdmlc.a -I./dmlc-core/include -lpthread -lgomp

ThreadSanitizer detects data races when OMP_NUM_THREADS is set to 2 or higher:

ubuntu@ip-172-31-40-100:~$ OMP_NUM_THREADS=2 ./example
[20:12:14] example.cc:11: value.size = 500
[20:12:14] example.cc:11: value.size = 500
==================
WARNING: ThreadSanitizer: data race (pid=72795)
  Write of size 8 at 0x7bd000020000 by main thread:
    #0 operator delete(void*) <null> (libtsan.so.0+0x741fd)
    #1 dmlc::data::RowBlockContainer<unsigned int, float>::~RowBlockContainer() <null> (example+0x18d14)
    #2 std::unique_ptr<dmlc::Parser<unsigned int, float>, std::default_delete<dmlc::Parser<unsigned int, float> > >::~unique_ptr() /usr/include/c++/7/bits/unique_ptr.h:268 (example+0xcaff)
    #3 main /home/ubuntu/example.cc:7 (example+0xc5f6)

  Previous write of size 8 at 0x7bd000020000 by thread T2:
    #0 operator new(unsigned long) <null> (libtsan.so.0+0x73f0a)
    #1 void std::vector<float, std::allocator<float> >::_M_realloc_insert<float const&>(__gnu_cxx::__normal_iterator<float*, std::vector<float, std::allocator<float> > >, float const&) <null> (example+0x3dc73)

  Thread T2 (tid=72798, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x2bcfe)
    #1 <null> <null> (libgomp.so.1+0x16f51)
    #2 __libc_start_main <null> (libc.so.6+0x21b96)

SUMMARY: ThreadSanitizer: data race (/usr/lib/x86_64-linux-gnu/libtsan.so.0+0x741fd) in operator delete(void*)
==================
==================
WARNING: ThreadSanitizer: data race (pid=72795)
  Write of size 8 at 0x7bd000060000 by main thread:
    #0 operator delete(void*) <null> (libtsan.so.0+0x741fd)
    #1 dmlc::data::RowBlockContainer<unsigned int, float>::~RowBlockContainer() <null> (example+0x18d22)
    #2 std::unique_ptr<dmlc::Parser<unsigned int, float>, std::default_delete<dmlc::Parser<unsigned int, float> > >::~unique_ptr() /usr/include/c++/7/bits/unique_ptr.h:268 (example+0xcaff)
    #3 main /home/ubuntu/example.cc:7 (example+0xc5f6)

  Previous write of size 8 at 0x7bd000060000 by thread T2:
    #0 operator new(unsigned long) <null> (libtsan.so.0+0x73f0a)
    #1 void std::vector<unsigned int, std::allocator<unsigned int> >::emplace_back<unsigned int>(unsigned int&&) <null> (example+0x3e927)

  Thread T2 (tid=72798, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x2bcfe)
    #1 <null> <null> (libgomp.so.1+0x16f51)
    #2 __libc_start_main <null> (libc.so.6+0x21b96)

SUMMARY: ThreadSanitizer: data race (/usr/lib/x86_64-linux-gnu/libtsan.so.0+0x741fd) in operator delete(void*)
==================
==================
WARNING: ThreadSanitizer: data race (pid=72795)
  Write of size 8 at 0x7b7000010800 by main thread:
    #0 operator delete(void*) <null> (libtsan.so.0+0x741fd)
    #1 dmlc::data::RowBlockContainer<unsigned int, float>::~RowBlockContainer() <null> (example+0x18d5a)
    #2 std::unique_ptr<dmlc::Parser<unsigned int, float>, std::default_delete<dmlc::Parser<unsigned int, float> > >::~unique_ptr() /usr/include/c++/7/bits/unique_ptr.h:268 (example+0xcaff)
    #3 main /home/ubuntu/example.cc:7 (example+0xc5f6)

  Previous write of size 8 at 0x7b7000010800 by thread T2:
    #0 operator new(unsigned long) <null> (libtsan.so.0+0x73f0a)
    #1 void std::vector<float, std::allocator<float> >::_M_realloc_insert<float const&>(__gnu_cxx::__normal_iterator<float*, std::vector<float, std::allocator<float> > >, float const&) <null> (example+0x3dc73)

  Thread T2 (tid=72798, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x2bcfe)
    #1 <null> <null> (libgomp.so.1+0x16f51)
    #2 __libc_start_main <null> (libc.so.6+0x21b96)

SUMMARY: ThreadSanitizer: data race (/usr/lib/x86_64-linux-gnu/libtsan.so.0+0x741fd) in operator delete(void*)
==================
==================
WARNING: ThreadSanitizer: data race (pid=72795)
  Write of size 8 at 0x7b8000011000 by main thread:
    #0 operator delete(void*) <null> (libtsan.so.0+0x741fd)
    #1 dmlc::data::CSVParser<unsigned int, float>::~CSVParser() <null> (example+0x18eab)
    #2 std::unique_ptr<dmlc::Parser<unsigned int, float>, std::default_delete<dmlc::Parser<unsigned int, float> > >::~unique_ptr() /usr/include/c++/7/bits/unique_ptr.h:268 (example+0xcaff)
    #3 main /home/ubuntu/example.cc:7 (example+0xc5f6)

  Previous write of size 8 at 0x7b8000011000 by thread T2:
    #0 operator new(unsigned long) <null> (libtsan.so.0+0x73f0a)
    #1 void std::vector<unsigned long, std::allocator<unsigned long> >::emplace_back<unsigned long>(unsigned long&&) <null> (example+0x166b7)

  Thread T2 (tid=72798, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x2bcfe)
    #1 <null> <null> (libgomp.so.1+0x16f51)
    #2 __libc_start_main <null> (libc.so.6+0x21b96)

SUMMARY: ThreadSanitizer: data race (/usr/lib/x86_64-linux-gnu/libtsan.so.0+0x741fd) in operator delete(void*)
==================
ThreadSanitizer: reported 4 warnings

hcho3 commented Feb 21, 2019

TODO: add a sanitizer test to the continuous integration pipeline.

hcho3 commented Feb 21, 2019

Update: replacing

  std::unique_ptr<dmlc::Parser<uint32_t, float>> parser(
    dmlc::Parser<uint32_t, float>::Create("data.txt", 0, 1, "csv"));

with

  dmlc::Parser<uint32_t, float>* parser =
    dmlc::Parser<uint32_t, float>::Create("data.txt", 0, 1, "csv");

makes the ThreadSanitizer warnings disappear (i.e. the example then runs without any data race report). Note that adding delete parser at the end brings back the same warnings, so the race is reported only when the parser is destroyed.

Two possibilities:

  1. Multiple threads are trying to free the same object at the same time (double free)
  2. One thread frees an object and another thread attempts to read it (use-after-free)
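
One way to rule out the second possibility is to make sure every worker that touches a buffer has been joined before that buffer is freed. The following is a minimal sketch of that ordering only, not dmlc-core code: BackgroundFiller, WaitAndCount and the member names are hypothetical, and a plain std::thread stands in for whatever the parser uses internally.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for an object that owns a background worker.
// None of these names come from dmlc-core.
class BackgroundFiller {
 public:
  explicit BackgroundFiller(std::size_t n)
      : worker_([this, n] {
          // Only the worker touches buffer_ until it has been joined.
          for (std::size_t i = 0; i < n && !stop_.load(); ++i) {
            buffer_.push_back(static_cast<float>(i));
          }
        }) {}

  ~BackgroundFiller() {
    stop_.store(true);  // ask the worker to finish early
    JoinWorker();       // wait until it no longer touches buffer_
    assert(!worker_.joinable());
    // buffer_ is freed only after this body returns, so the free
    // cannot overlap with a push_back on the worker thread.
  }

  // Waits for the worker and returns how many values it produced.
  std::size_t WaitAndCount() {
    JoinWorker();
    return buffer_.size();
  }

 private:
  void JoinWorker() {
    if (worker_.joinable()) worker_.join();
  }

  std::atomic<bool> stop_{false};
  std::vector<float> buffer_;
  std::thread worker_;  // declared last: starts after the members above exist
};
```

If the destructor skipped the join, the vector's storage could be released while the worker is still appending to it, which is the use-after-free shape described in possibility 2.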

hcho3 commented Feb 22, 2019

I got much better diagnostics from ThreadSanitizer after compiling libdmlc.a itself with -fsanitize=thread:

ubuntu@ip-172-31-40-100:~$ ./example

==================
WARNING: ThreadSanitizer: data race (pid=29384)
  Write of size 1 at 0x7b4c00000040 by thread T1:
    #0 dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}::operator()() const include/dmlc/threadediter.h:357 (example+0xb4c22)
    #1 void std::__invoke_impl<void, dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}>(std::__invoke_other, dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}&&) /usr/include/c++/7/bits/invoke.h:60 (example+0xb8425)
    #2 std::__invoke_result<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}>::type std::__invoke<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}>(std::__invoke_result&&, (dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}&&)...) /usr/include/c++/7/bits/invoke.h:95 (example+0xb69f2)
    #3 decltype (__invoke((_S_declval<0ul>)())) std::thread::_Invoker<std::tuple<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /usr/include/c++/7/thread:234 (example+0xbad3e)
    #4 std::thread::_Invoker<std::tuple<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}> >::operator()() /usr/include/c++/7/thread:243 (example+0xbacd0)
    #5 std::thread::_State_impl<std::thread::_Invoker<std::tuple<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}> > >::_M_run() /usr/include/c++/7/thread:186 (example+0xbaaa8)
    #6 <null> <null> (libstdc++.so.6+0xbd57e)

  Previous read of size 1 at 0x7b4c00000040 by main thread (mutexes: write M38):
    #0 dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Next(dmlc::io::InputSplitBase::Chunk**)::{lambda()#1}::operator()() const include/dmlc/threadediter.h:420 (example+0xb3bbd)
    #1 void std::condition_variable::wait<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Next(dmlc::io::InputSplitBase::Chunk**)::{lambda()#1}>(std::unique_lock<std::mutex>&, dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Next(dmlc::io::InputSplitBase::Chunk**)::{lambda()#1}) /usr/include/c++/7/condition_variable:98 (example+0xb6219)
    #2 dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Next(dmlc::io::InputSplitBase::Chunk**) include/dmlc/threadediter.h:419 (example+0xb3d5b)
    #3 dmlc::io::ThreadedInputSplit::NextChunk(dmlc::InputSplit::Blob*) src/io/threaded_input_split.h:72 (example+0xb2df2)
    #4 dmlc::data::TextParserBase<unsigned int, float>::FillData(std::vector<dmlc::data::RowBlockContainer<unsigned int, float>, std::allocator<dmlc::data::RowBlockContainer<unsigned int, float> > >*) <null> (example+0x9ea05)
    #5 dmlc::data::TextParserBase<unsigned int, float>::ParseNext(std::vector<dmlc::data::RowBlockContainer<unsigned int, float>, std::allocator<dmlc::data::RowBlockContainer<unsigned int, float> > >*) <null> (example+0x9221f)
    #6 dmlc::data::ParserImpl<unsigned int, float>::Next() <null> (example+0x92110)
    #7 main /home/ubuntu/example.cc:10 (example+0xe342)

  Location is heap block of size 448 at 0x7b4c00000000 allocated by main thread:
    #0 operator new(unsigned long) <null> (libtsan.so.0+0x73f0a)
    #1 dmlc::InputSplit::Create(char const*, char const*, unsigned int, unsigned int, char const*, bool, int, unsigned long, bool) src/io.cc:121 (example+0xae976)
    #2 dmlc::InputSplit::Create(char const*, unsigned int, unsigned int, char const*) src/io.cc:78 (example+0xae48f)
    #3 dmlc::Parser<unsigned int, float>* dmlc::data::CreateCSVParser<unsigned int, float>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&, unsigned int, unsigned int) <null> (example+0x1c5ca)
    #4 dmlc::Parser<unsigned int, float>* dmlc::data::CreateParser_<unsigned int, float>(char const*, unsigned int, unsigned int, char const*) <null> (example+0x19670)
    #5 dmlc::Parser<unsigned int, float>::Create(char const*, unsigned int, unsigned int, char const*) src/data.cc:175 (example+0xf562)
    #6 main /home/ubuntu/example.cc:8 (example+0xe2cc)

  Mutex M38 (0x7b4c00000050) created at:
    #0 pthread_mutex_lock <null> (libtsan.so.0+0x3faeb)
    #1 __gthread_mutex_lock /usr/include/x86_64-linux-gnu/c++/7/bits/gthr-default.h:748 (example+0xeeb5)
    #2 std::mutex::lock() /usr/include/c++/7/bits/std_mutex.h:103 (example+0x114de)
    #3 std::unique_lock<std::mutex>::lock() /usr/include/c++/7/bits/std_mutex.h:267 (example+0x644f7)
    #4 std::unique_lock<std::mutex>::unique_lock(std::mutex&) /usr/include/c++/7/bits/std_mutex.h:197 (example+0x5267e)
    #5 dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}::operator()() const include/dmlc/threadediter.h:314 (example+0xb4746)
    #6 void std::__invoke_impl<void, dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}>(std::__invoke_other, dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}&&) /usr/include/c++/7/bits/invoke.h:60 (example+0xb8425)
    #7 std::__invoke_result<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}>::type std::__invoke<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}>(std::__invoke_result&&, (dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}&&)...) /usr/include/c++/7/bits/invoke.h:95 (example+0xb69f2)
    #8 decltype (__invoke((_S_declval<0ul>)())) std::thread::_Invoker<std::tuple<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /usr/include/c++/7/thread:234 (example+0xbad3e)
    #9 std::thread::_Invoker<std::tuple<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}> >::operator()() /usr/include/c++/7/thread:243 (example+0xbacd0)
    #10 std::thread::_State_impl<std::thread::_Invoker<std::tuple<dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}> > >::_M_run() /usr/include/c++/7/thread:186 (example+0xbaaa8)
    #11 <null> <null> (libstdc++.so.6+0xbd57e)

  Thread T1 (tid=29386, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x2bcfe)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xbd834)
    #2 dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>) include/dmlc/threadediter.h:407 (example+0xb55aa)
    #3 dmlc::io::ThreadedInputSplit::ThreadedInputSplit(dmlc::io::InputSplitBase*, unsigned long) src/io/threaded_input_split.h:35 (example+0xb28fa)
    #4 dmlc::InputSplit::Create(char const*, char const*, unsigned int, unsigned int, char const*, bool, int, unsigned long, bool) src/io.cc:121 (example+0xae98f)
    #5 dmlc::InputSplit::Create(char const*, unsigned int, unsigned int, char const*) src/io.cc:78 (example+0xae48f)
    #6 dmlc::Parser<unsigned int, float>* dmlc::data::CreateCSVParser<unsigned int, float>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&, unsigned int, unsigned int) <null> (example+0x1c5ca)
    #7 dmlc::Parser<unsigned int, float>* dmlc::data::CreateParser_<unsigned int, float>(char const*, unsigned int, unsigned int, char const*) <null> (example+0x19670)
    #8 dmlc::Parser<unsigned int, float>::Create(char const*, unsigned int, unsigned int, char const*) src/data.cc:175 (example+0xf562)
    #9 main /home/ubuntu/example.cc:8 (example+0xe2cc)

SUMMARY: ThreadSanitizer: data race include/dmlc/threadediter.h:357 in dmlc::ThreadedIter<dmlc::io::InputSplitBase::Chunk>::Init(std::function<bool (dmlc::io::InputSplitBase::Chunk**)>, std::function<void ()>)::{lambda()#1}::operator()() const
==================
[03:33:24] example.cc:12: value.size = 1000
ThreadSanitizer: reported 1 warnings
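
In this report the 1-byte read at threadediter.h:420 happens inside a condition-variable predicate with mutex M38 held, while the 1-byte write at threadediter.h:357 is listed without any mutex. A flag that is read under a lock but written outside it produces exactly this kind of report. The sketch below shows the usual remedy under that assumption; it is not the ThreadedIter implementation, and TinyThreadedIter, done_ and SumAll are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical single-producer, single-consumer iterator.
// This is an illustration only, not dmlc::ThreadedIter.
class TinyThreadedIter {
 public:
  explicit TinyThreadedIter(int count)
      : producer_([this, count] {
          for (int i = 0; i < count; ++i) {
            {
              std::lock_guard<std::mutex> lock(mutex_);
              queue_.push(i);
            }
            ready_.notify_one();
          }
          {
            // The end-of-stream flag is written while holding the same
            // mutex that the consumer holds when it reads the flag.
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
          }
          ready_.notify_one();
        }) {}

  ~TinyThreadedIter() { producer_.join(); }

  // Returns false once the producer has finished and the queue is empty.
  bool Next(int* out) {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !queue_.empty() || done_; });
    if (queue_.empty()) return false;
    *out = queue_.front();
    queue_.pop();
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<int> queue_;
  bool done_ = false;
  std::thread producer_;  // declared last: starts after the members above exist
};

// Drains the iterator and returns the sum of everything it produced.
int SumAll(int count) {
  TinyThreadedIter iter(count);
  int value = 0;
  int sum = 0;
  while (iter.Next(&value)) sum += value;
  return sum;
}
```

Declaring the flag as std::atomic<bool> would also silence the sanitizer, but writing it under the mutex additionally keeps the wake-up from being lost between the predicate check and the wait.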

hcho3 added a commit to hcho3/dmlc-core that referenced this issue Mar 7, 2019
hcho3 closed this as completed in 15362f8 on Mar 19, 2019
eric-haibin-lin pushed a commit to eric-haibin-lin/dmlc-core that referenced this issue Mar 24, 2019