[apache#28142] Evict closed readers from the cache.
lostluck committed Jan 25, 2024
1 parent 22f847d commit 5233c87
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -425,16 +425,20 @@ func (c *DataChannel) read(ctx context.Context) {
 					continue // we've already closed this cached reader, skip
 				}
 				r.PTransformDone()
-				if r.Closed() {
-					// Clean up local bookkeeping. We'll never see another message
-					// for it again. We have to be careful not to remove the real
-					// one, because readers may be initialized after we've seen
-					// the full stream.
-					delete(cache, id.instID)
-				}
 			}
 			seenLast = seenLast[:0] // reset for re-use
 			c.mu.Unlock()
+			// Scan through the cache and check for any closed readers, and evict them from the cache.
+			// Readers might be closed out of band from the data messages because we received all data
+			// for all transforms in an instruction before the instruction even began. However, we can't
+			// know this until we receive the Control instruction, which tells us how many transforms
+			// we need to receive data for. So we check the cache directly every so often and evict closed
+			// readers. We will never receive data for these instructions again.
+			for instID, r := range cache {
+				if r.Closed() {
+					delete(cache, instID)
+				}
+			}
 		}
 	}
 }