Skip to content

Commit

Permalink
fix(stream): Stop produceKVs on error (#1604)
Browse files Browse the repository at this point in the history
The orchestrate function would get blocked forever if send function returned an error.
The produceKv go routines would also get blocked since the size of the error chan was 1.
  • Loading branch information
Ibrahim Jarif authored Nov 25, 2020
1 parent 340ccfc commit f36daf5
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ outer:
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
for _, a := range st.allocators {
// Using AllocatorFrom is better because if the allocator is already freed up, it would
Expand All @@ -421,7 +423,7 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
// Picks up ranges from Badger, and sends them to rangeCh.
go st.produceRanges(ctx)

errCh := make(chan error, 1) // Stores error by consumeKeys.
errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
var wg sync.WaitGroup
for i := 0; i < st.NumGo; i++ {
wg.Add(1)
Expand All @@ -442,7 +444,11 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
kvErr := make(chan error, 1)
go func() {
// Picks up KV lists from kvChan, and sends them to Output.
kvErr <- st.streamKVs(ctx)
err := st.streamKVs(ctx)
if err != nil {
cancel() // Stop all the go routines.
}
kvErr <- err
}()
wg.Wait() // Wait for produceKVs to be over.
close(st.kvChan) // Now we can close kvChan.
Expand Down

0 comments on commit f36daf5

Please sign in to comment.