Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
src: enable StreamPipe for generic StreamBases
Browse files Browse the repository at this point in the history
PR-URL: #150
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
addaleax authored and jasnell committed Feb 13, 2020
1 parent 6f5ad2c commit 671fa2c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 16 deletions.
56 changes: 43 additions & 13 deletions src/stream_pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
source->PushStreamListener(&readable_listener_);
sink->PushStreamListener(&writable_listener_);

CHECK(sink->HasWantsWrite());
uses_wants_write_ = sink->HasWantsWrite();

// Set up links between this object and the source/sink objects.
// In particular, this makes sure that they are garbage collected as a group,
Expand Down Expand Up @@ -66,7 +66,8 @@ void StreamPipe::Unpipe(bool is_in_deletion) {
is_closed_ = true;
is_reading_ = false;
source()->RemoveStreamListener(&readable_listener_);
sink()->RemoveStreamListener(&writable_listener_);
if (pending_writes_ == 0)
sink()->RemoveStreamListener(&writable_listener_);

if (is_in_deletion) return;

Expand Down Expand Up @@ -126,13 +127,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
// EOF or error; stop reading and pass the error to the previous listener
// (which might end up in JS).
pipe->is_eof_ = true;
// Cache `sink()` here because the previous listener might do things
// that eventually lead to an `Unpipe()` call.
StreamBase* sink = pipe->sink();
stream()->ReadStop();
CHECK_NOT_NULL(previous_listener_);
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
// If we’re not writing, close now. Otherwise, we’ll do that in
// `OnStreamAfterWrite()`.
if (!pipe->is_writing_) {
pipe->ShutdownWritable();
if (pipe->pending_writes_ == 0) {
sink->Shutdown();
pipe->Unpipe();
}
return;
Expand All @@ -142,32 +146,40 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
}

void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
CHECK(uses_wants_write_ || pending_writes_ == 0);
uv_buf_t buffer = uv_buf_init(buf.data(), nread);
StreamWriteResult res = sink()->Write(&buffer, 1);
pending_writes_++;
if (!res.async) {
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
} else {
is_writing_ = true;
is_reading_ = false;
res.wrap->SetAllocatedStorage(std::move(buf));
if (source() != nullptr)
source()->ReadStop();
}
}

void StreamPipe::ShutdownWritable() {
sink()->Shutdown();
}

void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
int status) {
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
pipe->is_writing_ = false;
pipe->pending_writes_--;
if (pipe->is_closed_) {
if (pipe->pending_writes_ == 0) {
Environment* env = pipe->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
stream()->RemoveStreamListener(this);
}
return;
}

if (pipe->is_eof_) {
HandleScope handle_scope(pipe->env()->isolate());
InternalCallbackScope callback_scope(pipe,
InternalCallbackScope::kSkipTaskQueues);
pipe->ShutdownWritable();
pipe->sink()->Shutdown();
pipe->Unpipe();
return;
}
Expand All @@ -179,6 +191,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
prev->OnStreamAfterWrite(w, status);
return;
}

if (!pipe->uses_wants_write_) {
OnStreamWantsWrite(65536);
}
}

void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
Expand All @@ -202,6 +218,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
pipe->sink_destroyed_ = true;
pipe->is_eof_ = true;
pipe->pending_writes_ = 0;
pipe->Unpipe();
}

Expand Down Expand Up @@ -242,8 +259,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
pipe->is_closed_ = false;
if (pipe->wanted_data_ > 0)
pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
pipe->writable_listener_.OnStreamWantsWrite(65536);
}

void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
Expand All @@ -252,6 +268,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
pipe->Unpipe();
}

void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
args.GetReturnValue().Set(pipe->is_closed_);
}

void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
args.GetReturnValue().Set(pipe->pending_writes_);
}

namespace {

void InitializeStreamPipe(Local<Object> target,
Expand All @@ -266,6 +294,8 @@ void InitializeStreamPipe(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
env->SetProtoMethod(pipe, "start", StreamPipe::Start);
env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
pipe->SetClassName(stream_pipe_string);
pipe->InstanceTemplate()->SetInternalFieldCount(1);
Expand Down
7 changes: 4 additions & 3 deletions src/stream_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class StreamPipe : public AsyncWrap {
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
static void IsClosed(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PendingWrites(const v8::FunctionCallbackInfo<v8::Value>& args);

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(StreamPipe)
Expand All @@ -26,14 +28,13 @@ class StreamPipe : public AsyncWrap {
inline StreamBase* source();
inline StreamBase* sink();

inline void ShutdownWritable();

int pending_writes_ = 0;
bool is_reading_ = false;
bool is_writing_ = false;
bool is_eof_ = false;
bool is_closed_ = true;
bool sink_destroyed_ = false;
bool source_destroyed_ = false;
bool uses_wants_write_ = false;

// Set a default value so that when we’re coming from Start(), we know
// that we don’t want to read just yet.
Expand Down

0 comments on commit 671fa2c

Please sign in to comment.