Skip to content

Commit

Permalink
[#28142][Go SDK] Evict closed readers from the cache. (#30119)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck committed Jan 26, 2024
1 parent 1d4413d commit e0e20a1
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,20 @@ func (c *DataChannel) read(ctx context.Context) {
continue // we've already closed this cached reader, skip
}
r.PTransformDone()
if r.Closed() {
// Clean up local bookkeeping. We'll never see another message
// for it again. We have to be careful not to remove the real
// one, because readers may be initialized after we've seen
// the full stream.
delete(cache, id.instID)
}
}
seenLast = seenLast[:0] // reset for re-use
c.mu.Unlock()
// Scan through the cache and check for any closed readers, and evict them from the cache.
// Readers might be closed out of band from the data messages because we received all data
// for all transforms in an instruction before the instruction even begun. However, we can't
// know this until we received the Control instruction which knows how many transforms for which
// we need to receive data. So we check the cache directly every so often and evict closed
// readers. We will never recieve data for these instructions again.
for instID, r := range cache {
if r.Closed() {
delete(cache, instID)
}
}
}
}
}
Expand Down

0 comments on commit e0e20a1

Please sign in to comment.