Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine channel test #7946

Merged
merged 11 commits into from
Jan 31, 2018
Merged

Conversation

chengduoZH
Copy link
Contributor

@chengduoZH chengduoZH commented Jan 29, 2018

fix #7948

  • The basic function of buffered_channel.
  • The function of buffered_channel under multithreading.

framework::ThreadPool* pool;
pool = framework::ThreadPool::GetInstance();

// Receiver
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be "Consumer" and "Producer".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


TEST(Channel, Buffered) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a test case for "unbuffered" channel too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implement of unbuffered channel is empty.

template <typename T>
void UnBuffered<T>::Send(T* channel_element) {}
template <typename T>
void UnBuffered<T>::Receive(T*) {}
template <typename T>
UnBuffered<T>::~UnBuffered() {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is about unbuffered channel.

typhoonzero
typhoonzero previously approved these changes Jan 29, 2018
Copy link
Contributor

@typhoonzero typhoonzero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. LGTM!

Channel<int>* ch = MakeChannel<int>(capacity);

framework::ThreadPool* pool;
pool = framework::ThreadPool::GetInstance();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For two reasons, let us do not use framework::ThreadPool in this unit test, but use std::thread:

  1. To minimize the dependencies.
  2. framework::ThreadPool sets the number of OS threads automatically, which could be one.

pool = framework::ThreadPool::GetInstance();

// Consumer
for (int i = 0; i < capacity; ++i) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the purpose of this unit test. Do you have a list of edge cases?

Copy link
Collaborator

@wangkuiyi wangkuiyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us remoe the dependency to ThreadPool

wangkuiyi
wangkuiyi previously approved these changes Jan 30, 2018
@chengduoZH chengduoZH force-pushed the feature/Add_channel_test branch 2 times, most recently from ccf829c to be0525f Compare January 30, 2018 03:32
}

template <typename T>
void DeleteChannel(Channel<T>* ch) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we need CloseChannel, which mimics Go's close, and DeleteChannel, which mimics Go's garbage collection.

One step forward, I think we can have only CloseChannel, because DeleteChannel could be simplified as delete ch -- I didn't have to and should not have defined template <typename T> void DeleteChannel.

Copy link
Contributor Author

@chengduoZH chengduoZH Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you.
We should make a further analysis about CloseChannel. When the channel was closed, the channel_ of Channel may also have some data, I think that receive can also work properly, but send cannot send data to channel_.
So, CloseChannel should not delete Buffered or Unbuffered.

If we close the channel by calling the destructor, the objection will be deleted.
But the destructor also calls notify_one, this causes some data also to be placed in the channel, this is obviously an error.

template <typename T>
Buffered<T>::~Buffered() {
std::unique_lock<std::mutex> lock(mu_);
channel_.clear();
NotifyAllSenders(&lock);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to close the channel via the destructor. My idea is the other way. See #7946 (comment)

@@ -72,4 +72,5 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.5 sec
EXPECT_EQ(sum, 45U);
CloseChannel(ch);
t.join();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that you closed but didn't delete the channel in this test? In my mind, this test should look like

std::unique_ptr<Channel<int>*> ch(MakeChannel<int>(10)); // `delete ch.get()` will be called
do_something_here_with(ch);
CloseChannel(ch.get());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that you closed but didn't delete the channel in this test?

We can add DeleteChannel behind CloseChannel.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maye the use of unique_ptr didn't present my idea clearly. Let us use delete explicitly:

auto ch = MakeChannel<int>(10);
do_something(ch);
CloseChannel(ch);
delete ch;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok!
If we use delete to destroy the ch but other threads also hold the ch and these threads send data to the ch, there will cause an error.
I wonder that whether this case is existent.


private:
size_t cap_;
std::mutex mu_;
std::condition_variable empty_cond_var_;
std::condition_variable full_cond_var_;
std::deque<T> channel_;
bool close;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close => closed_ or closing_

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

@wangkuiyi wangkuiyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Let us merge this PR and go on working on the channels in future PRs.

@wangkuiyi wangkuiyi merged commit adf14b0 into PaddlePaddle:develop Jan 31, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refine channel test
3 participants