Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: refactor finalSize
Browse files Browse the repository at this point in the history
Fixes: #65
  • Loading branch information
jasnell committed Jan 22, 2020
1 parent dd3967f commit 65fe214
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 36 deletions.
13 changes: 11 additions & 2 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1766,8 +1766,7 @@ or writable side has closed naturally.
The callback is invoked with two arguments:

* `code` {number} The QUIC application error code used to terminate the stream.
* `finalSize` {number} The total number of bytes received by the `QuicStream`
as of the moment the stream was closed.
* `family` {number} Identifier of the error code family.

#### Event: `'close'`
<!-- YAML
Expand Down Expand Up @@ -1955,6 +1954,16 @@ added: REPLACEME

A `BigInt` representing the length of time the `QuicStream` has been active.

### quicstream.finalSize
<!-- YAML
added: REPLACEME
-->

* Type: {BigInt}

A `BigInt` specifying the total number of bytes successfully received by the
`QuicStream`.

#### quicstream.id
<!-- YAML
added: REPLACEME
Expand Down
20 changes: 12 additions & 8 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ const {
IDX_QUIC_STREAM_STATS_BYTES_RECEIVED,
IDX_QUIC_STREAM_STATS_BYTES_SENT,
IDX_QUIC_STREAM_STATS_MAX_OFFSET,
IDX_QUIC_STREAM_STATS_FINAL_SIZE,
IDX_QUIC_SOCKET_STATS_CREATED_AT,
IDX_QUIC_SOCKET_STATS_BOUND_AT,
IDX_QUIC_SOCKET_STATS_LISTEN_AT,
Expand Down Expand Up @@ -506,8 +507,8 @@ function onStreamClose(id, appErrorCode) {
}

// Called by the C++ internals when a stream has been reset
function onStreamReset(id, appErrorCode, finalSize) {
this[owner_symbol][kStreamReset](id, appErrorCode, finalSize);
function onStreamReset(id, appErrorCode) {
this[owner_symbol][kStreamReset](id, appErrorCode);
}

// Called when an error occurs in a QuicStream
Expand Down Expand Up @@ -1605,12 +1606,12 @@ class QuicSession extends EventEmitter {
stream[kHeaders](headers, kind, push_id);
}

[kStreamReset](id, code, finalSize) {
[kStreamReset](id, code) {
const stream = this.#streams.get(id);
if (stream === undefined)
return;

stream[kStreamReset](code, finalSize);
stream[kStreamReset](code);
}

[kInspect]() {
Expand Down Expand Up @@ -2318,7 +2319,6 @@ class QuicStream extends Duplex {
#id = undefined;
#push_id = undefined;
#resetCode = undefined;
#resetFinalSize = undefined;
#session = undefined;
#dataRateHistogram = undefined;
#dataSizeHistogram = undefined;
Expand Down Expand Up @@ -2380,9 +2380,8 @@ class QuicStream extends Duplex {
}
}

[kStreamReset](code, finalSize) {
[kStreamReset](code) {
this.#resetCode = code | 0;
this.#resetFinalSize = finalSize | 0;
this.push(null);
this.read();
}
Expand Down Expand Up @@ -2664,7 +2663,7 @@ class QuicStream extends Duplex {

get resetReceived() {
return (this.#resetCode !== undefined) ?
{ code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } :
{ code: this.#resetCode | 0 } :
undefined;
}

Expand Down Expand Up @@ -2853,6 +2852,11 @@ class QuicStream extends Duplex {
const stats = this.#stats || this[kHandle].stats;
return stats[IDX_QUIC_STREAM_STATS_MAX_OFFSET];
}

get finalSize() {
const stats = this.#stats || this[kHandle].stats;
return stats[IDX_QUIC_STREAM_STATS_FINAL_SIZE];
}
}

function createSocket(options) {
Expand Down
3 changes: 1 addition & 2 deletions src/quic/node_quic_http3_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,9 @@ void Http3Application::StreamClose(

void Http3Application::StreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code) {
nghttp3_conn_reset_stream(connection(), stream_id);
QuicApplication::StreamReset(stream_id, final_size, app_error_code);
QuicApplication::StreamReset(stream_id, app_error_code);
}

// When SendPendingData tries to send data for a given stream and there
Expand Down
1 change: 0 additions & 1 deletion src/quic/node_quic_http3_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ class Http3Application final :

void StreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code) override;

void ResumeStream(int64_t stream_id) override;
Expand Down
13 changes: 4 additions & 9 deletions src/quic/node_quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,9 @@ void QuicSessionListener::OnStreamClose(

void QuicSessionListener::OnStreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code) {
if (previous_listener_ != nullptr)
previous_listener_->OnStreamReset(stream_id, final_size, app_error_code);
previous_listener_->OnStreamReset(stream_id, app_error_code);
}

void QuicSessionListener::OnSessionDestroyed() {
Expand Down Expand Up @@ -419,16 +418,14 @@ void JSQuicSessionListener::OnStreamClose(

void JSQuicSessionListener::OnStreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code) {
Environment* env = session()->env();
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());

Local<Value> argv[] = {
Number::New(env->isolate(), static_cast<double>(stream_id)),
Number::New(env->isolate(), static_cast<double>(app_error_code)),
Number::New(env->isolate(), static_cast<double>(final_size))
Number::New(env->isolate(), static_cast<double>(app_error_code))
};
// Grab a shared pointer to this to prevent the QuicSession
// from being freed while the MakeCallback is running.
Expand Down Expand Up @@ -1214,9 +1211,8 @@ void QuicApplication::StreamClose(

void QuicApplication::StreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code) {
session()->listener()->OnStreamReset(stream_id, final_size, app_error_code);
session()->listener()->OnStreamReset(stream_id, app_error_code);
}

// Determines which QuicApplication variant the QuicSession will be using
Expand Down Expand Up @@ -1554,7 +1550,6 @@ BaseObjectPtr<QuicStream> QuicSession::CreateStream(int64_t stream_id) {
void QuicSession::Destroy() {
if (is_flag_set(QUICSESSION_FLAG_DESTROYED))
return;
Debug(this, "Destroying");

// If we're not in the closing or draining periods,
// then we should at least attempt to send a connection
Expand Down Expand Up @@ -2382,7 +2377,7 @@ void QuicSession::StreamReset(

if (stream) {
stream->set_final_size(final_size);
application_->StreamReset(stream_id, final_size, app_error_code);
application_->StreamReset(stream_id, app_error_code);
}
}

Expand Down
3 changes: 0 additions & 3 deletions src/quic/node_quic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ class QuicSessionListener {
uint64_t app_error_code);
virtual void OnStreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code);
virtual void OnSessionDestroyed();
virtual void OnSessionClose(QuicError error);
Expand Down Expand Up @@ -303,7 +302,6 @@ class JSQuicSessionListener : public QuicSessionListener {
uint64_t app_error_code) override;
void OnStreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code) override;
void OnSessionDestroyed() override;
void OnSessionClose(QuicError error) override;
Expand Down Expand Up @@ -530,7 +528,6 @@ class QuicApplication : public MemoryRetainer {
virtual void StreamOpen(int64_t stream_id);
virtual void StreamReset(
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code);
virtual bool SubmitInformation(
int64_t stream_id,
Expand Down
1 change: 1 addition & 0 deletions src/quic/node_quic_stream-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void QuicStream::set_flag(int32_t flag, bool on) {
}

void QuicStream::set_final_size(uint64_t final_size) {
CHECK_EQ(GetStat(&QuicStreamStats::final_size), 0);
SetStat(&QuicStreamStats::final_size, final_size);
}

Expand Down
11 changes: 1 addition & 10 deletions src/quic/node_quic_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,6 @@ void QuicStream::Destroy() {
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
streambuf_.End();

uint64_t now = uv_hrtime();
Debug(this,
"Destroying.\n"
" Duration: %" PRIu64 "\n"
" Bytes Received: %" PRIu64 "\n"
" Bytes Sent: %" PRIu64,
uv_hrtime() - GetStat(&QuicStreamStats::created_at),
GetStat(&QuicStreamStats::bytes_received),
GetStat(&QuicStreamStats::bytes_sent));

// If there is data currently buffered in the streambuf_,
// then cancel will call out to invoke an arbitrary
// JavaScript callback (the on write callback). Within
Expand Down Expand Up @@ -373,6 +363,7 @@ void QuicStream::ReceiveData(
// stream, indicating that the stream will no longer be readable.
if (fin) {
set_flag(QUICSTREAM_FLAG_FIN);
set_final_size(offset + datalen);
EmitRead(UV_EOF);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/quic/node_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ class QuicStream : public AsyncWrap,
// Set the final size for the QuicStream
inline void set_final_size(uint64_t final_size);

// The final size is the maximum amount of data that has been
// acknowleged to have been received for a QuicStream.
uint64_t final_size() const {
return GetStat(&QuicStreamStats::final_size);
}
Expand Down
7 changes: 6 additions & 1 deletion test/parallel/test-quic-client-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ server.on('session', common.mustCall((session) => {
uni.write(unidata[0], common.mustCall());
uni.end(unidata[1], common.mustCall());
uni.on('finish', common.mustCall());
uni.on('close', common.mustCall());
uni.on('end', common.mustCall());
uni.on('data', common.mustNotCall());
uni.on('close', common.mustCall(() => {
assert.strictEqual(uni.finalSize, 0n);
}));
debug('Unidirectional, Server-initiated stream %d opened', uni.id);
}));

Expand Down Expand Up @@ -184,6 +186,7 @@ server.on('session', common.mustCall((session) => {
assert.strictEqual(typeof stream.bytesReceived, 'bigint');
assert.strictEqual(typeof stream.bytesSent, 'bigint');
assert.strictEqual(typeof stream.maxExtendedOffset, 'bigint');
assert.strictEqual(stream.finalSize, BigInt(filedata.length));
}));
}));

Expand Down Expand Up @@ -320,6 +323,7 @@ server.on('ready', common.mustCall(() => {
}));
stream.on('close', common.mustCall(() => {
debug('Bidirectional, Client-initiated stream %d closed', stream.id);
assert.strictEqual(stream.finalSize, BigInt(filedata.length));
countdown.dec();
}));
debug('Bidirectional, Client-initiated stream %d opened', stream.id);
Expand All @@ -336,6 +340,7 @@ server.on('ready', common.mustCall(() => {
}));
stream.on('close', common.mustCall(() => {
debug('Unidirectional, Server-initiated stream %d closed', stream.id);
assert.strictEqual(stream.finalSize, 26n);
countdown.dec();
}));
}));
Expand Down

0 comments on commit 65fe214

Please sign in to comment.