Skip to content

Commit

Permalink
fix(Stream): Only send done markers if told to do so
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain committed Oct 14, 2020
1 parent 8d26d52 commit 9c48993
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Stream struct {
rangeCh chan keyRange
kvChan chan *pb.KVList
nextStreamId uint32
doneMarkers bool

// Use allocators to generate KVs.
allocatorsMu sync.RWMutex
Expand All @@ -99,6 +100,11 @@ func (st *Stream) Allocator(threadId int) *z.Allocator {
return st.allocators[threadId]
}

// SendDoneMarkers when true would send out done markers on the stream. False by default.
func (st *Stream) SendDoneMarkers(done bool) {
st.doneMarkers = done
}

// ToList is a default implementation of KeyToList. It picks up all valid versions of the key,
// skipping over deleted or expired keys.
func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
Expand Down Expand Up @@ -268,11 +274,16 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
}
}
// Mark the stream as done.
outList.Kv = append(outList.Kv, &pb.KV{
StreamId: streamId,
StreamDone: true,
})
return sendIt()
if st.doneMarkers {
outList.Kv = append(outList.Kv, &pb.KV{
StreamId: streamId,
StreamDone: true,
})
}
if len(outList.Kv) > 0 {
return sendIt()
}
return nil
}

for {
Expand Down

0 comments on commit 9c48993

Please sign in to comment.