From 55ca73949860f50aa08bd8582f1e80a42b2459be Mon Sep 17 00:00:00 2001 From: Ashish Goswami Date: Tue, 1 Oct 2019 13:21:33 +0530 Subject: [PATCH 01/15] Add support for stream end in stream writer --- pb/pb.pb.go | 135 ++++++++++++++++++++++++++++++----------------- pb/pb.proto | 2 + stream_writer.go | 85 ++++++++++++++++++++++++----- 3 files changed, 161 insertions(+), 61 deletions(-) diff --git a/pb/pb.pb.go b/pb/pb.pb.go index bfff39b7b..5c9e6cdc5 100644 --- a/pb/pb.pb.go +++ b/pb/pb.pb.go @@ -8,6 +8,7 @@ import ( proto "github.com/golang/protobuf/proto" io "io" math "math" + math_bits "math/bits" ) // Reference imports to suppress errors if they are not otherwise used. @@ -101,7 +102,9 @@ type KV struct { ExpiresAt uint64 `protobuf:"varint,5,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` Meta []byte `protobuf:"bytes,6,opt,name=meta,proto3" json:"meta,omitempty"` // Stream id is used to identify which stream the KV came from. - StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` + StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` + // Stream done is to indicate end of stream. + StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -189,6 +192,13 @@ func (m *KV) GetStreamId() uint32 { return 0 } +func (m *KV) GetStreamDone() bool { + if m != nil { + return m.StreamDone + } + return false +} + type KVList struct { Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -632,46 +642,47 @@ func init() { func init() { proto.RegisterFile("pb.proto", fileDescriptor_f80abaa17e25ccc8) } var fileDescriptor_f80abaa17e25ccc8 = []byte{ - // 611 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xdd, 0x6e, 0x12, 0x41, - 0x14, 0x66, 0x16, 0xba, 0xc0, 0xa1, 0xa5, 0xeb, 0x44, 0x9b, 0x35, 0x2a, 0xc1, 0x35, 0x26, 0xd8, - 0x34, 0x5c, 0xb4, 0xc6, 0x1b, 0xaf, 0x28, 0xc5, 0x48, 0x68, 0x43, 0x32, 0x36, 0x4d, 0xe3, 0x0d, - 0x19, 0x76, 0x0f, 0x65, 0xb3, 0xbf, 0xd9, 0x19, 0x36, 0xe5, 0x4d, 0x7c, 0x0f, 0x5f, 0xc2, 0x4b, - 0x1f, 0xc1, 0xd4, 0x07, 0xd1, 0xcc, 0xec, 0xd2, 0x40, 0xf4, 0xee, 0x9c, 0xef, 0x3b, 0x73, 0xe6, - 0xcc, 0xf7, 0xcd, 0x81, 0x46, 0x3a, 0xef, 0xa7, 0x59, 0x22, 0x13, 0x6a, 0xa4, 0x73, 0xe7, 0x3b, - 0x01, 0x63, 0x72, 0x43, 0x2d, 0xa8, 0x06, 0xb8, 0xb6, 0x49, 0x97, 0xf4, 0xf6, 0x99, 0x0a, 0xe9, - 0x53, 0xd8, 0xcb, 0x79, 0xb8, 0x42, 0xdb, 0xd0, 0x58, 0x91, 0xd0, 0x17, 0xd0, 0x5c, 0x09, 0xcc, - 0x66, 0x11, 0x4a, 0x6e, 0x57, 0x35, 0xd3, 0x50, 0xc0, 0x15, 0x4a, 0x4e, 0x6d, 0xa8, 0xe7, 0x98, - 0x09, 0x3f, 0x89, 0xed, 0x5a, 0x97, 0xf4, 0x6a, 0x6c, 0x93, 0xd2, 0x57, 0x00, 0x78, 0x9f, 0xfa, - 0x19, 0x8a, 0x19, 0x97, 0xf6, 0x9e, 0x26, 0x9b, 0x25, 0x32, 0x90, 0x94, 0x42, 0x4d, 0x37, 0x34, - 0x75, 0x43, 0x1d, 0xab, 0x9b, 0x84, 0xcc, 0x90, 0x47, 0x33, 0xdf, 0xb3, 0xa1, 0x4b, 0x7a, 0x07, - 0xac, 0x51, 0x00, 0x63, 0xcf, 0xe9, 0x82, 0x39, 0xb9, 0xb9, 0xf4, 0x85, 0xa4, 0x47, 0x60, 0x04, - 0xb9, 0x4d, 0xba, 0xd5, 0x5e, 0xeb, 0xd4, 0xec, 0xa7, 0xf3, 0xfe, 0xe4, 0x86, 0x19, 0x41, 0xee, - 0x0c, 0xe0, 0xc9, 0x15, 0x8f, 0xfd, 0x05, 0x0a, 0x39, 0x5c, 0xf2, 0xf8, 0x0e, 0xbf, 0xa0, 0xa4, - 0x27, 0x50, 0x77, 0x75, 0x22, 0xca, 0x13, 0x54, 0x9d, 0xd8, 0xad, 0x63, 0x9b, 0x12, 0xe7, 0x0f, - 0x81, 0xf6, 0x2e, 0x47, 
0xdb, 0x60, 0x8c, 0x3d, 0xad, 0x52, 0x8d, 0x19, 0x63, 0x8f, 0x9e, 0x80, - 0x31, 0x4d, 0xb5, 0x42, 0xed, 0xd3, 0x97, 0xff, 0xf6, 0xea, 0x4f, 0x53, 0xcc, 0xb8, 0xf4, 0x93, - 0x98, 0x19, 0xd3, 0x54, 0x49, 0x7a, 0x89, 0x39, 0x86, 0x5a, 0xb8, 0x03, 0x56, 0x24, 0xf4, 0x19, - 0x98, 0x01, 0xae, 0xd5, 0x2b, 0x0b, 0xd1, 0xf6, 0x02, 0x5c, 0x8f, 0x3d, 0xfa, 0x11, 0x0e, 0x31, - 0x76, 0xb3, 0x75, 0xaa, 0x8e, 0xcf, 0x78, 0x78, 0x97, 0x68, 0xdd, 0xda, 0xc5, 0xcc, 0xa3, 0x47, - 0x6a, 0x10, 0xde, 0x25, 0xac, 0x8d, 0x3b, 0x39, 0xed, 0x42, 0xcb, 0x4d, 0xa2, 0x34, 0x43, 0xa1, - 0xdd, 0x30, 0xf5, 0x7d, 0xdb, 0x90, 0xf3, 0x06, 0x9a, 0x8f, 0xc3, 0x51, 0x00, 0x73, 0xc8, 0x46, - 0x83, 0xeb, 0x91, 0x55, 0x51, 0xf1, 0xc5, 0xe8, 0x72, 0x74, 0x3d, 0xb2, 0x88, 0x33, 0x86, 0xd6, - 0x79, 0x98, 0xb8, 0xc1, 0x74, 0xb1, 0x10, 0x28, 0xff, 0xf3, 0x49, 0x8e, 0xc0, 0x4c, 0x34, 0xa7, - 0x35, 0x38, 0x60, 0x65, 0xa6, 0x2a, 0x43, 0x8c, 0xcb, 0x77, 0xaa, 0xd0, 0xf9, 0x0a, 0x70, 0xcd, - 0xe7, 0x21, 0x8e, 0x63, 0x0f, 0xef, 0xe9, 0x3b, 0xa8, 0x17, 0x95, 0x1b, 0x23, 0x0e, 0xd5, 0xa3, - 0xb6, 0xee, 0x62, 0x1b, 0x9e, 0xbe, 0x86, 0xfd, 0x79, 0x98, 0x24, 0xd1, 0x6c, 0xe1, 0x87, 0x12, - 0xb3, 0xf2, 0x3b, 0xb6, 0x34, 0xf6, 0x49, 0x43, 0x4e, 0x02, 0x8d, 0xe1, 0x12, 0xdd, 0x40, 0xac, - 0x22, 0x7a, 0x0c, 0x35, 0xad, 0x15, 0xd1, 0x5a, 0x1d, 0xa9, 0xb6, 0x1b, 0xae, 0xaf, 0xa4, 0xc9, - 0x7c, 0xb9, 0x8c, 0x98, 0xae, 0x51, 0x53, 0x8a, 0x55, 0xa4, 0x3b, 0xd6, 0x98, 0x0a, 0x9d, 0xb7, - 0xd0, 0x7c, 0x2c, 0x2a, 0x54, 0x19, 0x9e, 0x9d, 0x0e, 0xad, 0x0a, 0xdd, 0x87, 0xc6, 0xed, 0xed, - 0x67, 0x2e, 0x96, 0x1f, 0xde, 0x5b, 0xc4, 0x71, 0xa1, 0x7e, 0xc1, 0x25, 0x9f, 0xe0, 0x7a, 0xcb, - 0x3d, 0xb2, 0xed, 0x1e, 0x85, 0x9a, 0xc7, 0x25, 0x2f, 0xa7, 0xd5, 0xb1, 0xfa, 0x3c, 0x7e, 0x5e, - 0x2e, 0x8d, 0xe1, 0xe7, 0x6a, 0x29, 0xdc, 0x0c, 0xb9, 0x44, 0x4f, 0x2d, 0x85, 0x32, 0xbf, 0xca, - 0x9a, 0x25, 0x32, 0x90, 0xc7, 0xcf, 0xa1, 0xbd, 0xeb, 0x32, 0xad, 0x43, 0x95, 0xa3, 0xb0, 0x2a, - 0xe7, 0xd6, 0x8f, 0x87, 0x0e, 0xf9, 0xf9, 0xd0, 0x21, 0xbf, 0x1e, 0x3a, 0xe4, 0xdb, 0xef, 0x4e, - 0x65, 0x6e, 0xea, 0x8d, 0x3e, 0xfb, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x2f, 0x21, 0x79, 0xdd, - 0x03, 0x00, 0x00, + // 631 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcf, 0x6e, 0xd3, 0x4e, + 0x10, 0xce, 0x3a, 0xa9, 0x93, 0x4c, 0xda, 0x34, 0xbf, 0xd5, 0x8f, 0xca, 0x08, 0x08, 0xc6, 0x08, + 0x29, 0x54, 0x55, 0x0e, 0x2d, 0xe2, 0xc2, 0x29, 0x4d, 0x83, 0x88, 0xd2, 0x2a, 0xd2, 0x52, 0x55, + 0x15, 0x97, 0x68, 0x63, 0x4f, 0x1a, 0xcb, 0x7f, 0xd6, 0xb2, 0x37, 0x56, 0xf3, 0x26, 0x3c, 0x12, + 0x47, 0x0e, 0x3c, 0x00, 0x2a, 0x0f, 0x02, 0xda, 0xb5, 0x53, 0x25, 0x82, 0xdb, 0xcc, 0xf7, 0xcd, + 0xce, 0xce, 0x7e, 0xdf, 0x0e, 0x34, 0x92, 0x79, 0x3f, 0x49, 0x85, 0x14, 0xd4, 0x48, 0xe6, 0xce, + 0x0f, 0x02, 0xc6, 0xe4, 0x86, 0x76, 0xa0, 0x1a, 0xe0, 0xda, 0x22, 0x36, 0xe9, 0xed, 0x33, 0x15, + 0xd2, 0xff, 0x61, 0x2f, 0xe7, 0xe1, 0x0a, 0x2d, 0x43, 0x63, 0x45, 0x42, 0x9f, 0x41, 0x73, 0x95, + 0x61, 0x3a, 0x8b, 0x50, 0x72, 0xab, 0xaa, 0x99, 0x86, 0x02, 0xae, 0x50, 0x72, 0x6a, 0x41, 0x3d, + 0xc7, 0x34, 0xf3, 0x45, 0x6c, 0xd5, 0x6c, 0xd2, 0xab, 0xb1, 0x4d, 0x4a, 0x5f, 0x00, 0xe0, 0x7d, + 0xe2, 0xa7, 0x98, 0xcd, 0xb8, 0xb4, 0xf6, 0x34, 0xd9, 0x2c, 0x91, 0x81, 0xa4, 0x14, 0x6a, 0xba, + 0xa1, 0xa9, 0x1b, 0xea, 0x58, 0xdd, 0x94, 0xc9, 0x14, 0x79, 0x34, 0xf3, 0x3d, 0x0b, 0x6c, 0xd2, + 0x3b, 0x60, 0x8d, 0x02, 0x18, 0x7b, 0xf4, 0x25, 0xb4, 0x4a, 0xd2, 0x13, 0x31, 0x5a, 0x2d, 0x9b, + 0xf4, 0x1a, 0x0c, 0x0a, 0xe8, 0x42, 0xc4, 0xe8, 0xd8, 0x60, 0x4e, 0x6e, 0x2e, 
0xfd, 0x4c, 0xd2, + 0x23, 0x30, 0x82, 0xdc, 0x22, 0x76, 0xb5, 0xd7, 0x3a, 0x35, 0xfb, 0xc9, 0xbc, 0x3f, 0xb9, 0x61, + 0x46, 0x90, 0x3b, 0x03, 0xf8, 0xef, 0x8a, 0xc7, 0xfe, 0x02, 0x33, 0x39, 0x5c, 0xf2, 0xf8, 0x0e, + 0x3f, 0xa3, 0xa4, 0x27, 0x50, 0x77, 0x75, 0x92, 0x95, 0x27, 0xa8, 0x3a, 0xb1, 0x5b, 0xc7, 0x36, + 0x25, 0xce, 0x6f, 0x02, 0xed, 0x5d, 0x8e, 0xb6, 0xc1, 0x18, 0x7b, 0x5a, 0xc6, 0x1a, 0x33, 0xc6, + 0x1e, 0x3d, 0x01, 0x63, 0x9a, 0x68, 0x09, 0xdb, 0xa7, 0xcf, 0xff, 0xee, 0xd5, 0x9f, 0x26, 0x98, + 0x72, 0xe9, 0x8b, 0x98, 0x19, 0xd3, 0x44, 0x69, 0x7e, 0x89, 0x39, 0x86, 0x5a, 0xd9, 0x03, 0x56, + 0x24, 0xf4, 0x09, 0x98, 0x01, 0xae, 0x95, 0x0c, 0x85, 0xaa, 0x7b, 0x01, 0xae, 0xc7, 0x1e, 0xfd, + 0x00, 0x87, 0x18, 0xbb, 0xe9, 0x3a, 0x51, 0xc7, 0x67, 0x3c, 0xbc, 0x13, 0x5a, 0xd8, 0x76, 0x31, + 0xf3, 0xe8, 0x91, 0x1a, 0x84, 0x77, 0x82, 0xb5, 0x71, 0x27, 0xa7, 0x36, 0xb4, 0x5c, 0x11, 0x25, + 0x29, 0x66, 0xda, 0x2e, 0x53, 0xdf, 0xb7, 0x0d, 0x39, 0xaf, 0xa1, 0xf9, 0x38, 0x1c, 0x05, 0x30, + 0x87, 0x6c, 0x34, 0xb8, 0x1e, 0x75, 0x2a, 0x2a, 0xbe, 0x18, 0x5d, 0x8e, 0xae, 0x47, 0x1d, 0xe2, + 0x8c, 0xa1, 0x75, 0x1e, 0x0a, 0x37, 0x98, 0x2e, 0x16, 0x19, 0xca, 0x7f, 0xfc, 0xa2, 0x23, 0x30, + 0x85, 0xe6, 0xb4, 0x06, 0x07, 0xac, 0xcc, 0x54, 0x65, 0x88, 0x71, 0xf9, 0x4e, 0x15, 0x3a, 0x5f, + 0x00, 0xae, 0xf9, 0x3c, 0xc4, 0x71, 0xec, 0xe1, 0x3d, 0x7d, 0x0b, 0xf5, 0xa2, 0x72, 0x63, 0xc4, + 0xa1, 0x7a, 0xd4, 0xd6, 0x5d, 0x6c, 0xc3, 0xd3, 0x57, 0xb0, 0x3f, 0x0f, 0x85, 0x88, 0x66, 0x0b, + 0x3f, 0x94, 0x98, 0x96, 0xff, 0xb5, 0xa5, 0xb1, 0x8f, 0x1a, 0x72, 0x04, 0x34, 0x86, 0x4b, 0x74, + 0x83, 0x6c, 0x15, 0xd1, 0x63, 0xa8, 0x69, 0xad, 0x88, 0xd6, 0xea, 0x48, 0xb5, 0xdd, 0x70, 0x7d, + 0x25, 0x4d, 0xea, 0xcb, 0x65, 0xc4, 0x74, 0x8d, 0x9a, 0x32, 0x5b, 0x45, 0xba, 0x63, 0x8d, 0xa9, + 0xd0, 0x79, 0x03, 0xcd, 0xc7, 0xa2, 0x42, 0x95, 0xe1, 0xd9, 0xe9, 0xb0, 0x53, 0xa1, 0xfb, 0xd0, + 0xb8, 0xbd, 0xfd, 0xc4, 0xb3, 0xe5, 0xfb, 0x77, 0x1d, 0xe2, 0xb8, 0x50, 0xbf, 0xe0, 0x92, 0x4f, + 0x70, 0xbd, 0xe5, 0x1e, 0xd9, 0x76, 0x8f, 0x42, 0xcd, 0xe3, 0x92, 0x97, 0xd3, 0xea, 0x58, 0x7d, + 0x1e, 0x3f, 0x2f, 0xb7, 0xca, 0xf0, 0x73, 0xb5, 0x35, 0x6e, 0x8a, 0x5c, 0xa2, 0xa7, 0xb6, 0x46, + 0x99, 0x5f, 0x65, 0xcd, 0x12, 0x19, 0xc8, 0xe3, 0xa7, 0xd0, 0xde, 0x75, 0x99, 0xd6, 0xa1, 0xca, + 0x31, 0xeb, 0x54, 0xce, 0x3b, 0xdf, 0x1e, 0xba, 0xe4, 0xfb, 0x43, 0x97, 0xfc, 0x7c, 0xe8, 0x92, + 0xaf, 0xbf, 0xba, 0x95, 0xb9, 0xa9, 0x57, 0xfe, 0xec, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5d, + 0x5f, 0x6a, 0x47, 0xfe, 0x03, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -728,6 +739,16 @@ func (m *KV) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintPb(dAtA, i, uint64(m.StreamId)) } + if m.StreamDone { + dAtA[i] = 0x58 + i++ + if m.StreamDone { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1041,6 +1062,9 @@ func (m *KV) Size() (n int) { if m.StreamId != 0 { n += 1 + sovPb(uint64(m.StreamId)) } + if m.StreamDone { + n += 2 + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1202,14 +1226,7 @@ func (m *DataKey) Size() (n int) { } func sovPb(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break - } - } - return n + return (math_bits.Len64(x|1) + 6) / 7 } func sozPb(x uint64) (n int) { return sovPb(uint64((x << 1) ^ uint64((int64(x) >> 63)))) @@ -1436,6 +1453,26 @@ func (m *KV) Unmarshal(dAtA []byte) error { break } } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StreamDone", 
wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.StreamDone = bool(v != 0) default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) diff --git a/pb/pb.proto b/pb/pb.proto index 1edb0edc4..4d91eec04 100644 --- a/pb/pb.proto +++ b/pb/pb.proto @@ -29,6 +29,8 @@ message KV { // Stream id is used to identify which stream the KV came from. uint32 stream_id = 10; + // Stream done is used to indicate end of stream. + bool stream_done = 11; } message KVList { diff --git a/stream_writer.go b/stream_writer.go index b3ab8f141..c0d2a7e80 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -29,6 +29,11 @@ import ( const headStreamId uint32 = math.MaxUint32 +var ( + // ErrStreamClosed is returned when a sent is performed on closed stream. + ErrStreamClosed = errors.New("stream is already closed") +) + // StreamWriter is used to write data coming from multiple streams. The streams must not have any // overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is // capable of generating such an output. So, this StreamWriter can be used at the other end to build @@ -41,13 +46,14 @@ const headStreamId uint32 = math.MaxUint32 // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new // DBs. type StreamWriter struct { - writeLock sync.Mutex - db *DB - done func() - throttle *y.Throttle - maxVersion uint64 - writers map[uint32]*sortedWriter - closer *y.Closer + writeLock sync.Mutex + db *DB + done func() + throttle *y.Throttle + maxVersion uint64 + writers map[uint32]*sortedWriter + closedStreams map[uint32]bool + closers map[uint32]*y.Closer } // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be @@ -61,7 +67,7 @@ func (db *DB) NewStreamWriter() *StreamWriter { // concurrent streams being processed. throttle: y.NewThrottle(16), writers: make(map[uint32]*sortedWriter), - closer: y.NewCloser(0), + closers: make(map[uint32]*y.Closer), } } @@ -85,8 +91,23 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { if len(kvs.GetKv()) == 0 { return nil } + + closedStreams := make(map[uint32]bool) + streamWithRecords := make(map[uint32]bool) streamReqs := make(map[uint32]*request) + for _, kv := range kvs.Kv { + if kv.StreamDone { + closedStreams[kv.StreamId] = true + continue + } + + if _, ok := closedStreams[kv.StreamId]; ok { + return ErrStreamClosed + } + + streamWithRecords[kv.StreamId] = true + var meta, userMeta byte if len(kv.Meta) > 0 { meta = kv.Meta[0] @@ -121,6 +142,14 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { sw.writeLock.Lock() defer sw.writeLock.Unlock() + + // Check if any of the stream we got records for are closed. + for streamID := range streamWithRecords { + if _, ok := sw.closedStreams[streamID]; ok { + return ErrStreamClosed + } + } + if err := sw.db.vlog.write(all); err != nil { return err } @@ -137,6 +166,22 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { } writer.reqCh <- req } + + // close any streams if required. 
+ for streamID := range closedStreams { + writer, ok := sw.writers[streamID] + if !ok { + return nil + } + + closer := sw.closers[streamID] + closer.SignalAndWait() + if err := writer.Done(); err != nil { + return err + } + + sw.closedStreams[streamID] = true + } return nil } @@ -148,7 +193,10 @@ func (sw *StreamWriter) Flush() error { defer sw.done() - sw.closer.SignalAndWait() + for _, closer := range sw.closers { + closer.SignalAndWait() + } + var maxHead valuePointer for _, writer := range sw.writers { if err := writer.Done(); err != nil { @@ -228,8 +276,10 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { builder: table.NewTableBuilder(bopts), reqCh: make(chan *request, 3), } - sw.closer.AddRunning(1) - go w.handleRequests(sw.closer) + + closer := y.NewCloser(1) + sw.closers[streamID] = closer + go w.handleRequests(closer) return w, nil } @@ -324,9 +374,20 @@ func (w *sortedWriter) send() error { // to sortedWriter. It completes writing current SST to disk. func (w *sortedWriter) Done() error { if w.builder.Empty() { + w.builder = nil return nil } - return w.send() + + if err := w.throttle.Do(); err != nil { + return err + } + go func(builder *table.Builder) { + err := w.createTable(builder) + w.throttle.Done(err) + }(w.builder) + + w.builder = nil + return nil } func (w *sortedWriter) createTable(builder *table.Builder) error { From 7d60e0fb93fb2d22124ebbbbf3520bb6b7b810f2 Mon Sep 17 00:00:00 2001 From: ashish Date: Tue, 1 Oct 2019 17:21:10 +0530 Subject: [PATCH 02/15] Fix some issues --- stream_writer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index c0d2a7e80..9dd1cf9a3 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -65,9 +65,10 @@ func (db *DB) NewStreamWriter() *StreamWriter { db: db, // throttle shouldn't make much difference. Memory consumption is based on the number of // concurrent streams being processed. - throttle: y.NewThrottle(16), - writers: make(map[uint32]*sortedWriter), - closers: make(map[uint32]*y.Closer), + throttle: y.NewThrottle(16), + writers: make(map[uint32]*sortedWriter), + closedStreams: make(map[uint32]bool), + closers: make(map[uint32]*y.Closer), } } @@ -181,6 +182,8 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { } sw.closedStreams[streamID] = true + delete(sw.writers, streamID) + delete(sw.closers, streamID) } return nil } From 70335b7534456598272663378c6419e329454726 Mon Sep 17 00:00:00 2001 From: ashish Date: Tue, 1 Oct 2019 17:51:52 +0530 Subject: [PATCH 03/15] Linting fixes --- stream_writer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index 9dd1cf9a3..2894bea77 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -27,7 +27,7 @@ import ( "github.com/pkg/errors" ) -const headStreamId uint32 = math.MaxUint32 +const headStreamID uint32 = math.MaxUint32 var ( // ErrStreamClosed is returned when a sent is performed on closed stream. @@ -126,7 +126,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { ExpiresAt: kv.ExpiresAt, meta: meta, } - // If the value can be colocated with the key in LSM tree, we can skip + // If the value can be collocated with the key in LSM tree, we can skip // writing the value to value log. e.skipVlog = sw.db.shouldWriteValueToLSM(*e) req := streamReqs[kv.StreamId] @@ -212,7 +212,7 @@ func (sw *StreamWriter) Flush() error { // Encode and write the value log head into a new table. 
data := maxHead.Encode() - headWriter, err := sw.newWriter(headStreamId) + headWriter, err := sw.newWriter(headStreamID) if err != nil { return errors.Wrap(err, "failed to create head writer") } @@ -431,7 +431,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { // better than that. lhandler = lc.levels[len(lc.levels)-1] } - if w.streamID == headStreamId { + if w.streamID == headStreamID { // This is a special !badger!head key. We should store it at level 0, separate from all the // other keys to avoid an overlap. lhandler = lc.levels[0] From eaadf99d2e39a0a6916bee19e053a072e8cb4f4c Mon Sep 17 00:00:00 2001 From: ashish Date: Thu, 3 Oct 2019 14:54:45 +0530 Subject: [PATCH 04/15] Add tests for Stream Done --- stream_writer_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/stream_writer_test.go b/stream_writer_test.go index f361f4b1b..e8b9e750a 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -328,3 +328,71 @@ func TestStreamWriter6(t *testing.T) { require.NoError(t, db.Close()) }) } + +func TestStreamDone(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + for i := 0; i < 10; i++ { + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", i)), + Value: val[:], + Version: 1, + StreamId: uint32(i), + } + kv2 := &pb.KV{ + StreamId: uint32(i), + StreamDone: true, + } + list.Kv = append(list.Kv, kv1, kv2) + require.NoError(t, sw.Write(list), "sw.Write() failed") + } + require.NoError(t, sw.Flush(), "sw.Flush() failed") + require.NoError(t, db.Close()) + + var err error + db, err = Open(db.opt) + require.NoError(t, err) + require.NoError(t, db.Close()) + }) +} + +func TestSendOnClosedStream(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 1)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + kv2 := &pb.KV{ + StreamId: uint32(1), + StreamDone: true, + } + list.Kv = append(list.Kv, kv1, kv2) + require.NoError(t, sw.Write(list), "sw.Write() failed") + + // Send once stream is closed. + list = &pb.KVList{} + kv1 = &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 2)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + list.Kv = append(list.Kv, kv1) + require.Equal(t, ErrStreamClosed, sw.Write(list), "sw.Write() should fail") + require.NoError(t, sw.Flush(), "sw.Flush() failed") + require.NoError(t, db.Close()) + }) +} From 73d78bf103725a64a879f36e54398d5627fe9013 Mon Sep 17 00:00:00 2001 From: ashish Date: Thu, 3 Oct 2019 14:55:16 +0530 Subject: [PATCH 05/15] Move closers inside sorted writer --- stream_writer.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index 2894bea77..95f82fee6 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -53,7 +53,6 @@ type StreamWriter struct { maxVersion uint64 writers map[uint32]*sortedWriter closedStreams map[uint32]bool - closers map[uint32]*y.Closer } // NewStreamWriter creates a StreamWriter. 
Right after creating StreamWriter, Prepare must be @@ -68,7 +67,6 @@ func (db *DB) NewStreamWriter() *StreamWriter { throttle: y.NewThrottle(16), writers: make(map[uint32]*sortedWriter), closedStreams: make(map[uint32]bool), - closers: make(map[uint32]*y.Closer), } } @@ -175,15 +173,13 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { return nil } - closer := sw.closers[streamID] - closer.SignalAndWait() + writer.closer.SignalAndWait() if err := writer.Done(); err != nil { return err } sw.closedStreams[streamID] = true delete(sw.writers, streamID) - delete(sw.closers, streamID) } return nil } @@ -196,8 +192,8 @@ func (sw *StreamWriter) Flush() error { defer sw.done() - for _, closer := range sw.closers { - closer.SignalAndWait() + for _, writer := range sw.writers { + writer.closer.SignalAndWait() } var maxHead valuePointer @@ -262,6 +258,7 @@ type sortedWriter struct { streamID uint32 reqCh chan *request head valuePointer + closer *y.Closer } func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { @@ -270,6 +267,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { return nil, err } + closer := y.NewCloser(1) bopts := buildTableOptions(sw.db.opt) bopts.DataKey = dk w := &sortedWriter{ @@ -278,19 +276,18 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { throttle: sw.throttle, builder: table.NewTableBuilder(bopts), reqCh: make(chan *request, 3), + closer: closer, } - closer := y.NewCloser(1) - sw.closers[streamID] = closer - go w.handleRequests(closer) + go w.handleRequests() return w, nil } // ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add. var ErrUnsortedKey = errors.New("Keys not in sorted order") -func (w *sortedWriter) handleRequests(closer *y.Closer) { - defer closer.Done() +func (w *sortedWriter) handleRequests() { + defer w.closer.Done() process := func(req *request) { for i, e := range req.Entries { @@ -326,7 +323,7 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) { select { case req := <-w.reqCh: process(req) - case <-closer.HasBeenClosed(): + case <-w.closer.HasBeenClosed(): close(w.reqCh) for req := range w.reqCh { process(req) From e50743cc55cd5660e83127e896ec1b961a357cb6 Mon Sep 17 00:00:00 2001 From: Ashish Goswami Date: Fri, 4 Oct 2019 16:08:57 +0530 Subject: [PATCH 06/15] Address review comments --- stream_writer.go | 106 +++++++++++++++++++------------------- stream_writer_test.go | 115 ++++++++++++++++++++++++++++++------------ 2 files changed, 138 insertions(+), 83 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index 95f82fee6..5a0301f4f 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -17,6 +17,7 @@ package badger import ( + "fmt" "math" "sync" @@ -27,12 +28,7 @@ import ( "github.com/pkg/errors" ) -const headStreamID uint32 = math.MaxUint32 - -var ( - // ErrStreamClosed is returned when a sent is performed on closed stream. - ErrStreamClosed = errors.New("stream is already closed") -) +const headStreamId uint32 = math.MaxUint32 // StreamWriter is used to write data coming from multiple streams. The streams must not have any // overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is @@ -46,13 +42,12 @@ var ( // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new // DBs. 
type StreamWriter struct { - writeLock sync.Mutex - db *DB - done func() - throttle *y.Throttle - maxVersion uint64 - writers map[uint32]*sortedWriter - closedStreams map[uint32]bool + writeLock sync.Mutex + db *DB + done func() + throttle *y.Throttle + maxVersion uint64 + writers map[uint32]*sortedWriter } // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be @@ -64,9 +59,8 @@ func (db *DB) NewStreamWriter() *StreamWriter { db: db, // throttle shouldn't make much difference. Memory consumption is based on the number of // concurrent streams being processed. - throttle: y.NewThrottle(16), - writers: make(map[uint32]*sortedWriter), - closedStreams: make(map[uint32]bool), + throttle: y.NewThrottle(16), + writers: make(map[uint32]*sortedWriter), } } @@ -91,22 +85,22 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { return nil } - closedStreams := make(map[uint32]bool) - streamWithRecords := make(map[uint32]bool) + // closedStreams keeps track of all streams which are going to be marked as done. We are + // keeping track of all streams so that we can close them at the end, after inserting all + // the valid kvs. + closedStreams := make(map[uint32]struct{}) streamReqs := make(map[uint32]*request) - for _, kv := range kvs.Kv { if kv.StreamDone { - closedStreams[kv.StreamId] = true + closedStreams[kv.StreamId] = struct{}{} continue } + // Panic if some kv comes after stream has been marked as closed. if _, ok := closedStreams[kv.StreamId]; ok { - return ErrStreamClosed + panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId)) } - streamWithRecords[kv.StreamId] = true - var meta, userMeta byte if len(kv.Meta) > 0 { meta = kv.Meta[0] @@ -142,13 +136,9 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { sw.writeLock.Lock() defer sw.writeLock.Unlock() - // Check if any of the stream we got records for are closed. - for streamID := range streamWithRecords { - if _, ok := sw.closedStreams[streamID]; ok { - return ErrStreamClosed - } - } - + // We are writing all requests to vlog even if some request belongs to already closed stream. + // It is safe to do because we are panicing while writing to sorted writer, which will be nil + // for closed stream. At restart, stream writer will drop all the data in Prepare function. if err := sw.db.vlog.write(all); err != nil { return err } @@ -163,14 +153,22 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { } sw.writers[streamID] = writer } + + if writer == nil { + panic(fmt.Sprintf("write performed on closed stream: %d", streamID)) + } + writer.reqCh <- req } - // close any streams if required. - for streamID := range closedStreams { - writer, ok := sw.writers[streamID] + // Now we can close any streams if required. We will make writer for + // the closed streams as nil. 
+ for streamId := range closedStreams { + writer, ok := sw.writers[streamId] if !ok { - return nil + sw.db.opt.Logger.Warningf("Trying to close stream: %d, but no sorted "+ + "writer found for it", streamId) + continue } writer.closer.SignalAndWait() @@ -178,8 +176,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { return err } - sw.closedStreams[streamID] = true - delete(sw.writers, streamID) + sw.writers[streamId] = nil } return nil } @@ -193,11 +190,16 @@ func (sw *StreamWriter) Flush() error { defer sw.done() for _, writer := range sw.writers { - writer.closer.SignalAndWait() + if writer != nil { + writer.closer.SignalAndWait() + } } var maxHead valuePointer for _, writer := range sw.writers { + if writer == nil { + continue + } if err := writer.Done(); err != nil { return err } @@ -208,7 +210,7 @@ func (sw *StreamWriter) Flush() error { // Encode and write the value log head into a new table. data := maxHead.Encode() - headWriter, err := sw.newWriter(headStreamID) + headWriter, err := sw.newWriter(headStreamId) if err != nil { return errors.Wrap(err, "failed to create head writer") } @@ -258,7 +260,8 @@ type sortedWriter struct { streamID uint32 reqCh chan *request head valuePointer - closer *y.Closer + // Have separate closer for each writer, as it can be closed at any time. + closer *y.Closer } func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { @@ -342,7 +345,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { sameKey := y.SameKey(key, w.lastKey) // Same keys should go into the same SSTable. if !sameKey && w.builder.ReachedCapacity(w.db.opt.MaxTableSize) { - if err := w.send(); err != nil { + if err := w.send(false); err != nil { return err } } @@ -352,7 +355,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { return nil } -func (w *sortedWriter) send() error { +func (w *sortedWriter) send(done bool) error { if err := w.throttle.Do(); err != nil { return err } @@ -360,6 +363,13 @@ func (w *sortedWriter) send() error { err := w.createTable(builder) w.throttle.Done(err) }(w.builder) + // If done is true, this indicates we can close the writer. + // No need to allocate underlying TableBuilder now. + if done { + w.builder = nil + return nil + } + dk, err := w.db.registry.latestDataKey() if err != nil { return y.Wrapf(err, "Error while retriving datakey in sortedWriter.send") @@ -374,20 +384,12 @@ func (w *sortedWriter) send() error { // to sortedWriter. It completes writing current SST to disk. func (w *sortedWriter) Done() error { if w.builder.Empty() { + // Assign builder as nil, so that underlying memory can be garbage collected. w.builder = nil return nil } - if err := w.throttle.Do(); err != nil { - return err - } - go func(builder *table.Builder) { - err := w.createTable(builder) - w.throttle.Done(err) - }(w.builder) - - w.builder = nil - return nil + return w.send(true) } func (w *sortedWriter) createTable(builder *table.Builder) error { @@ -428,7 +430,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { // better than that. lhandler = lc.levels[len(lc.levels)-1] } - if w.streamID == headStreamID { + if w.streamID == headStreamId { // This is a special !badger!head key. We should store it at level 0, separate from all the // other keys to avoid an overlap. 
lhandler = lc.levels[0] diff --git a/stream_writer_test.go b/stream_writer_test.go index e8b9e750a..276feaccd 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -20,8 +20,10 @@ import ( "bytes" "encoding/binary" "fmt" + "io/ioutil" "math" "math/rand" + "os" "testing" "github.com/stretchr/testify/require" @@ -362,37 +364,88 @@ func TestStreamDone(t *testing.T) { } func TestSendOnClosedStream(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - sw := db.NewStreamWriter() - require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + opts := getTestOptions(dir) + db, err := Open(opts) + require.NoError(t, err) + + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 1)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + kv2 := &pb.KV{ + StreamId: uint32(1), + StreamDone: true, + } + list.Kv = append(list.Kv, kv1, kv2) + require.NoError(t, sw.Write(list), "sw.Write() failed") + + // Defer for panic. + defer func() { + require.NotNil(t, recover(), "should have paniced") + }() + // Send once stream is closed. + list = &pb.KVList{} + kv1 = &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 2)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + list.Kv = append(list.Kv, kv1) + sw.Write(list) +} - var val [10]byte - rand.Read(val[:]) - list := &pb.KVList{} - kv1 := &pb.KV{ - Key: []byte(fmt.Sprintf("%d", 1)), - Value: val[:], - Version: 1, - StreamId: uint32(1), - } - kv2 := &pb.KV{ - StreamId: uint32(1), - StreamDone: true, - } - list.Kv = append(list.Kv, kv1, kv2) - require.NoError(t, sw.Write(list), "sw.Write() failed") +func TestSendOnClosedStream2(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + opts := getTestOptions(dir) + db, err := Open(opts) + require.NoError(t, err) + + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 1)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + kv2 := &pb.KV{ + StreamId: uint32(1), + StreamDone: true, + } + kv3 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 2)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + list.Kv = append(list.Kv, kv1, kv2, kv3) - // Send once stream is closed. - list = &pb.KVList{} - kv1 = &pb.KV{ - Key: []byte(fmt.Sprintf("%d", 2)), - Value: val[:], - Version: 1, - StreamId: uint32(1), - } - list.Kv = append(list.Kv, kv1) - require.Equal(t, ErrStreamClosed, sw.Write(list), "sw.Write() should fail") - require.NoError(t, sw.Flush(), "sw.Flush() failed") - require.NoError(t, db.Close()) - }) + // Defer for panic. 
+ defer func() { + require.NotNil(t, recover(), "should have paniced") + }() + + require.NoError(t, sw.Write(list), "sw.Write() failed") } From f0381beca9f75a31c3443ddc0ff83559474dcf3b Mon Sep 17 00:00:00 2001 From: Ashish Goswami Date: Fri, 4 Oct 2019 16:54:21 +0530 Subject: [PATCH 07/15] Fix AppVeyor tests --- stream_writer_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/stream_writer_test.go b/stream_writer_test.go index 276feaccd..332d42387 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -395,6 +395,8 @@ func TestSendOnClosedStream(t *testing.T) { // Defer for panic. defer func() { require.NotNil(t, recover(), "should have paniced") + require.NoError(t, sw.Flush()) + require.NoError(t, db.Close()) }() // Send once stream is closed. list = &pb.KVList{} @@ -445,6 +447,8 @@ func TestSendOnClosedStream2(t *testing.T) { // Defer for panic. defer func() { require.NotNil(t, recover(), "should have paniced") + require.NoError(t, sw.Flush()) + require.NoError(t, db.Close()) }() require.NoError(t, sw.Write(list), "sw.Write() failed") From 3badcac9af2969c6f9e0b6f246036411472d7cda Mon Sep 17 00:00:00 2001 From: ashish Date: Thu, 10 Oct 2019 21:38:55 +0530 Subject: [PATCH 08/15] Address review comments --- stream_writer.go | 5 ++--- stream_writer_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index 5a0301f4f..c1a02b645 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -137,7 +137,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { defer sw.writeLock.Unlock() // We are writing all requests to vlog even if some request belongs to already closed stream. - // It is safe to do because we are panicing while writing to sorted writer, which will be nil + // It is safe to do because we are panicking while writing to sorted writer, which will be nil // for closed stream. At restart, stream writer will drop all the data in Prepare function. 
if err := sw.db.vlog.write(all); err != nil { return err @@ -270,7 +270,6 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { return nil, err } - closer := y.NewCloser(1) bopts := buildTableOptions(sw.db.opt) bopts.DataKey = dk w := &sortedWriter{ @@ -279,7 +278,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { throttle: sw.throttle, builder: table.NewTableBuilder(bopts), reqCh: make(chan *request, 3), - closer: closer, + closer: y.NewCloser(1), } go w.handleRequests() diff --git a/stream_writer_test.go b/stream_writer_test.go index 332d42387..a499e525f 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -338,7 +338,7 @@ func TestStreamDone(t *testing.T) { var val [10]byte rand.Read(val[:]) - for i := 0; i < 10; i++ { + for i := 0; i < 1000; i++ { list := &pb.KVList{} kv1 := &pb.KV{ Key: []byte(fmt.Sprintf("%d", i)), From 5a1d18888dd73943dbbd49d7d77500a94159fc4b Mon Sep 17 00:00:00 2001 From: ashish Date: Wed, 16 Oct 2019 15:45:50 +0530 Subject: [PATCH 09/15] Add -v in travis go test --- contrib/cover.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/cover.sh b/contrib/cover.sh index 5e2c179a8..d94b162b6 100755 --- a/contrib/cover.sh +++ b/contrib/cover.sh @@ -6,14 +6,14 @@ TMP=$(mktemp /tmp/badger-coverage-XXXXX.txt) BUILD=$1 OUT=$2 -set -e +set -ex pushd $SRC &> /dev/null # create coverage output echo 'mode: atomic' > $OUT for PKG in $(go list ./...|grep -v -E 'vendor'); do - go test -covermode=atomic -coverprofile=$TMP $PKG + go test -v -covermode=atomic -coverprofile=$TMP $PKG tail -n +2 $TMP >> $OUT done From f065b7907b2015bd4a203756e930a103024613da Mon Sep 17 00:00:00 2001 From: ashish Date: Wed, 16 Oct 2019 16:17:03 +0530 Subject: [PATCH 10/15] Decrease no of stream to fix travis build --- stream_writer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream_writer_test.go b/stream_writer_test.go index a499e525f..332d42387 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -338,7 +338,7 @@ func TestStreamDone(t *testing.T) { var val [10]byte rand.Read(val[:]) - for i := 0; i < 1000; i++ { + for i := 0; i < 10; i++ { list := &pb.KVList{} kv1 := &pb.KV{ Key: []byte(fmt.Sprintf("%d", i)), From 6c209772112b1b4ecc0ab81c91170d141d2d1355 Mon Sep 17 00:00:00 2001 From: Ashish Goswami Date: Thu, 17 Oct 2019 10:34:07 +0530 Subject: [PATCH 11/15] Fix maxhead while closing streams --- stream_writer.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/stream_writer.go b/stream_writer.go index c1a02b645..b59eb9bb3 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -48,6 +48,7 @@ type StreamWriter struct { throttle *y.Throttle maxVersion uint64 writers map[uint32]*sortedWriter + maxHead valuePointer } // NewStreamWriter creates a StreamWriter. 
Right after creating StreamWriter, Prepare must be @@ -176,6 +177,10 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { return err } + if sw.maxHead.Less(writer.head) { + sw.maxHead = writer.head + } + sw.writers[streamId] = nil } return nil @@ -195,7 +200,6 @@ func (sw *StreamWriter) Flush() error { } } - var maxHead valuePointer for _, writer := range sw.writers { if writer == nil { continue @@ -203,13 +207,13 @@ func (sw *StreamWriter) Flush() error { if err := writer.Done(); err != nil { return err } - if maxHead.Less(writer.head) { - maxHead = writer.head + if sw.maxHead.Less(writer.head) { + sw.maxHead = writer.head } } // Encode and write the value log head into a new table. - data := maxHead.Encode() + data := sw.maxHead.Encode() headWriter, err := sw.newWriter(headStreamId) if err != nil { return errors.Wrap(err, "failed to create head writer") From 135834d6faf0f59775a2ebf782e4bc88c4e41d57 Mon Sep 17 00:00:00 2001 From: ashish Date: Thu, 17 Oct 2019 18:10:08 +0530 Subject: [PATCH 12/15] Add test for maxhead update in stream writer --- stream_writer_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/stream_writer_test.go b/stream_writer_test.go index 332d42387..27f4f0dea 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -25,6 +25,7 @@ import ( "math/rand" "os" "testing" + "time" "github.com/stretchr/testify/require" @@ -453,3 +454,60 @@ func TestSendOnClosedStream2(t *testing.T) { require.NoError(t, sw.Write(list), "sw.Write() failed") } + +// This test verifies if we have updated max head correctly after calling stream writer flush. +func TestMaxHeadUpdate(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + var val [50]byte + // Value size should be than opt.ValueThreshold. + require.Greater(t, len(val), db.opt.ValueThreshold) + rand.Read(val[:]) + for i := 0; i < 10; i++ { + list := &pb.KVList{} + kv := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", i)), + Value: val[:], + Version: 1, + StreamId: uint32(i), + } + list.Kv = append(list.Kv, kv) + require.NoError(t, sw.Write(list), "sw.Write() failed") + } + + // Wait for entries to be written by sortedWriter as reqs are pushed on buffered channel. + time.Sleep(10 * time.Millisecond) + + // Get max head from all sorted writers. + var maxHead valuePointer + for _, writer := range sw.writers { + if maxHead.Less(writer.head) { + maxHead = writer.head + } + } + + // Close only last five streams using done. We are closing streams with max vlog pointer. + for i := uint32(9); i >= 5; i-- { + list := &pb.KVList{} + kv := &pb.KV{ + StreamId: i, + StreamDone: true, + } + list.Kv = append(list.Kv, kv) + require.NoError(t, sw.Write(list), "sw.Write() failed") + } + require.NoError(t, sw.Flush(), "sw.Flush() failed") + + vs, err := db.get(y.KeyWithTs(head, math.MaxUint64)) // Get head from DB. 
+		require.NoError(t, err, "unable to get head")
+		var vp valuePointer
+		vp.Decode(vs.Value)
+		require.Equal(t, vp, maxHead)
+		require.NoError(t, db.Close())
+
+		db, err = Open(db.opt)
+		require.NoError(t, err)
+		require.NoError(t, db.Close())
+	})
+}

From c1684fe40ab07b29804cc137d333e660a333918b Mon Sep 17 00:00:00 2001
From: ashish
Date: Thu, 17 Oct 2019 18:40:39 +0530
Subject: [PATCH 13/15] Remove test as it is failing with race

---
 stream_writer_test.go | 58 ------------------------------------------
 1 file changed, 58 deletions(-)

diff --git a/stream_writer_test.go b/stream_writer_test.go
index 27f4f0dea..332d42387 100644
--- a/stream_writer_test.go
+++ b/stream_writer_test.go
@@ -25,7 +25,6 @@ import (
 	"math/rand"
 	"os"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -454,60 +453,3 @@ func TestSendOnClosedStream2(t *testing.T) {
 
 	require.NoError(t, sw.Write(list), "sw.Write() failed")
 }
-
-// This test verifies that we have updated max head correctly after calling stream writer flush.
-func TestMaxHeadUpdate(t *testing.T) {
-	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
-		sw := db.NewStreamWriter()
-		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
-		var val [50]byte
-		// Value size should be greater than opt.ValueThreshold.
-		require.Greater(t, len(val), db.opt.ValueThreshold)
-		rand.Read(val[:])
-		for i := 0; i < 10; i++ {
-			list := &pb.KVList{}
-			kv := &pb.KV{
-				Key:      []byte(fmt.Sprintf("%d", i)),
-				Value:    val[:],
-				Version:  1,
-				StreamId: uint32(i),
-			}
-			list.Kv = append(list.Kv, kv)
-			require.NoError(t, sw.Write(list), "sw.Write() failed")
-		}
-
-		// Wait for entries to be written by sortedWriter as reqs are pushed on buffered channel.
-		time.Sleep(10 * time.Millisecond)
-
-		// Get max head from all sorted writers.
-		var maxHead valuePointer
-		for _, writer := range sw.writers {
-			if maxHead.Less(writer.head) {
-				maxHead = writer.head
-			}
-		}
-
-		// Close only the last five streams using done. We are closing streams with max vlog pointer.
-		for i := uint32(9); i >= 5; i-- {
-			list := &pb.KVList{}
-			kv := &pb.KV{
-				StreamId:   i,
-				StreamDone: true,
-			}
-			list.Kv = append(list.Kv, kv)
-			require.NoError(t, sw.Write(list), "sw.Write() failed")
-		}
-		require.NoError(t, sw.Flush(), "sw.Flush() failed")
-
-		vs, err := db.get(y.KeyWithTs(head, math.MaxUint64)) // Get head from DB.
-		require.NoError(t, err, "unable to get head")
-		var vp valuePointer
-		vp.Decode(vs.Value)
-		require.Equal(t, vp, maxHead)
-		require.NoError(t, db.Close())
-
-		db, err = Open(db.opt)
-		require.NoError(t, err)
-		require.NoError(t, db.Close())
-	})
-}

From eda20f519f19c59a678776d635a5ea75a62ee696 Mon Sep 17 00:00:00 2001
From: Ashish Goswami
Date: Fri, 18 Oct 2019 13:11:28 +0530
Subject: [PATCH 14/15] Trigger license/cla

From ffb416a764121b836a34756bf3742c4d5a13c67b Mon Sep 17 00:00:00 2001
From: Ashish Goswami
Date: Fri, 18 Oct 2019 18:30:36 +0530
Subject: [PATCH 15/15] Trigger license/cla2
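For readers trying out this series, below is a minimal end-to-end sketch of the stream_done flag it adds. This is a sketch under assumptions, not part of the series itself: it assumes this branch's API surface (badger.DefaultOptions taking a directory path, and the pb package from this repository), and the directory, keys, and values are illustrative placeholders.

package main

import (
	"fmt"
	"log"

	badger "github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/pb"
)

func main() {
	// Open a fresh DB; StreamWriter is designed only to bootstrap new DBs.
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-stream"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	sw := db.NewStreamWriter()
	if err := sw.Prepare(); err != nil {
		log.Fatal(err)
	}

	// Keys within a stream must be sorted, and streams must not overlap.
	list := &pb.KVList{}
	for i := 0; i < 5; i++ {
		list.Kv = append(list.Kv, &pb.KV{
			Key:      []byte(fmt.Sprintf("key-%d", i)),
			Value:    []byte("value"),
			Version:  1,
			StreamId: 1,
		})
	}
	// The done marker lets StreamWriter flush stream 1's tables and release
	// its sortedWriter now, rather than holding it open until Flush.
	list.Kv = append(list.Kv, &pb.KV{StreamId: 1, StreamDone: true})
	if err := sw.Write(list); err != nil {
		log.Fatal(err)
	}

	// Flush finalizes any streams not already marked done and commits the DB.
	if err := sw.Flush(); err != nil {
		log.Fatal(err)
	}
}

Note that the done marker must be the last entry for its stream: as the tests in this series exercise, any KV sent on a stream after its stream_done marker causes Write to panic.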