Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wangkuiyi committed Jan 29, 2018
1 parent cf407a5 commit 62fb71a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 5 deletions.
59 changes: 55 additions & 4 deletions paddle/framework/channel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,64 @@ limitations under the License. */

#include "paddle/framework/channel.h"

#include <chrono>
#include <thread>

#include "gtest/gtest.h"


using paddle::framework::Channel;
using paddle::framework::MakeChannel;
using paddle::framework::CloseChannel;

TEST(Channel, MakeAndClose) {
using paddle::framework::Channel;
using paddle::framework::MakeChannel;
using paddle::framework::CloseChannel;
using paddle::framework::details::Buffered;
using paddle::framework::details::UnBuffered;
{
// MakeChannel should return a buffered channel is buffer_size > 0.
auto ch = MakeChannel<int>(10);
EXPECT_NE(dynamic_cast<Buffered<int>*>(ch), nullptr);
EXPECT_EQ(dynamic_cast<UnBuffered<int>*>(ch), nullptr);
CloseChannel(ch);
}
{
// MakeChannel should return an un-buffered channel is buffer_size = 0.
auto ch = MakeChannel<int>(0);
EXPECT_EQ(dynamic_cast<Buffered<int>*>(ch), nullptr);
EXPECT_NE(dynamic_cast<UnBuffered<int>*>(ch), nullptr);
CloseChannel(ch);
}
}

TEST(Channel, SufficientBufferSizeDoesntBlock) {
const size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size);
for (size_t i = 0; i < buffer_size; ++i) {
ch->Send(&i); // should not block
}

size_t out;
for (size_t i = 0; i < buffer_size; ++i) {
ch->Receive(&out); // should not block
EXPECT_EQ(out, i);
}
CloseChannel(ch);
}

TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
const size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size);

Channel<int>* ch = MakeChannel<int>(10);
size_t sum = 0;
std::thread t([&](){
// Try to write more than buffer size.
for (size_t i = 0; i < 2*buffer_size; ++i) {
ch->Send(&i); // should not block
sum += i;
}
});

std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.5 sec
EXPECT_EQ(sum, 45U);
CloseChannel(ch);
}
3 changes: 2 additions & 1 deletion paddle/framework/details/buffered_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License. */
#include <mutex>

#include "paddle/framework/channel.h"
#include "paddle/platform/enforce.h"

namespace paddle {
namespace framework {
Expand All @@ -40,7 +41,7 @@ class Buffered : public paddle::framework::Channel<T> {
std::condition_variable full_cond_var_;
std::deque<T> channel_;

Buffered(size_t cap) : cap_(cap) {}
Buffered(size_t cap) : cap_(cap) { PADDLE_ENFORCE_GT(cap, 0); }
virtual ~Buffered();

void NotifyAllSenders(std::unique_lock<std::mutex>*);
Expand Down

0 comments on commit 62fb71a

Please sign in to comment.