diff --git a/go/cmd/vtgateclienttest/services/echo.go b/go/cmd/vtgateclienttest/services/echo.go index c12807e09b1..63102ce692e 100644 --- a/go/cmd/vtgateclienttest/services/echo.go +++ b/go/cmd/vtgateclienttest/services/echo.go @@ -143,7 +143,7 @@ func (c *echoClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session return c.fallbackClient.ExecuteBatch(ctx, session, sqlList, bindVariablesList) } -func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, callback func([]*binlogdatapb.VEvent) error) error { +func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, callback func([]*binlogdatapb.VEvent) error) error { if strings.HasPrefix(vgtid.ShardGtids[0].Shard, EchoPrefix) { _ = callback([]*binlogdatapb.VEvent{ { @@ -170,5 +170,5 @@ func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletTy return nil } - return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, callback) + return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, flags, callback) } diff --git a/go/cmd/vtgateclienttest/services/fallback.go b/go/cmd/vtgateclienttest/services/fallback.go index b94ca031106..5119fa36587 100644 --- a/go/cmd/vtgateclienttest/services/fallback.go +++ b/go/cmd/vtgateclienttest/services/fallback.go @@ -56,8 +56,8 @@ func (c fallbackClient) ResolveTransaction(ctx context.Context, dtid string) err return c.fallback.ResolveTransaction(ctx, dtid) } -func (c fallbackClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.fallback.VStream(ctx, tabletType, vgtid, filter, send) +func (c fallbackClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { + return c.fallback.VStream(ctx, tabletType, vgtid, filter, flags, send) } func (c fallbackClient) HandlePanic(err *error) { diff --git a/go/cmd/vtgateclienttest/services/terminal.go b/go/cmd/vtgateclienttest/services/terminal.go index 3f4a4a7d30b..6a8e30fd9da 100644 --- a/go/cmd/vtgateclienttest/services/terminal.go +++ b/go/cmd/vtgateclienttest/services/terminal.go @@ -66,7 +66,7 @@ func (c *terminalClient) ResolveTransaction(ctx context.Context, dtid string) er return errTerminal } -func (c *terminalClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (c *terminalClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { return errTerminal } diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index 1a330abb10e..d265e098d5b 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -1071,6 +1071,53 @@ func (m *ResolveTransactionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ResolveTransactionResponse proto.InternalMessageInfo +type VStreamFlags struct { + MinimizeSkew bool `protobuf:"varint,1,opt,name=minimize_skew,json=minimizeSkew,proto3" json:"minimize_skew,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *VStreamFlags) Reset() { *m = VStreamFlags{} } +func (m *VStreamFlags) String() string { return proto.CompactTextString(m) } +func (*VStreamFlags) ProtoMessage() {} +func (*VStreamFlags) Descriptor() ([]byte, []int) { + return fileDescriptor_aab96496ceaf1ebb, []int{10} +} +func (m *VStreamFlags) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *VStreamFlags) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_VStreamFlags.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *VStreamFlags) XXX_Merge(src proto.Message) { + xxx_messageInfo_VStreamFlags.Merge(m, src) +} +func (m *VStreamFlags) XXX_Size() int { + return m.Size() +} +func (m *VStreamFlags) XXX_DiscardUnknown() { + xxx_messageInfo_VStreamFlags.DiscardUnknown(m) +} + +var xxx_messageInfo_VStreamFlags proto.InternalMessageInfo + +func (m *VStreamFlags) GetMinimizeSkew() bool { + if m != nil { + return m.MinimizeSkew + } + return false +} + // VStreamRequest is the payload for VStream. type VStreamRequest struct { CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id,json=callerId,proto3" json:"caller_id,omitempty"` @@ -1080,6 +1127,7 @@ type VStreamRequest struct { // position is of the form 'ks1:0@MySQL56/|ks2:-80@MySQL56/'. Vgtid *binlogdata.VGtid `protobuf:"bytes,3,opt,name=vgtid,proto3" json:"vgtid,omitempty"` Filter *binlogdata.Filter `protobuf:"bytes,4,opt,name=filter,proto3" json:"filter,omitempty"` + Flags *VStreamFlags `protobuf:"bytes,5,opt,name=flags,proto3" json:"flags,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -1089,7 +1137,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} } func (m *VStreamRequest) String() string { return proto.CompactTextString(m) } func (*VStreamRequest) ProtoMessage() {} func (*VStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_aab96496ceaf1ebb, []int{10} + return fileDescriptor_aab96496ceaf1ebb, []int{11} } func (m *VStreamRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1146,6 +1194,13 @@ func (m *VStreamRequest) GetFilter() *binlogdata.Filter { return nil } +func (m *VStreamRequest) GetFlags() *VStreamFlags { + if m != nil { + return m.Flags + } + return nil +} + // VStreamResponse is streamed by VStream. type VStreamResponse struct { Events []*binlogdata.VEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` @@ -1158,7 +1213,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} } func (m *VStreamResponse) String() string { return proto.CompactTextString(m) } func (*VStreamResponse) ProtoMessage() {} func (*VStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_aab96496ceaf1ebb, []int{11} + return fileDescriptor_aab96496ceaf1ebb, []int{12} } func (m *VStreamResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1210,6 +1265,7 @@ func init() { proto.RegisterType((*StreamExecuteResponse)(nil), "vtgate.StreamExecuteResponse") proto.RegisterType((*ResolveTransactionRequest)(nil), "vtgate.ResolveTransactionRequest") proto.RegisterType((*ResolveTransactionResponse)(nil), "vtgate.ResolveTransactionResponse") + proto.RegisterType((*VStreamFlags)(nil), "vtgate.VStreamFlags") proto.RegisterType((*VStreamRequest)(nil), "vtgate.VStreamRequest") proto.RegisterType((*VStreamResponse)(nil), "vtgate.VStreamResponse") } @@ -1217,95 +1273,98 @@ func init() { func init() { proto.RegisterFile("vtgate.proto", fileDescriptor_aab96496ceaf1ebb) } var fileDescriptor_aab96496ceaf1ebb = []byte{ - // 1406 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xdd, 0x6e, 0x1b, 0x37, - 0x16, 0xce, 0xe8, 0x5f, 0x47, 0x7f, 0x63, 0x5a, 0x76, 0x26, 0xde, 0xac, 0x57, 0x50, 0x12, 0x44, - 0xc9, 0x2e, 0xec, 0x5d, 0x6f, 0x8b, 0x06, 0x45, 0x8b, 0xd6, 0x96, 0x9d, 0x54, 0x81, 0x1d, 0xb9, - 0x94, 0x6c, 0x03, 0x45, 0x8b, 0xc1, 0x58, 0x43, 0xcb, 0x84, 0xe5, 0xa1, 0x42, 0x52, 0x72, 0xf5, - 0x14, 0xbd, 0x2d, 0xfa, 0x02, 0xbd, 0xe9, 0x7d, 0x5f, 0xa1, 0xe8, 0x55, 0xf3, 0x06, 0x45, 0xfa, - 0x22, 0x05, 0xc9, 0x19, 0x79, 0xa4, 0xb8, 0x8d, 0x93, 0x20, 0x37, 0xc2, 0xf0, 0x7c, 0x87, 0x87, - 0x87, 0xe7, 0x3b, 0x3f, 0x14, 0x14, 0xc7, 0xb2, 0xef, 0x49, 0xb2, 0x36, 0xe4, 0x4c, 0x32, 0x94, - 0x31, 0xab, 0x15, 0xfb, 0x98, 0x06, 0x03, 0xd6, 0xf7, 0x3d, 0xe9, 0x19, 0x64, 0xa5, 0xf0, 0x7c, - 0x44, 0xf8, 0x24, 0x5c, 0x94, 0x25, 0x1b, 0xb2, 0x38, 0x38, 0x96, 0x7c, 0xd8, 0x33, 0x8b, 0xfa, - 0x8b, 0x02, 0x64, 0x3b, 0x44, 0x08, 0xca, 0x02, 0x74, 0x0f, 0xca, 0x34, 0x70, 0x25, 0xf7, 0x02, - 0xe1, 0xf5, 0x24, 0x65, 0x81, 0x63, 0xd5, 0xac, 0x46, 0x0e, 0x97, 0x68, 0xd0, 0xbd, 0x14, 0xa2, - 0x26, 0x94, 0xc5, 0xa9, 0xc7, 0x7d, 0x57, 0x98, 0x7d, 0xc2, 0x49, 0xd4, 0x92, 0x8d, 0xc2, 0xc6, - 0xed, 0xb5, 0xd0, 0xbb, 0xd0, 0xde, 0x5a, 0x47, 0x69, 0x85, 0x0b, 0x5c, 0x12, 0xb1, 0x95, 0x40, - 0xab, 0x00, 0xde, 0x48, 0xb2, 0x1e, 0x3b, 0x3f, 0xa7, 0xd2, 0x49, 0xe9, 0x73, 0x62, 0x12, 0x74, - 0x07, 0x4a, 0xd2, 0xe3, 0x7d, 0x22, 0x5d, 0x21, 0x39, 0x0d, 0xfa, 0x4e, 0xba, 0x66, 0x35, 0xf2, - 0xb8, 0x68, 0x84, 0x1d, 0x2d, 0x43, 0xeb, 0x90, 0x65, 0x43, 0xa9, 0x5d, 0xc8, 0xd4, 0xac, 0x46, - 0x61, 0x63, 0x69, 0xcd, 0x5c, 0x7c, 0xe7, 0x5b, 0xd2, 0x1b, 0x49, 0xd2, 0x36, 0x20, 0x8e, 0xb4, - 0xd0, 0x16, 0xd8, 0xb1, 0xeb, 0xb9, 0xe7, 0xcc, 0x27, 0x4e, 0xb6, 0x66, 0x35, 0xca, 0x1b, 0x37, - 0x23, 0xe7, 0x63, 0x37, 0xdd, 0x63, 0x3e, 0xc1, 0x15, 0x39, 0x2b, 0x40, 0xeb, 0x90, 0xbb, 0xf0, - 0x78, 0x40, 0x83, 0xbe, 0x70, 0x72, 0xfa, 0xe2, 0x8b, 0xe1, 0xa9, 0x5f, 0xaa, 0xdf, 0x23, 0x83, - 0xe1, 0xa9, 0x12, 0xfa, 0x0c, 0x8a, 0x43, 0x4e, 0x2e, 0xa3, 0x95, 0xbf, 0x46, 0xb4, 0x0a, 0x43, - 0x4e, 0xa6, 0xb1, 0xda, 0x84, 0xd2, 0x90, 0x09, 0x79, 0x69, 0x01, 0xae, 0x61, 0xa1, 0xa8, 0xb6, - 0x4c, 0x4d, 0xdc, 0x85, 0xf2, 0xc0, 0x13, 0xd2, 0xa5, 0x81, 0x20, 0x5c, 0xba, 0xd4, 0x77, 0x0a, - 0x35, 0xab, 0x91, 0xc2, 0x45, 0x25, 0x6d, 0x69, 0x61, 0xcb, 0x47, 0xff, 0x04, 0x38, 0x61, 0xa3, - 0xc0, 0x77, 0x39, 0xbb, 0x10, 0x4e, 0x51, 0x6b, 0xe4, 0xb5, 0x04, 0xb3, 0x0b, 0x81, 0x5c, 0x58, - 0x1e, 0x09, 0xc2, 0x5d, 0x9f, 0x9c, 0xd0, 0x80, 0xf8, 0xee, 0xd8, 0xe3, 0xd4, 0x3b, 0x1e, 0x10, - 0xe1, 0x94, 0xb4, 0x43, 0x0f, 0xe6, 0x1d, 0x3a, 0x10, 0x84, 0x6f, 0x1b, 0xe5, 0xc3, 0x48, 0x77, - 0x27, 0x90, 0x7c, 0x82, 0xab, 0xa3, 0x2b, 0x20, 0xd4, 0x06, 0x5b, 0x4c, 0x84, 0x24, 0xe7, 0x31, - 0xd3, 0x65, 0x6d, 0xfa, 0xee, 0x2b, 0x77, 0xd5, 0x7a, 0x73, 0x56, 0x2b, 0x62, 0x56, 0x8a, 0xfe, - 0x01, 0x79, 0xce, 0x2e, 0xdc, 0x1e, 0x1b, 0x05, 0xd2, 0xa9, 0xd4, 0xac, 0x46, 0x12, 0xe7, 0x38, - 0xbb, 0x68, 0xaa, 0xb5, 0x4a, 0x41, 0xe1, 0x8d, 0xc9, 0x90, 0xd1, 0x40, 0x0a, 0xc7, 0xae, 0x25, - 0x1b, 0x79, 0x1c, 0x93, 0xa0, 0x06, 0xd8, 0x34, 0x70, 0x39, 0x11, 0x84, 0x8f, 0x89, 0xef, 0xf6, - 0x58, 0x10, 0x38, 0x0b, 0x3a, 0x51, 0xcb, 0x34, 0xc0, 0xa1, 0xb8, 0xc9, 0x82, 0x40, 0x31, 0x3c, - 0x60, 0xbd, 0xb3, 0x88, 0x20, 0x07, 0xe9, 0x64, 0x7c, 0x0d, 0xc3, 0x6a, 0x47, 0x54, 0x79, 0x6b, - 0xb0, 0xa8, 0xe9, 0xd1, 0x56, 0x4e, 0x89, 0xc7, 0xe5, 0x31, 0xf1, 0xa4, 0xb3, 0xa8, 0x3d, 0x5e, - 0x50, 0xd0, 0x2e, 0xeb, 0x9d, 0x7d, 0x11, 0x01, 0xe8, 0x73, 0xb0, 0x39, 0xf1, 0x7c, 0xd7, 0x3b, - 0x91, 0x84, 0xbb, 0x17, 0x9c, 0x4a, 0xe2, 0x54, 0xf5, 0xa1, 0xcb, 0xd1, 0xa1, 0x98, 0x78, 0xfe, - 0xa6, 0x82, 0x8f, 0x14, 0x8a, 0xcb, 0x7c, 0x66, 0x8d, 0x6a, 0x50, 0xd8, 0xde, 0xde, 0xed, 0x48, - 0xee, 0x49, 0xd2, 0x9f, 0x38, 0x4b, 0xba, 0xba, 0xe2, 0x22, 0xa5, 0x11, 0xba, 0x77, 0x70, 0xd0, - 0xda, 0x76, 0x96, 0x8d, 0x46, 0x4c, 0x84, 0x3e, 0x80, 0x65, 0x12, 0xa8, 0x40, 0xbb, 0x21, 0x6b, - 0x82, 0x48, 0xa9, 0xeb, 0xe2, 0xa6, 0x0e, 0x53, 0xd5, 0xa0, 0x86, 0xaa, 0x4e, 0x88, 0xad, 0xfc, - 0x6c, 0x41, 0x31, 0x1e, 0x09, 0x74, 0x0f, 0x32, 0xa6, 0xaa, 0x75, 0xbb, 0x29, 0x6c, 0x94, 0xc2, - 0x72, 0xea, 0x6a, 0x21, 0x0e, 0x41, 0xd5, 0x9d, 0xe2, 0xb5, 0x4b, 0x7d, 0x27, 0xa1, 0xc3, 0x53, - 0x8a, 0x49, 0x5b, 0x3e, 0x7a, 0x04, 0x45, 0xa9, 0x4e, 0x95, 0xae, 0x37, 0xa0, 0x9e, 0x70, 0x92, - 0x61, 0x63, 0x98, 0x36, 0xc1, 0xae, 0x46, 0x37, 0x15, 0x88, 0x0b, 0xf2, 0x72, 0x81, 0xfe, 0x05, - 0x85, 0x29, 0xd9, 0xd4, 0xd7, 0x3d, 0x29, 0x89, 0x21, 0x12, 0xb5, 0xfc, 0x95, 0xaf, 0xe1, 0xd6, - 0x5f, 0x66, 0x34, 0xb2, 0x21, 0x79, 0x46, 0x26, 0xfa, 0x0a, 0x79, 0xac, 0x3e, 0xd1, 0x03, 0x48, - 0x8f, 0xbd, 0xc1, 0x88, 0x68, 0x3f, 0x2f, 0xbb, 0xc4, 0x16, 0x0d, 0xa6, 0x7b, 0xb1, 0xd1, 0xf8, - 0x38, 0xf1, 0xc8, 0x5a, 0xd9, 0x82, 0xea, 0x55, 0x49, 0x7d, 0x85, 0xe1, 0x6a, 0xdc, 0x70, 0x3e, - 0x66, 0xe3, 0x69, 0x2a, 0x97, 0xb4, 0x53, 0xf5, 0x9f, 0x2c, 0x28, 0xcf, 0xd2, 0x8f, 0xfe, 0x07, - 0x4b, 0xf3, 0x09, 0xe3, 0xf6, 0x25, 0xf5, 0x43, 0xb3, 0x68, 0x36, 0x3b, 0x9e, 0x48, 0xea, 0xa3, - 0x8f, 0xc0, 0x79, 0x65, 0x8b, 0xa4, 0xe7, 0x84, 0x8d, 0xa4, 0x3e, 0xd8, 0xc2, 0x4b, 0xb3, 0xbb, - 0xba, 0x06, 0x54, 0xc9, 0x1c, 0x16, 0x82, 0x9a, 0x25, 0xbd, 0x33, 0x7d, 0x90, 0x21, 0x22, 0x87, - 0x17, 0x42, 0xa8, 0xab, 0x10, 0x75, 0x8e, 0xa8, 0xff, 0x98, 0x80, 0x72, 0xd8, 0xb0, 0x31, 0x79, - 0x3e, 0x22, 0x42, 0xa2, 0xff, 0x40, 0xbe, 0xe7, 0x0d, 0x06, 0x84, 0xbb, 0xa1, 0x8b, 0x85, 0x8d, - 0xca, 0x9a, 0x19, 0x5b, 0x4d, 0x2d, 0x6f, 0x6d, 0xe3, 0x9c, 0xd1, 0x68, 0xf9, 0xe8, 0x01, 0x64, - 0xa3, 0xca, 0x4b, 0x4c, 0x75, 0xe3, 0x95, 0x87, 0x23, 0x1c, 0xdd, 0x87, 0xb4, 0x66, 0x21, 0x4c, - 0x8b, 0x85, 0x88, 0x13, 0xd5, 0xe3, 0x74, 0xfb, 0xc6, 0x06, 0x47, 0x1f, 0x42, 0x98, 0x1b, 0xae, - 0x9c, 0x0c, 0x89, 0x4e, 0x86, 0xf2, 0x46, 0x75, 0x3e, 0x8b, 0xba, 0x93, 0x21, 0xc1, 0x20, 0xa7, - 0xdf, 0x2a, 0x49, 0xcf, 0xc8, 0x44, 0x0c, 0xbd, 0x1e, 0x71, 0xf5, 0xc0, 0xd3, 0x83, 0x29, 0x8f, - 0x4b, 0x91, 0x54, 0x67, 0x7e, 0x7c, 0x70, 0x65, 0xaf, 0x33, 0xb8, 0x9e, 0xa6, 0x72, 0x69, 0x3b, - 0x53, 0xff, 0xce, 0x82, 0xca, 0x34, 0x52, 0x62, 0xc8, 0x02, 0xa1, 0x4e, 0x4c, 0x13, 0xce, 0x19, - 0x9f, 0x0b, 0x13, 0xde, 0x6f, 0xee, 0x28, 0x31, 0x36, 0xe8, 0x9b, 0xc4, 0xe8, 0x21, 0x64, 0x38, - 0x11, 0xa3, 0x81, 0x0c, 0x83, 0x84, 0xe2, 0xe3, 0x0d, 0x6b, 0x04, 0x87, 0x1a, 0xf5, 0x17, 0x09, - 0x58, 0x0c, 0x3d, 0xda, 0xf2, 0x64, 0xef, 0xf4, 0xbd, 0x13, 0xf8, 0x6f, 0xc8, 0x2a, 0x6f, 0x28, - 0x51, 0x09, 0x95, 0xbc, 0x9a, 0xc2, 0x48, 0xe3, 0x1d, 0x48, 0xf4, 0xc4, 0xcc, 0x3b, 0x28, 0x6d, - 0xde, 0x41, 0x9e, 0x88, 0xbf, 0x83, 0xde, 0x13, 0xd7, 0xf5, 0x1f, 0x2c, 0xa8, 0xce, 0xc6, 0xf4, - 0xbd, 0x51, 0xfd, 0x5f, 0xc8, 0x1a, 0x22, 0xa3, 0x68, 0x2e, 0x87, 0xbe, 0x19, 0x9a, 0x8f, 0xa8, - 0x3c, 0x35, 0xa6, 0x23, 0x35, 0x55, 0xac, 0xd5, 0x8e, 0xe4, 0xc4, 0x3b, 0x7f, 0xa7, 0x92, 0x9d, - 0xd6, 0x61, 0xe2, 0xcd, 0xea, 0x30, 0xf9, 0xd6, 0x75, 0x98, 0x7a, 0x0d, 0x37, 0xe9, 0x6b, 0x3d, - 0x20, 0x63, 0xb1, 0xcd, 0xfc, 0x7d, 0x6c, 0xeb, 0x4d, 0x58, 0x9a, 0x0b, 0x54, 0x48, 0xe3, 0x65, - 0x7d, 0x59, 0xaf, 0xad, 0xaf, 0x6f, 0xe0, 0x16, 0x26, 0x82, 0x0d, 0xc6, 0x24, 0x96, 0x79, 0x6f, - 0x17, 0x72, 0x04, 0x29, 0x5f, 0x86, 0x53, 0x33, 0x8f, 0xf5, 0x77, 0xfd, 0x36, 0xac, 0x5c, 0x65, - 0xde, 0x38, 0x5a, 0xff, 0xd5, 0x82, 0xf2, 0xa1, 0xb9, 0xc3, 0xdb, 0x1d, 0x39, 0x47, 0x5e, 0xe2, - 0x9a, 0xe4, 0xdd, 0x87, 0xf4, 0x58, 0x0f, 0xa7, 0xa8, 0x49, 0xc7, 0xfe, 0xdf, 0x1c, 0xaa, 0x99, - 0x81, 0x0d, 0xae, 0x22, 0x79, 0x42, 0x07, 0x92, 0x70, 0xcd, 0xae, 0x8a, 0x64, 0x4c, 0xf3, 0xb1, - 0x46, 0x70, 0xa8, 0x51, 0xff, 0x14, 0x2a, 0xd3, 0xbb, 0x5c, 0x12, 0x41, 0xc6, 0x44, 0x3d, 0xfe, - 0x2c, 0x9d, 0xfc, 0x33, 0xdb, 0x0f, 0x77, 0x14, 0x84, 0x43, 0x8d, 0x87, 0xdb, 0x50, 0x99, 0xfb, - 0x67, 0x80, 0x2a, 0x50, 0x38, 0x78, 0xd6, 0xd9, 0xdf, 0x69, 0xb6, 0x1e, 0xb7, 0x76, 0xb6, 0xed, - 0x1b, 0x08, 0x20, 0xd3, 0x69, 0x3d, 0x7b, 0xb2, 0xbb, 0x63, 0x5b, 0x28, 0x0f, 0xe9, 0xbd, 0x83, - 0xdd, 0x6e, 0xcb, 0x4e, 0xa8, 0xcf, 0xee, 0x51, 0x7b, 0xbf, 0x69, 0x27, 0x1f, 0x7e, 0x02, 0x85, - 0xa6, 0xfe, 0x7f, 0xd3, 0xe6, 0x3e, 0xe1, 0x6a, 0xc3, 0xb3, 0x36, 0xde, 0xdb, 0xdc, 0xb5, 0x6f, - 0xa0, 0x2c, 0x24, 0xf7, 0xb1, 0xda, 0x99, 0x83, 0xd4, 0x7e, 0xbb, 0xd3, 0xb5, 0x13, 0xa8, 0x0c, - 0xb0, 0x79, 0xd0, 0x6d, 0x37, 0xdb, 0x7b, 0x7b, 0xad, 0xae, 0x9d, 0xdc, 0x7a, 0xfc, 0xcb, 0xcb, - 0x55, 0xeb, 0xb7, 0x97, 0xab, 0xd6, 0xef, 0x2f, 0x57, 0xad, 0xef, 0xff, 0x58, 0xbd, 0x01, 0x15, - 0xca, 0xd6, 0xc6, 0x54, 0x12, 0x21, 0xcc, 0xdf, 0xb9, 0xaf, 0xee, 0x84, 0x2b, 0xca, 0xd6, 0xcd, - 0xd7, 0x7a, 0x9f, 0xad, 0x8f, 0xe5, 0xba, 0x46, 0xd7, 0x4d, 0xaa, 0x1e, 0x67, 0xf4, 0xea, 0xff, - 0x7f, 0x06, 0x00, 0x00, 0xff, 0xff, 0x5c, 0x21, 0x3c, 0xb1, 0x4e, 0x0e, 0x00, 0x00, + // 1454 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xdd, 0x6e, 0x1b, 0xb7, + 0x12, 0xce, 0xea, 0x5f, 0xa3, 0xbf, 0x35, 0x2d, 0x3b, 0x1b, 0x9f, 0x1c, 0x1f, 0x41, 0x49, 0x10, + 0xc5, 0xa7, 0xb0, 0x5b, 0xa7, 0x45, 0x83, 0xa2, 0x45, 0x6b, 0xcb, 0x76, 0xaa, 0xc0, 0x8e, 0x5c, + 0x4a, 0xb6, 0x81, 0xa2, 0xc5, 0x62, 0xad, 0xa5, 0x65, 0xc2, 0xd2, 0xae, 0x42, 0x52, 0x52, 0xd5, + 0x97, 0xe8, 0x6d, 0xd1, 0x17, 0xe8, 0x4d, 0xef, 0xfb, 0x0a, 0xbd, 0x6c, 0xde, 0xa0, 0x48, 0xdf, + 0xa1, 0xd7, 0x05, 0xb9, 0x5c, 0x79, 0xa5, 0xb8, 0x8d, 0x93, 0x20, 0x37, 0x82, 0x38, 0xdf, 0x70, + 0x38, 0xfc, 0xbe, 0x19, 0x92, 0x0b, 0xf9, 0x91, 0xe8, 0x3a, 0x82, 0xac, 0x0f, 0x98, 0x2f, 0x7c, + 0x94, 0x0a, 0x46, 0x2b, 0xe6, 0x29, 0xf5, 0x7a, 0x7e, 0xd7, 0x75, 0x84, 0x13, 0x20, 0x2b, 0xb9, + 0x67, 0x43, 0xc2, 0x26, 0x7a, 0x50, 0x14, 0xfe, 0xc0, 0x8f, 0x82, 0x23, 0xc1, 0x06, 0x9d, 0x60, + 0x50, 0x7d, 0x9e, 0x83, 0x74, 0x8b, 0x70, 0x4e, 0x7d, 0x0f, 0xdd, 0x83, 0x22, 0xf5, 0x6c, 0xc1, + 0x1c, 0x8f, 0x3b, 0x1d, 0x41, 0x7d, 0xcf, 0x32, 0x2a, 0x46, 0x2d, 0x83, 0x0b, 0xd4, 0x6b, 0x5f, + 0x1a, 0x51, 0x1d, 0x8a, 0xfc, 0xdc, 0x61, 0xae, 0xcd, 0x83, 0x79, 0xdc, 0x8a, 0x55, 0xe2, 0xb5, + 0xdc, 0xe6, 0xed, 0x75, 0x9d, 0x9d, 0x8e, 0xb7, 0xde, 0x92, 0x5e, 0x7a, 0x80, 0x0b, 0x3c, 0x32, + 0xe2, 0x68, 0x15, 0xc0, 0x19, 0x0a, 0xbf, 0xe3, 0xf7, 0xfb, 0x54, 0x58, 0x09, 0xb5, 0x4e, 0xc4, + 0x82, 0xee, 0x40, 0x41, 0x38, 0xac, 0x4b, 0x84, 0xcd, 0x05, 0xa3, 0x5e, 0xd7, 0x4a, 0x56, 0x8c, + 0x5a, 0x16, 0xe7, 0x03, 0x63, 0x4b, 0xd9, 0xd0, 0x06, 0xa4, 0xfd, 0x81, 0x50, 0x29, 0xa4, 0x2a, + 0x46, 0x2d, 0xb7, 0xb9, 0xb4, 0x1e, 0x6c, 0x7c, 0xf7, 0x3b, 0xd2, 0x19, 0x0a, 0xd2, 0x0c, 0x40, + 0x1c, 0x7a, 0xa1, 0x6d, 0x30, 0x23, 0xdb, 0xb3, 0xfb, 0xbe, 0x4b, 0xac, 0x74, 0xc5, 0xa8, 0x15, + 0x37, 0x6f, 0x86, 0xc9, 0x47, 0x76, 0x7a, 0xe0, 0xbb, 0x04, 0x97, 0xc4, 0xac, 0x01, 0x6d, 0x40, + 0x66, 0xec, 0x30, 0x8f, 0x7a, 0x5d, 0x6e, 0x65, 0xd4, 0xc6, 0x17, 0xf5, 0xaa, 0x5f, 0xc9, 0xdf, + 0x93, 0x00, 0xc3, 0x53, 0x27, 0xf4, 0x39, 0xe4, 0x07, 0x8c, 0x5c, 0xb2, 0x95, 0xbd, 0x06, 0x5b, + 0xb9, 0x01, 0x23, 0x53, 0xae, 0xb6, 0xa0, 0x30, 0xf0, 0xb9, 0xb8, 0x8c, 0x00, 0xd7, 0x88, 0x90, + 0x97, 0x53, 0xa6, 0x21, 0xee, 0x42, 0xb1, 0xe7, 0x70, 0x61, 0x53, 0x8f, 0x13, 0x26, 0x6c, 0xea, + 0x5a, 0xb9, 0x8a, 0x51, 0x4b, 0xe0, 0xbc, 0xb4, 0x36, 0x94, 0xb1, 0xe1, 0xa2, 0xff, 0x02, 0x9c, + 0xf9, 0x43, 0xcf, 0xb5, 0x99, 0x3f, 0xe6, 0x56, 0x5e, 0x79, 0x64, 0x95, 0x05, 0xfb, 0x63, 0x8e, + 0x6c, 0x58, 0x1e, 0x72, 0xc2, 0x6c, 0x97, 0x9c, 0x51, 0x8f, 0xb8, 0xf6, 0xc8, 0x61, 0xd4, 0x39, + 0xed, 0x11, 0x6e, 0x15, 0x54, 0x42, 0x0f, 0xe6, 0x13, 0x3a, 0xe2, 0x84, 0xed, 0x04, 0xce, 0xc7, + 0xa1, 0xef, 0xae, 0x27, 0xd8, 0x04, 0x97, 0x87, 0x57, 0x40, 0xa8, 0x09, 0x26, 0x9f, 0x70, 0x41, + 0xfa, 0x91, 0xd0, 0x45, 0x15, 0xfa, 0xee, 0x4b, 0x7b, 0x55, 0x7e, 0x73, 0x51, 0x4b, 0x7c, 0xd6, + 0x8a, 0xfe, 0x03, 0x59, 0xe6, 0x8f, 0xed, 0x8e, 0x3f, 0xf4, 0x84, 0x55, 0xaa, 0x18, 0xb5, 0x38, + 0xce, 0x30, 0x7f, 0x5c, 0x97, 0x63, 0x59, 0x82, 0xdc, 0x19, 0x91, 0x81, 0x4f, 0x3d, 0xc1, 0x2d, + 0xb3, 0x12, 0xaf, 0x65, 0x71, 0xc4, 0x82, 0x6a, 0x60, 0x52, 0xcf, 0x66, 0x84, 0x13, 0x36, 0x22, + 0xae, 0xdd, 0xf1, 0x3d, 0xcf, 0x5a, 0x50, 0x85, 0x5a, 0xa4, 0x1e, 0xd6, 0xe6, 0xba, 0xef, 0x79, + 0x52, 0xe1, 0x9e, 0xdf, 0xb9, 0x08, 0x05, 0xb2, 0x90, 0x2a, 0xc6, 0x57, 0x28, 0x2c, 0x67, 0x84, + 0x9d, 0xb7, 0x0e, 0x8b, 0x4a, 0x1e, 0x15, 0xe5, 0x9c, 0x38, 0x4c, 0x9c, 0x12, 0x47, 0x58, 0x8b, + 0x2a, 0xe3, 0x05, 0x09, 0xed, 0xfb, 0x9d, 0x8b, 0x2f, 0x43, 0x00, 0x7d, 0x01, 0x26, 0x23, 0x8e, + 0x6b, 0x3b, 0x67, 0x82, 0x30, 0x7b, 0xcc, 0xa8, 0x20, 0x56, 0x59, 0x2d, 0xba, 0x1c, 0x2e, 0x8a, + 0x89, 0xe3, 0x6e, 0x49, 0xf8, 0x44, 0xa2, 0xb8, 0xc8, 0x66, 0xc6, 0xa8, 0x02, 0xb9, 0x9d, 0x9d, + 0xfd, 0x96, 0x60, 0x8e, 0x20, 0xdd, 0x89, 0xb5, 0xa4, 0xba, 0x2b, 0x6a, 0x92, 0x1e, 0x3a, 0xbd, + 0xa3, 0xa3, 0xc6, 0x8e, 0xb5, 0x1c, 0x78, 0x44, 0x4c, 0xe8, 0x43, 0x58, 0x26, 0x9e, 0x24, 0xda, + 0xd6, 0xaa, 0x71, 0x22, 0x84, 0xea, 0x8b, 0x9b, 0x8a, 0xa6, 0x72, 0x80, 0x06, 0x52, 0xb5, 0x34, + 0xb6, 0xf2, 0xab, 0x01, 0xf9, 0x28, 0x13, 0xe8, 0x1e, 0xa4, 0x82, 0xae, 0x56, 0xc7, 0x4d, 0x6e, + 0xb3, 0xa0, 0xdb, 0xa9, 0xad, 0x8c, 0x58, 0x83, 0xf2, 0x74, 0x8a, 0xf6, 0x2e, 0x75, 0xad, 0x98, + 0xa2, 0xa7, 0x10, 0xb1, 0x36, 0x5c, 0xf4, 0x08, 0xf2, 0x42, 0xae, 0x2a, 0x6c, 0xa7, 0x47, 0x1d, + 0x6e, 0xc5, 0xf5, 0xc1, 0x30, 0x3d, 0x04, 0xdb, 0x0a, 0xdd, 0x92, 0x20, 0xce, 0x89, 0xcb, 0x01, + 0xfa, 0x1f, 0xe4, 0xa6, 0x62, 0x53, 0x57, 0x9d, 0x49, 0x71, 0x0c, 0xa1, 0xa9, 0xe1, 0xae, 0x7c, + 0x03, 0xb7, 0xfe, 0xb1, 0xa2, 0x91, 0x09, 0xf1, 0x0b, 0x32, 0x51, 0x5b, 0xc8, 0x62, 0xf9, 0x17, + 0x3d, 0x80, 0xe4, 0xc8, 0xe9, 0x0d, 0x89, 0xca, 0xf3, 0xf2, 0x94, 0xd8, 0xa6, 0xde, 0x74, 0x2e, + 0x0e, 0x3c, 0x3e, 0x89, 0x3d, 0x32, 0x56, 0xb6, 0xa1, 0x7c, 0x55, 0x51, 0x5f, 0x11, 0xb8, 0x1c, + 0x0d, 0x9c, 0x8d, 0xc4, 0x78, 0x92, 0xc8, 0xc4, 0xcd, 0x44, 0xf5, 0x17, 0x03, 0x8a, 0xb3, 0xf2, + 0xa3, 0x0f, 0x60, 0x69, 0xbe, 0x60, 0xec, 0xae, 0xa0, 0xae, 0x0e, 0x8b, 0x66, 0xab, 0xe3, 0xb1, + 0xa0, 0x2e, 0xfa, 0x18, 0xac, 0x97, 0xa6, 0x08, 0xda, 0x27, 0xfe, 0x50, 0xa8, 0x85, 0x0d, 0xbc, + 0x34, 0x3b, 0xab, 0x1d, 0x80, 0xb2, 0x98, 0x75, 0x23, 0xc8, 0xbb, 0xa4, 0x73, 0xa1, 0x16, 0x0a, + 0x84, 0xc8, 0xe0, 0x05, 0x0d, 0xb5, 0x25, 0x22, 0xd7, 0xe1, 0xd5, 0x9f, 0x63, 0x50, 0xd4, 0x07, + 0x36, 0x26, 0xcf, 0x86, 0x84, 0x0b, 0xf4, 0x1e, 0x64, 0x3b, 0x4e, 0xaf, 0x47, 0x98, 0xad, 0x53, + 0xcc, 0x6d, 0x96, 0xd6, 0x83, 0x6b, 0xab, 0xae, 0xec, 0x8d, 0x1d, 0x9c, 0x09, 0x3c, 0x1a, 0x2e, + 0x7a, 0x00, 0xe9, 0xb0, 0xf3, 0x62, 0x53, 0xdf, 0x68, 0xe7, 0xe1, 0x10, 0x47, 0xf7, 0x21, 0xa9, + 0x54, 0xd0, 0x65, 0xb1, 0x10, 0x6a, 0x22, 0xcf, 0x38, 0x75, 0x7c, 0xe3, 0x00, 0x47, 0x1f, 0x81, + 0xae, 0x0d, 0x5b, 0x4c, 0x06, 0x44, 0x15, 0x43, 0x71, 0xb3, 0x3c, 0x5f, 0x45, 0xed, 0xc9, 0x80, + 0x60, 0x10, 0xd3, 0xff, 0xb2, 0x48, 0x2f, 0xc8, 0x84, 0x0f, 0x9c, 0x0e, 0xb1, 0xd5, 0x85, 0xa7, + 0x2e, 0xa6, 0x2c, 0x2e, 0x84, 0x56, 0x55, 0xf9, 0xd1, 0x8b, 0x2b, 0x7d, 0x9d, 0x8b, 0xeb, 0x49, + 0x22, 0x93, 0x34, 0x53, 0xd5, 0x1f, 0x0c, 0x28, 0x4d, 0x99, 0xe2, 0x03, 0xdf, 0xe3, 0x72, 0xc5, + 0x24, 0x61, 0xcc, 0x67, 0x73, 0x34, 0xe1, 0xc3, 0xfa, 0xae, 0x34, 0xe3, 0x00, 0x7d, 0x1d, 0x8e, + 0xd6, 0x20, 0xc5, 0x08, 0x1f, 0xf6, 0x84, 0x26, 0x09, 0x45, 0xaf, 0x37, 0xac, 0x10, 0xac, 0x3d, + 0xaa, 0xcf, 0x63, 0xb0, 0xa8, 0x33, 0xda, 0x76, 0x44, 0xe7, 0xfc, 0x9d, 0x0b, 0xf8, 0x7f, 0x48, + 0xcb, 0x6c, 0x28, 0x91, 0x05, 0x15, 0xbf, 0x5a, 0xc2, 0xd0, 0xe3, 0x2d, 0x44, 0x74, 0xf8, 0xcc, + 0x3b, 0x28, 0x19, 0xbc, 0x83, 0x1c, 0x1e, 0x7d, 0x07, 0xbd, 0x23, 0xad, 0xab, 0x3f, 0x19, 0x50, + 0x9e, 0xe5, 0xf4, 0x9d, 0x49, 0xfd, 0x3e, 0xa4, 0x03, 0x21, 0x43, 0x36, 0x97, 0x75, 0x6e, 0x81, + 0xcc, 0x27, 0x54, 0x9c, 0x07, 0xa1, 0x43, 0x37, 0xd9, 0xac, 0xe5, 0x96, 0x60, 0xc4, 0xe9, 0xbf, + 0x55, 0xcb, 0x4e, 0xfb, 0x30, 0xf6, 0x7a, 0x7d, 0x18, 0x7f, 0xe3, 0x3e, 0x4c, 0xbc, 0x42, 0x9b, + 0xe4, 0xb5, 0x1e, 0x90, 0x11, 0x6e, 0x53, 0xff, 0xce, 0x6d, 0xb5, 0x0e, 0x4b, 0x73, 0x44, 0x69, + 0x19, 0x2f, 0xfb, 0xcb, 0x78, 0x65, 0x7f, 0x7d, 0x0b, 0xb7, 0x30, 0xe1, 0x7e, 0x6f, 0x44, 0x22, + 0x95, 0xf7, 0x66, 0x94, 0x23, 0x48, 0xb8, 0x42, 0xdf, 0x9a, 0x59, 0xac, 0xfe, 0x57, 0x6f, 0xc3, + 0xca, 0x55, 0xe1, 0x83, 0x44, 0xab, 0x0f, 0x21, 0x7f, 0x1c, 0x6c, 0x61, 0xaf, 0xe7, 0x74, 0xb9, + 0x7c, 0x93, 0xf7, 0xa9, 0x47, 0xfb, 0xf4, 0x7b, 0x62, 0xf3, 0x0b, 0x32, 0xd6, 0x9f, 0x07, 0xf9, + 0xd0, 0xd8, 0xba, 0x20, 0xe3, 0xea, 0x5f, 0x06, 0x14, 0xf5, 0xac, 0x37, 0xcb, 0x73, 0x4e, 0xf1, + 0xd8, 0x35, 0x15, 0xbf, 0x0f, 0xc9, 0x91, 0xba, 0xd1, 0xc2, 0x93, 0x3d, 0xf2, 0x51, 0x74, 0x2c, + 0x2f, 0x1a, 0x1c, 0xe0, 0x92, 0xfe, 0x33, 0xda, 0x13, 0x84, 0xa9, 0x92, 0x90, 0xf4, 0x47, 0x3c, + 0xf7, 0x14, 0x82, 0xb5, 0x07, 0x5a, 0x83, 0xe4, 0x99, 0xdc, 0xba, 0xae, 0x8e, 0x72, 0x28, 0x76, + 0x94, 0x16, 0x1c, 0xb8, 0x54, 0x3f, 0x83, 0xd2, 0x74, 0xdf, 0x97, 0x4a, 0x93, 0x11, 0x91, 0xaf, + 0x4b, 0x43, 0x75, 0xd7, 0xcc, 0x52, 0xc7, 0xbb, 0x12, 0xc2, 0xda, 0x63, 0x6d, 0x07, 0x4a, 0x73, + 0x9f, 0x1e, 0xa8, 0x04, 0xb9, 0xa3, 0xa7, 0xad, 0xc3, 0xdd, 0x7a, 0x63, 0xaf, 0xb1, 0xbb, 0x63, + 0xde, 0x40, 0x00, 0xa9, 0x56, 0xe3, 0xe9, 0xe3, 0xfd, 0x5d, 0xd3, 0x40, 0x59, 0x48, 0x1e, 0x1c, + 0xed, 0xb7, 0x1b, 0x66, 0x4c, 0xfe, 0x6d, 0x9f, 0x34, 0x0f, 0xeb, 0x66, 0x7c, 0xed, 0x53, 0xc8, + 0xd5, 0xd5, 0x07, 0x54, 0x93, 0xb9, 0x84, 0xc9, 0x09, 0x4f, 0x9b, 0xf8, 0x60, 0x6b, 0xdf, 0xbc, + 0x81, 0xd2, 0x10, 0x3f, 0xc4, 0x72, 0x66, 0x06, 0x12, 0x87, 0xcd, 0x56, 0xdb, 0x8c, 0xa1, 0x22, + 0xc0, 0xd6, 0x51, 0xbb, 0x59, 0x6f, 0x1e, 0x1c, 0x34, 0xda, 0x66, 0x7c, 0x7b, 0xef, 0xb7, 0x17, + 0xab, 0xc6, 0xef, 0x2f, 0x56, 0x8d, 0x3f, 0x5e, 0xac, 0x1a, 0x3f, 0xfe, 0xb9, 0x7a, 0x03, 0x4a, + 0xd4, 0x5f, 0x1f, 0x51, 0x41, 0x38, 0x0f, 0xbe, 0x17, 0xbf, 0xbe, 0xa3, 0x47, 0xd4, 0xdf, 0x08, + 0xfe, 0x6d, 0x74, 0xfd, 0x8d, 0x91, 0xd8, 0x50, 0xe8, 0x46, 0x40, 0xcf, 0x69, 0x4a, 0x8d, 0x1e, + 0xfe, 0x1d, 0x00, 0x00, 0xff, 0xff, 0xb1, 0xa5, 0xb0, 0xf3, 0xaf, 0x0e, 0x00, 0x00, } func (m *Session) Marshal() (dAtA []byte, err error) { @@ -2207,6 +2266,43 @@ func (m *ResolveTransactionResponse) MarshalToSizedBuffer(dAtA []byte) (int, err return len(dAtA) - i, nil } +func (m *VStreamFlags) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *VStreamFlags) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *VStreamFlags) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.MinimizeSkew { + i-- + if m.MinimizeSkew { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *VStreamRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2231,6 +2327,18 @@ func (m *VStreamRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.Flags != nil { + { + size, err := m.Flags.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintVtgate(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } if m.Filter != nil { { size, err := m.Filter.MarshalToSizedBuffer(dAtA[:i]) @@ -2697,6 +2805,21 @@ func (m *ResolveTransactionResponse) Size() (n int) { return n } +func (m *VStreamFlags) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.MinimizeSkew { + n += 2 + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func (m *VStreamRequest) Size() (n int) { if m == nil { return 0 @@ -2718,6 +2841,10 @@ func (m *VStreamRequest) Size() (n int) { l = m.Filter.Size() n += 1 + l + sovVtgate(uint64(l)) } + if m.Flags != nil { + l = m.Flags.Size() + n += 1 + l + sovVtgate(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -5239,6 +5366,80 @@ func (m *ResolveTransactionResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *VStreamFlags) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowVtgate + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: VStreamFlags: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: VStreamFlags: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MinimizeSkew", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowVtgate + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.MinimizeSkew = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipVtgate(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthVtgate + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthVtgate + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *VStreamRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -5395,6 +5596,42 @@ func (m *VStreamRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Flags", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowVtgate + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthVtgate + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthVtgate + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Flags == nil { + m.Flags = &VStreamFlags{} + } + if err := m.Flags.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipVtgate(dAtA[iNdEx:]) diff --git a/go/vt/vitessdriver/fakeserver_test.go b/go/vt/vitessdriver/fakeserver_test.go index 2a8a1f5b00a..5cccbb237aa 100644 --- a/go/vt/vitessdriver/fakeserver_test.go +++ b/go/vt/vitessdriver/fakeserver_test.go @@ -140,7 +140,7 @@ func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string) return nil } -func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { return nil } diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index e11088045eb..f5eec59be3e 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -23,6 +23,8 @@ import ( "sync" "testing" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" @@ -79,7 +81,8 @@ func TestVStream(t *testing.T) { Match: "/.*/", }}, } - reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags) if err != nil { t.Fatal(err) } @@ -194,7 +197,8 @@ func TestVStreamCopyBasic(t *testing.T) { Filter: "select * from t1", }}, } - reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags) _, _ = conn, mconn if err != nil { t.Fatal(err) @@ -247,7 +251,8 @@ func TestVStreamCurrent(t *testing.T) { Filter: "select * from t1", }}, } - reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags) _, _ = conn, mconn if err != nil { t.Fatal(err) diff --git a/go/vt/vtgate/fakerpcvtgateconn/conn.go b/go/vt/vtgate/fakerpcvtgateconn/conn.go index 9142fa64386..ad655cb645d 100644 --- a/go/vt/vtgate/fakerpcvtgateconn/conn.go +++ b/go/vt/vtgate/fakerpcvtgateconn/conn.go @@ -164,7 +164,9 @@ func (conn *FakeVTGateConn) ResolveTransaction(ctx context.Context, dtid string) } // VStream streams binlog events. -func (conn *FakeVTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (vtgateconn.VStreamReader, error) { +func (conn *FakeVTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, + filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (vtgateconn.VStreamReader, error) { + return nil, fmt.Errorf("NYI") } diff --git a/go/vt/vtgate/grpcvtgateconn/conn.go b/go/vt/vtgate/grpcvtgateconn/conn.go index 805558131a4..a82e81c3a3a 100644 --- a/go/vt/vtgate/grpcvtgateconn/conn.go +++ b/go/vt/vtgate/grpcvtgateconn/conn.go @@ -184,12 +184,15 @@ func (a *vstreamAdapter) Recv() ([]*binlogdatapb.VEvent, error) { return r.Events, nil } -func (conn *vtgateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (vtgateconn.VStreamReader, error) { +func (conn *vtgateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, + filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (vtgateconn.VStreamReader, error) { + req := &vtgatepb.VStreamRequest{ CallerId: callerid.EffectiveCallerIDFromContext(ctx), TabletType: tabletType, Vgtid: vgtid, Filter: filter, + Flags: flags, } stream, err := conn.c.VStream(ctx, req) if err != nil { diff --git a/go/vt/vtgate/grpcvtgateconn/suite_test.go b/go/vt/vtgate/grpcvtgateconn/suite_test.go index f767657964c..5d126fd7802 100644 --- a/go/vt/vtgate/grpcvtgateconn/suite_test.go +++ b/go/vt/vtgate/grpcvtgateconn/suite_test.go @@ -212,7 +212,7 @@ func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string) return nil } -func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { panic("unimplemented") } diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index ae108c06ee3..43bbe917bec 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -198,6 +198,7 @@ func (vtg *VTGate) VStream(request *vtgatepb.VStreamRequest, stream vtgateservic request.TabletType, request.Vgtid, request.Filter, + request.Flags, func(events []*binlogdatapb.VEvent) error { return stream.Send(&vtgatepb.VStreamResponse{ Events: events, diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 892ddf5a037..e75c1f7ba50 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -17,11 +17,13 @@ limitations under the License. package vtgate import ( + "context" "fmt" "io" "sync" + "time" - "context" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" "github.com/golang/protobuf/proto" @@ -65,6 +67,26 @@ type vstream struct { cancel context.CancelFunc wg sync.WaitGroup + + // this flag is set by the client, default false + // if true skew detection is enabled and we align the streams so that they receive events from + // about the same time as each other. Note that there is no exact ordering of events across shards + minimizeSkew bool + + // mutex used to synchronize access to skew detection parameters + skewMu sync.Mutex + // channel is created whenever there is a skew detected. closing it implies the current skew has been fixed + skewCh chan bool + // if a skew lasts for this long, we timeout the vstream call. currently hardcoded + skewTimeoutSeconds int64 + // the slow streamId which is causing the skew. streamId is of the form . + laggard string + // transaction timestamp of the slowest stream + lowestTS int64 + // the timestamp of the most recent event, keyed by streamId. streamId is of the form . + timestamps map[string]int64 + + vsm *vstreamManager } type journalEvent struct { @@ -81,8 +103,9 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str } } -func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error { - vgtid, filter, err := vsm.resolveParams(ctx, tabletType, vgtid, filter) +func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, + filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func(events []*binlogdatapb.VEvent) error) error { + vgtid, filter, flags, err := vsm.resolveParams(ctx, tabletType, vgtid, filter, flags) if err != nil { return err } @@ -93,12 +116,19 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta send: send, resolver: vsm.resolver, journaler: make(map[int64]*journalEvent), + + minimizeSkew: flags.MinimizeSkew, + skewTimeoutSeconds: 10 * 60, + timestamps: make(map[string]int64), + vsm: vsm, } return vs.stream(ctx) } // resolveParams provides defaults for the inputs if they're not specified. -func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (*binlogdatapb.VGtid, *binlogdatapb.Filter, error) { +func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, + filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (*binlogdatapb.VGtid, *binlogdatapb.Filter, *vtgatepb.VStreamFlags, error) { + if filter == nil { filter = &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -106,8 +136,12 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat }}, } } + + if flags == nil { + flags = &vtgatepb.VStreamFlags{} + } if vgtid == nil || len(vgtid.ShardGtids) == 0 { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") } // To fetch from all keyspaces, the input must contain a single ShardGtid // that has an empty keyspace, and the Gtid must be "current". In the @@ -115,11 +149,11 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat // copying of existing data. if len(vgtid.ShardGtids) == 1 && vgtid.ShardGtids[0].Keyspace == "" { if vgtid.ShardGtids[0].Gtid != "current" { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) } keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) if err != nil { - return nil, nil, err + return nil, nil, nil, err } newvgtid := &binlogdatapb.VGtid{} for _, keyspace := range keyspaces { @@ -134,12 +168,12 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat for _, sgtid := range vgtid.ShardGtids { if sgtid.Shard == "" { if sgtid.Gtid != "current" { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid) + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid) } // TODO(sougou): this should work with the new Migrate workflow _, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for _, shard := range allShards { newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ @@ -152,11 +186,19 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat newvgtid.ShardGtids = append(newvgtid.ShardGtids, sgtid) } } + //TODO add tablepk validations - return newvgtid, filter, nil + return newvgtid, filter, flags, nil +} + +func (vsm *vstreamManager) RecordStreamDelay() { + vstreamSkewDelayCount.Add(1) } +func (vsm *vstreamManager) GetTotalStreamDelay() int64 { + return vstreamSkewDelayCount.Get() +} func (vs *vstream) stream(ctx context.Context) error { ctx, vs.cancel = context.WithCancel(ctx) defer vs.cancel() @@ -187,6 +229,95 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard }() } +// MaxSkew is the threshold for a skew to be detected. Since MySQL timestamps are in seconds we account for +// two round-offs: one for the actual event and another while accounting for the clock skew +const MaxSkew = int64(2) + +// computeSkew sets the timestamp of the current event for the calling stream, accounts for a clock skew +// and declares that a skew has arisen if the streams are too far apart +func (vs *vstream) computeSkew(streamID string, event *binlogdatapb.VEvent) bool { + vs.skewMu.Lock() + defer vs.skewMu.Unlock() + // account for skew between this vtgate and the source mysql server + secondsInThePast := event.CurrentTime/1e9 - event.Timestamp + vs.timestamps[streamID] = time.Now().Unix() - secondsInThePast + + var minTs, maxTs int64 + var laggardStream string + + if len(vs.timestamps) <= 1 { + return false + } + for k, ts := range vs.timestamps { + if ts < minTs || minTs == 0 { + minTs = ts + laggardStream = k + } + if ts > maxTs { + maxTs = ts + } + } + if vs.laggard != "" { // we are skewed, check if this event has fixed the skew + if (maxTs - minTs) <= MaxSkew { + vs.laggard = "" + close(vs.skewCh) + } + } else { + if (maxTs - minTs) > MaxSkew { // check if we are skewed due to this event + log.Infof("Skew found, laggard is %s, %+v", laggardStream, vs.timestamps) + vs.laggard = laggardStream + vs.skewCh = make(chan bool) + } + } + return vs.mustPause(streamID) +} + +// mustPause returns true if a skew exists and the stream calling this is not the slowest one +func (vs *vstream) mustPause(streamID string) bool { + switch vs.laggard { + case "": + return false + case streamID: + // current stream is the laggard, not pausing + return false + } + + if (vs.timestamps[streamID] - vs.lowestTS) <= MaxSkew { + // current stream is not the laggard, but the skew is still within the limit + return false + } + vs.vsm.RecordStreamDelay() + return true +} + +// alignStreams is called by each individual shard's stream before an event is sent to the client or after each heartbeat. +// It checks for skew (if the minimizeSkew option is set). If skew is present this stream is delayed until the skew is fixed +// The faster stream detects the skew and waits. The slower stream resets the skew when it catches up. +func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, keyspace, shard string) error { + if !vs.minimizeSkew || event.Timestamp == 0 { + return nil + } + streamID := fmt.Sprintf("%s/%s", keyspace, shard) + for { + mustPause := vs.computeSkew(streamID, event) + if event.Type == binlogdatapb.VEventType_HEARTBEAT { + return nil + } + if !mustPause { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(vs.skewTimeoutSeconds) * time.Second): + log.Errorf("timed out while waiting for skew to reduce: %s", streamID) + return fmt.Errorf("timed out while waiting for skew to reduce: %s", streamID) + case <-vs.skewCh: + // once skew is fixed the channel is closed and all waiting streams "wake up" + } + } +} + // streamFromTablet streams from one shard. If transactions come in separate chunks, they are grouped and sent. func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.ShardGtid) error { // journalDone is assigned a channel when a journal event is encountered. @@ -249,6 +380,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER: sendevents = append(sendevents, event) eventss = append(eventss, sendevents) + + if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { + return err + } + if err := vs.sendAll(sgtid, eventss); err != nil { return err } @@ -258,6 +394,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Remove all heartbeat events for now. // Otherwise they can accumulate indefinitely if there are no real events. // TODO(sougou): figure out a model for this. + if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { + return err + } + case binlogdatapb.VEventType_JOURNAL: journal := event.Journal // Journal events are not sent to clients. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 0515b99732e..addbe49269b 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -19,7 +19,14 @@ package vtgate import ( "fmt" "strings" + "sync" "testing" + "time" + + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/vttablet/sandboxconn" "context" @@ -36,6 +43,102 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +var mu sync.Mutex + +func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent { + mu.Lock() + defer mu.Unlock() + var vevents []*binlogdatapb.VEvent + var i int64 + currentTime := time.Now().Unix() + for i = count; i > 0; i-- { + j := i + idx + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_GTID, Gtid: fmt.Sprintf("gtid-%s-%d", shard, j), + Timestamp: currentTime - j, + CurrentTime: currentTime * 1e9, + }) + + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COMMIT, + Timestamp: currentTime - j, + CurrentTime: currentTime * 1e9, + }) + } + return vevents +} + +func TestVStreamSkew(t *testing.T) { + stream := func(conn *sandboxconn.SandboxConn, shard string, count, idx int64) { + vevents := getVEvents(shard, count, idx) + for _, ev := range vevents { + conn.VStreamCh <- ev + time.Sleep(time.Duration(idx*100) * time.Millisecond) + } + } + type skewTestCase struct { + numEventsPerShard int64 + shard0idx, shard1idx int64 + expectedDelays int64 + } + tcases := []*skewTestCase{ + // shard0 events are all attempted to be sent first along with the first event of shard1 due to the increased sleep + // for shard1 in stream(). Third event and fourth events of shard0 need to wait for shard1 to catch up + {numEventsPerShard: 4, shard0idx: 1, shard1idx: 2, expectedDelays: 2}, + + // no delays if streams are aligned or if only one stream is present + {numEventsPerShard: 4, shard0idx: 1, shard1idx: 1, expectedDelays: 0}, + {numEventsPerShard: 4, shard0idx: 0, shard1idx: 1, expectedDelays: 0}, + {numEventsPerShard: 4, shard0idx: 1, shard1idx: 0, expectedDelays: 0}, + } + previousDelays := int64(0) + vstreamSkewDelayCount = stats.NewCounter("VStreamEventsDelayedBySkewAlignment", + "Number of events that had to wait because the skew across shards was too high") + for idx, tcase := range tcases { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + name := fmt.Sprintf("TestVStreamSkew-%d", idx) + _ = createSandbox(name) + hc := discovery.NewFakeHealthCheck() + vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") + shard0 := "-20" + shard1 := "20-40" + vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} + want := int64(0) + var sbc0, sbc1 *sandboxconn.SandboxConn + if tcase.shard0idx != 0 { + sbc0 = hc.AddTestTablet("aa", "1.1.1.1", 1001, name, shard0, topodatapb.TabletType_MASTER, true, 1, nil) + sbc0.VStreamCh = make(chan *binlogdatapb.VEvent) + want += 2 * tcase.numEventsPerShard + vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: name, Gtid: "pos", Shard: "-20"}) + go stream(sbc0, shard0, tcase.numEventsPerShard, tcase.shard0idx) + } + if tcase.shard1idx != 0 { + sbc1 = hc.AddTestTablet("aa", "1.1.1.1", 1002, name, shard1, topodatapb.TabletType_MASTER, true, 1, nil) + sbc1.VStreamCh = make(chan *binlogdatapb.VEvent) + want += 2 * tcase.numEventsPerShard + vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: name, Gtid: "pos", Shard: "20-40"}) + go stream(sbc1, shard1, tcase.numEventsPerShard, tcase.shard1idx) + } + ch := startVStream(ctx, t, vsm, vgtid, true) + var receivedEvents []*binlogdatapb.VEvent + for len(receivedEvents) < int(want) { + select { + case <-time.After(1 * time.Minute): + require.FailNow(t, "test timed out") + case response := <-ch: + receivedEvents = append(receivedEvents, response.Events...) + } + } + require.Equal(t, int(want), int(len(receivedEvents))) + require.Equal(t, tcase.expectedDelays, vsm.GetTotalStreamDelay()-previousDelays) + previousDelays = vsm.GetTotalStreamDelay() + }) + } +} + func TestVStreamEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -91,7 +194,7 @@ func TestVStreamEvents(t *testing.T) { } ch := make(chan *binlogdatapb.VStreamResponse) go func() { - err := vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, func(events []*binlogdatapb.VEvent) error { + err := vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { ch <- &binlogdatapb.VStreamResponse{Events: events} return nil }) @@ -142,7 +245,7 @@ func TestVStreamChunks(t *testing.T) { Gtid: "pos", }}, } - _ = vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, func(events []*binlogdatapb.VEvent) error { + _ = vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { switch events[0].Type { case binlogdatapb.VEventType_ROW: if doneCounting { @@ -214,7 +317,7 @@ func TestVStreamMulti(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid) + ch := startVStream(ctx, t, vsm, vgtid, false) <-ch response := <-ch var got *binlogdatapb.VGtid @@ -267,7 +370,7 @@ func TestVStreamRetry(t *testing.T) { Gtid: "pos", }}, } - err := vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, func(events []*binlogdatapb.VEvent) error { + err := vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { count++ return nil }) @@ -322,7 +425,7 @@ func TestVStreamHeartbeat(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid) + ch := startVStream(ctx, t, vsm, vgtid, false) verifyEvents(t, ch, want) } @@ -404,7 +507,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid) + ch := startVStream(ctx, t, vsm, vgtid, false) verifyEvents(t, ch, want1) // The following two events from the different shards can come in any order. @@ -515,7 +618,7 @@ func TestVStreamJournalManyToOne(t *testing.T) { Gtid: "pos1020", }}, } - ch := startVStream(ctx, t, vsm, vgtid) + ch := startVStream(ctx, t, vsm, vgtid, false) // The following two events from the different shards can come in any order. // But the resulting VGTID should be the same after both are received. <-ch @@ -661,7 +764,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid) + ch := startVStream(ctx, t, vsm, vgtid, false) verifyEvents(t, ch, want1, wantjn1, want2, wantjn2, want3) } @@ -708,7 +811,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { Gtid: "pos1020", }}, } - err := vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, func(events []*binlogdatapb.VEvent) error { + err := vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { t.Errorf("unexpected events: %v", events) return nil }) @@ -737,7 +840,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { }}, } sbc2.AddVStreamEvents(send, nil) - err = vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, func(events []*binlogdatapb.VEvent) error { + err = vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { t.Errorf("unexpected events: %v", events) return nil }) @@ -850,7 +953,7 @@ func TestResolveVStreamParams(t *testing.T) { }}, } for _, tcase := range testcases { - vgtid, filter, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, tcase.input, nil) + vgtid, filter, flags, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, tcase.input, nil, nil) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("resolve(%v) err: %v, must contain %v", tcase.input, err, tcase.err) @@ -860,6 +963,7 @@ func TestResolveVStreamParams(t *testing.T) { require.NoError(t, err, tcase.input) assert.Equal(t, tcase.output, vgtid, tcase.input) assert.Equal(t, wantFilter, filter, tcase.input) + require.False(t, flags.MinimizeSkew) } // Special-case: empty keyspace because output is too big. input := &binlogdatapb.VGtid{ @@ -867,11 +971,27 @@ func TestResolveVStreamParams(t *testing.T) { Gtid: "current", }}, } - vgtid, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil) + vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) require.NoError(t, err, input) if got, want := len(vgtid.ShardGtids), 8; want >= got { t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, want) } + for _, minimizeSkew := range []bool{true, false} { + t.Run(fmt.Sprintf("resolveParams MinimizeSkew %t", minimizeSkew), func(t *testing.T) { + flags := &vtgatepb.VStreamFlags{MinimizeSkew: minimizeSkew} + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "TestVStream", + Shard: "-20", + Gtid: "current", + }}, + } + _, _, flags2, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, vgtid, nil, flags) + require.NoError(t, err) + require.Equal(t, minimizeSkew, flags2.MinimizeSkew) + }) + } + } func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { @@ -880,10 +1000,10 @@ func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell s return newVStreamManager(srvResolver, serv, cell) } -func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid) <-chan *binlogdatapb.VStreamResponse { +func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, minimizeSkew bool) <-chan *binlogdatapb.VStreamResponse { ch := make(chan *binlogdatapb.VStreamResponse) go func() { - _ = vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, func(events []*binlogdatapb.VEvent) error { + _ = vsm.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, nil, &vtgatepb.VStreamFlags{MinimizeSkew: true}, func(events []*binlogdatapb.VEvent) error { ch <- &binlogdatapb.VStreamResponse{Events: events} return nil }) @@ -895,6 +1015,9 @@ func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants . t.Helper() for i, want := range wants { got := <-ch + for _, event := range got.Events { + event.Timestamp = 0 + } if !proto.Equal(got, want) { t.Errorf("vstream(%d):\n%v, want\n%v", i, got, want) } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 221805ade8d..0f1b86b643f 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -111,6 +111,8 @@ var ( errorCounts *stats.CountersWithMultiLabels warnings *stats.CountersWithSingleLabel + + vstreamSkewDelayCount *stats.Counter ) // VTGate is the rpc interface to vtgate. Only one instance @@ -152,6 +154,9 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa // catch the initial load stats. vschemaCounters = stats.NewCountersWithSingleLabel("VtgateVSchemaCounts", "Vtgate vschema counts", "changes") + vstreamSkewDelayCount = stats.NewCounter("VStreamEventsDelayedBySkewAlignment", + "Number of events that had to wait because the skew across shards was too high") + // Build objects from low to high level. // Start with the gateway. If we can't reach the topology service, // we can't go on much further, so we log.Fatal out. @@ -404,8 +409,8 @@ handleError: } // VStream streams binlog events. -func (vtg *VTGate) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return vtg.vsm.VStream(ctx, tabletType, vgtid, filter, send) +func (vtg *VTGate) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { + return vtg.vsm.VStream(ctx, tabletType, vgtid, filter, flags, send) } // GetGatewayCacheStatus returns a displayable version of the Gateway cache. diff --git a/go/vt/vtgate/vtgateconn/vtgateconn.go b/go/vt/vtgate/vtgateconn/vtgateconn.go index 5df12f69309..17fe91fef4d 100644 --- a/go/vt/vtgate/vtgateconn/vtgateconn.go +++ b/go/vt/vtgate/vtgateconn/vtgateconn.go @@ -74,8 +74,9 @@ type VStreamReader interface { } // VStream streams binlog events. -func (conn *VTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (VStreamReader, error) { - return conn.impl.VStream(ctx, tabletType, vgtid, filter) +func (conn *VTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, + filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (VStreamReader, error) { + return conn.impl.VStream(ctx, tabletType, vgtid, filter, flags) } // VTGateSession exposes the V3 API to the clients. @@ -134,7 +135,7 @@ type Impl interface { ResolveTransaction(ctx context.Context, dtid string) error // VStream streams binlogevents - VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (VStreamReader, error) + VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (VStreamReader, error) // Close must be called for releasing resources. Close() diff --git a/go/vt/vtgate/vtgateservice/interface.go b/go/vt/vtgate/vtgateservice/interface.go index 94f28205776..9daf7169a25 100644 --- a/go/vt/vtgate/vtgateservice/interface.go +++ b/go/vt/vtgate/vtgateservice/interface.go @@ -41,7 +41,7 @@ type VTGateService interface { ResolveTransaction(ctx context.Context, dtid string) error // Update Stream methods - VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error // HandlePanic should be called with defer at the beginning of each // RPC implementation method, before calling any of the previous methods diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 6bbb5657b49..63d8819d516 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -21,7 +21,9 @@ package sandboxconn import ( "fmt" "sync" + "time" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "context" @@ -96,6 +98,7 @@ type SandboxConn struct { StartPos string VStreamEvents [][]*binlogdatapb.VEvent VStreamErrors []error + VStreamCh chan *binlogdatapb.VEvent // transaction id generator TransactionID sync2.AtomicInt64 @@ -406,18 +409,60 @@ func (sbc *SandboxConn) AddVStreamEvents(events []*binlogdatapb.VEvent, err erro // VStream is part of the QueryService interface. func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { if sbc.StartPos != "" && sbc.StartPos != startPos { + log.Errorf("startPos(%v): %v, want %v", target, startPos, sbc.StartPos) return fmt.Errorf("startPos(%v): %v, want %v", target, startPos, sbc.StartPos) } - for len(sbc.VStreamEvents) != 0 { - ev := sbc.VStreamEvents[0] - err := sbc.VStreamErrors[0] - sbc.VStreamEvents = sbc.VStreamEvents[1:] - sbc.VStreamErrors = sbc.VStreamErrors[1:] - if ev == nil { - return err + done := false + // for testing the minimize stream skew feature (TestStreamSkew) we need the ability to send events in specific sequences from + // multiple streams. We introduce a channel in the sandbox that we listen on and vstream those events + // as we receive them. We also need to simulate vstreamer heartbeats since the skew detection logic depends on it + // in case of shards where there are no real events within a second + if sbc.VStreamCh != nil { + lastTimestamp := int64(0) + for !done { + timer := time.NewTimer(1 * time.Second) + select { + case <-timer.C: + events := []*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_HEARTBEAT, + Timestamp: lastTimestamp, + CurrentTime: lastTimestamp, + }, { + Type: binlogdatapb.VEventType_COMMIT, + Timestamp: lastTimestamp, + CurrentTime: lastTimestamp, + }} + + if err := send(events); err != nil { + log.Infof("error sending event in test sandbox %s", err.Error()) + return err + } + lastTimestamp++ + + case ev := <-sbc.VStreamCh: + if ev == nil { + done = true + } + if err := send([]*binlogdatapb.VEvent{ev}); err != nil { + log.Infof("error sending event in test sandbox %s", err.Error()) + return err + } + lastTimestamp = ev.Timestamp + } } - if err := send(ev); err != nil { - return err + } else { + // this path is followed for all vstream tests other than the skew tests + for len(sbc.VStreamEvents) != 0 { + ev := sbc.VStreamEvents[0] + err := sbc.VStreamErrors[0] + sbc.VStreamEvents = sbc.VStreamEvents[1:] + sbc.VStreamErrors = sbc.VStreamErrors[1:] + if ev == nil { + return err + } + if err := send(ev); err != nil { + return err + } } } // Don't return till context is canceled. diff --git a/proto/vtgate.proto b/proto/vtgate.proto index 61ec3d7211a..e2714899d79 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -21,7 +21,7 @@ option go_package = "vitess.io/vitess/go/vt/proto/vtgate"; package vtgate; -option java_package="io.vitess.proto"; +option java_package = "io.vitess.proto"; import "binlogdata.proto"; import "query.proto"; @@ -114,7 +114,7 @@ message Session { // user_defined_variables contains all the @variables defined for this session map user_defined_variables = 13; - + // system_variables keeps track of all session variables set for this connection // TODO: systay should we keep this so we can apply it ordered? map system_variables = 14; @@ -270,6 +270,10 @@ message ResolveTransactionRequest { message ResolveTransactionResponse { } +message VStreamFlags { + bool minimize_skew = 1; +} + // VStreamRequest is the payload for VStream. message VStreamRequest { vtrpc.CallerID caller_id = 1; @@ -281,6 +285,7 @@ message VStreamRequest { // position is of the form 'ks1:0@MySQL56/|ks2:-80@MySQL56/'. binlogdata.VGtid vgtid = 3; binlogdata.Filter filter = 4; + VStreamFlags flags = 5; } // VStreamResponse is streamed by VStream.