diff --git a/stream.go b/stream.go index d6c5e4317..52648c781 100644 --- a/stream.go +++ b/stream.go @@ -87,6 +87,7 @@ type Stream struct { rangeCh chan keyRange kvChan chan *pb.KVList nextStreamId uint32 + doneMarkers bool // Use allocators to generate KVs. allocatorsMu sync.RWMutex @@ -99,6 +100,11 @@ func (st *Stream) Allocator(threadId int) *z.Allocator { return st.allocators[threadId] } +// SendDoneMarkers when true would send out done markers on the stream. False by default. +func (st *Stream) SendDoneMarkers(done bool) { + st.doneMarkers = done +} + // ToList is a default implementation of KeyToList. It picks up all valid versions of the key, // skipping over deleted or expired keys. func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { @@ -268,11 +274,16 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { } } // Mark the stream as done. - outList.Kv = append(outList.Kv, &pb.KV{ - StreamId: streamId, - StreamDone: true, - }) - return sendIt() + if st.doneMarkers { + outList.Kv = append(outList.Kv, &pb.KV{ + StreamId: streamId, + StreamDone: true, + }) + } + if len(outList.Kv) > 0 { + return sendIt() + } + return nil } for {