Skip to content

Commit

Permalink
quic: add backpressure notification to dataqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed May 29, 2023
1 parent 6d2811f commit a907721
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
35 changes: 35 additions & 0 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,28 @@ class DataQueueImpl final : public DataQueue,
"entries", entries_, "std::vector<std::unique_ptr<Entry>>");
}

void addBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.insert(listener);
}

void removeBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.erase(listener);
}

void NotifyBackpressure(size_t amount) {
if (idempotent_) return;
for (auto listener : backpressure_listeners_)
listener->EntryRead(amount);
}

bool has_backpressure_listeners() const {
return !backpressure_listeners_.empty();
}

std::shared_ptr<Reader> get_reader() override;
SET_MEMORY_INFO_NAME(DataQueue)
SET_SELF_SIZE(DataQueueImpl)
Expand All @@ -173,6 +195,8 @@ class DataQueueImpl final : public DataQueue,
std::optional<uint64_t> capped_size_ = std::nullopt;
bool locked_to_reader_ = false;

std::set<BackpressureListener*> backpressure_listeners_;

friend class DataQueue;
friend class IdempotentDataQueueReader;
friend class NonIdempotentDataQueueReader;
Expand Down Expand Up @@ -433,6 +457,17 @@ class NonIdempotentDataQueueReader final
return;
}

// If there is a backpressure listener, lets report on how much data was
// actually read.
if (data_queue_->has_backpressure_listeners()) {
// How much did we actually read?
size_t read = 0;
for (uint64_t n = 0; n < count; n++) {
read += vecs[n].len;
}
data_queue_->NotifyBackpressure(read);
}

// Now that we have updated this readers state, we can forward
// everything on to the outer next.
std::move(next)(status, vecs, count, std::move(done));
Expand Down
12 changes: 12 additions & 0 deletions src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ class DataQueue : public MemoryRetainer {
using Done = bob::Done;
};

// A BackpressureListener can be used to receive notifications
// when a non-idempotent DataQueue releases entries as they
// are consumed.
class BackpressureListener {
public:
virtual void EntryRead(size_t amount) = 0;
};

// A DataQueue::Entry represents a logical chunk of data in the queue.
// The entry may or may not represent memory-resident data. It may
// or may not be consumable more than once.
Expand Down Expand Up @@ -285,6 +293,10 @@ class DataQueue : public MemoryRetainer {
// been set, maybeCapRemaining() will return std::nullopt.
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;

// BackpressureListeners only work on non-idempotent DataQueues.
virtual void addBackpressureListener(BackpressureListener* listener) = 0;
virtual void removeBackpressureListener(BackpressureListener* listener) = 0;

static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};
Expand Down

0 comments on commit a907721

Please sign in to comment.