From e9e7de0c235fcf14f3cd3abfa2d9865e8c284c8b Mon Sep 17 00:00:00 2001 From: Jan Veltmaat Date: Wed, 13 Mar 2024 13:56:10 -0400 Subject: [PATCH] do not close already closed channel (#11) --- opwindow.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/opwindow.go b/opwindow.go index bb4eabb..6e9ff79 100644 --- a/opwindow.go +++ b/opwindow.go @@ -67,7 +67,10 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { item, ok := q.m[id] if ok { if len(item.OpSet.set) >= q.width { - close(item.IsFull) + if !item.IsFullClosed { + close(item.IsFull) + item.IsFullClosed = true + } q.mu.Unlock() return ErrQueueSaturatedWidth } @@ -163,8 +166,9 @@ func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) { } type queueItem struct { - ID ID - ProcessAt time.Time - OpSet *OpSet - IsFull chan struct{} + ID ID + ProcessAt time.Time + OpSet *OpSet + IsFull chan struct{} + IsFullClosed bool }