Skip to content

Commit 3a1adc8

Browse files
Inphidevopsbo3
authored andcommitted
event: fix Resubscribe deadlock when unsubscribing after inner sub ends (ethereum#28359)
A goroutine is used to manage the lifetime of subscriptions managed by resubscriptions. When the subscription ends with no error, the resub goroutine ends as well. However, the resub goroutine needs to live long enough to read from the unsub channel. Otheriwse, an Unsubscribe call deadlocks when writing to the unsub channel. This is fixed by adding a buffer to the unsub channel.
1 parent 2318fea commit 3a1adc8

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

event/subscription.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio
120120
backoffMax: backoffMax,
121121
fn: fn,
122122
err: make(chan error),
123-
unsub: make(chan struct{}),
123+
unsub: make(chan struct{}, 1),
124124
}
125125
go s.loop()
126126
return s

event/subscription_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,27 @@ func TestResubscribeWithErrorHandler(t *testing.T) {
154154
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
155155
}
156156
}
157+
158+
func TestResubscribeWithCompletedSubscription(t *testing.T) {
159+
t.Parallel()
160+
161+
quitProducerAck := make(chan struct{})
162+
quitProducer := make(chan struct{})
163+
164+
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
165+
return NewSubscription(func(unsubscribed <-chan struct{}) error {
166+
select {
167+
case <-quitProducer:
168+
quitProducerAck <- struct{}{}
169+
return nil
170+
case <-unsubscribed:
171+
return nil
172+
}
173+
}), nil
174+
})
175+
176+
// Ensure producer has started and exited before Unsubscribe
177+
close(quitProducer)
178+
<-quitProducerAck
179+
sub.Unsubscribe()
180+
}

0 commit comments

Comments
 (0)