From 41767d84fbd699d8a39dfa8223cb6fec4a306ac6 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 12 Feb 2019 22:35:58 -0800 Subject: [PATCH 1/2] vreplication: improved lag tracking The new lag tracking introduces the following changes: * VStreamer sends its current time along with every event. This allows for VPlayer to correct for any clock skew that may exist between the two machines. This results in a more accurate calculation of lag. * If there are no events to send for a period of time, VStreamer sends a heartbeat event. This allows us VPlayer to essentially know for sure that it's still caught up. * If VPlayer receives no event for its wait period, then it updates the SecondsBehindMaster stat to indicate that it's actually falling behind. The VStreamer timeout for heartbeat is set slightly lower than the VPlayer idle timeout. This ensures that Vplayer won't timeout exactly when it's about to receive the heartbeat event. Signed-off-by: Sugu Sougoumarane --- go/vt/proto/binlogdata/binlogdata.pb.go | 282 +++++++++--------- .../tabletmanager/vreplication/vplayer.go | 26 +- .../tabletserver/vstreamer/vstreamer.go | 33 +- .../tabletserver/vstreamer/vstreamer_test.go | 2 + proto/binlogdata.proto | 3 + py/vtproto/binlogdata_pb2.py | 32 +- 6 files changed, 226 insertions(+), 152 deletions(-) diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 2683321fd16..87f39855ed3 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -48,7 +48,7 @@ func (x OnDDLAction) String() string { return proto.EnumName(OnDDLAction_name, int32(x)) } func (OnDDLAction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{0} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{0} } // VEventType enumerates the event types. @@ -57,20 +57,21 @@ func (OnDDLAction) EnumDescriptor() ([]byte, []int) { type VEventType int32 const ( - VEventType_UNKNOWN VEventType = 0 - VEventType_GTID VEventType = 1 - VEventType_BEGIN VEventType = 2 - VEventType_COMMIT VEventType = 3 - VEventType_ROLLBACK VEventType = 4 - VEventType_DDL VEventType = 5 - VEventType_INSERT VEventType = 6 - VEventType_REPLACE VEventType = 7 - VEventType_UPDATE VEventType = 8 - VEventType_DELETE VEventType = 9 - VEventType_SET VEventType = 10 - VEventType_OTHER VEventType = 11 - VEventType_ROW VEventType = 12 - VEventType_FIELD VEventType = 13 + VEventType_UNKNOWN VEventType = 0 + VEventType_GTID VEventType = 1 + VEventType_BEGIN VEventType = 2 + VEventType_COMMIT VEventType = 3 + VEventType_ROLLBACK VEventType = 4 + VEventType_DDL VEventType = 5 + VEventType_INSERT VEventType = 6 + VEventType_REPLACE VEventType = 7 + VEventType_UPDATE VEventType = 8 + VEventType_DELETE VEventType = 9 + VEventType_SET VEventType = 10 + VEventType_OTHER VEventType = 11 + VEventType_ROW VEventType = 12 + VEventType_FIELD VEventType = 13 + VEventType_HEARTBEAT VEventType = 14 ) var VEventType_name = map[int32]string{ @@ -88,29 +89,31 @@ var VEventType_name = map[int32]string{ 11: "OTHER", 12: "ROW", 13: "FIELD", + 14: "HEARTBEAT", } var VEventType_value = map[string]int32{ - "UNKNOWN": 0, - "GTID": 1, - "BEGIN": 2, - "COMMIT": 3, - "ROLLBACK": 4, - "DDL": 5, - "INSERT": 6, - "REPLACE": 7, - "UPDATE": 8, - "DELETE": 9, - "SET": 10, - "OTHER": 11, - "ROW": 12, - "FIELD": 13, + "UNKNOWN": 0, + "GTID": 1, + "BEGIN": 2, + "COMMIT": 3, + "ROLLBACK": 4, + "DDL": 5, + "INSERT": 6, + "REPLACE": 7, + "UPDATE": 8, + "DELETE": 9, + "SET": 10, + "OTHER": 11, + "ROW": 12, + "FIELD": 13, + "HEARTBEAT": 14, } func (x VEventType) String() string { return proto.EnumName(VEventType_name, int32(x)) } func (VEventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1} } type BinlogTransaction_Statement_Category int32 @@ -158,7 +161,7 @@ func (x BinlogTransaction_Statement_Category) String() string { return proto.EnumName(BinlogTransaction_Statement_Category_name, int32(x)) } func (BinlogTransaction_Statement_Category) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1, 0, 0} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1, 0, 0} } // Charset is the per-statement charset info from a QUERY_EVENT binlog entry. @@ -178,7 +181,7 @@ func (m *Charset) Reset() { *m = Charset{} } func (m *Charset) String() string { return proto.CompactTextString(m) } func (*Charset) ProtoMessage() {} func (*Charset) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{0} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{0} } func (m *Charset) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Charset.Unmarshal(m, b) @@ -235,7 +238,7 @@ func (m *BinlogTransaction) Reset() { *m = BinlogTransaction{} } func (m *BinlogTransaction) String() string { return proto.CompactTextString(m) } func (*BinlogTransaction) ProtoMessage() {} func (*BinlogTransaction) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1} } func (m *BinlogTransaction) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogTransaction.Unmarshal(m, b) @@ -285,7 +288,7 @@ func (m *BinlogTransaction_Statement) Reset() { *m = BinlogTransaction_S func (m *BinlogTransaction_Statement) String() string { return proto.CompactTextString(m) } func (*BinlogTransaction_Statement) ProtoMessage() {} func (*BinlogTransaction_Statement) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1, 0} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1, 0} } func (m *BinlogTransaction_Statement) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogTransaction_Statement.Unmarshal(m, b) @@ -343,7 +346,7 @@ func (m *StreamKeyRangeRequest) Reset() { *m = StreamKeyRangeRequest{} } func (m *StreamKeyRangeRequest) String() string { return proto.CompactTextString(m) } func (*StreamKeyRangeRequest) ProtoMessage() {} func (*StreamKeyRangeRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{2} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{2} } func (m *StreamKeyRangeRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamKeyRangeRequest.Unmarshal(m, b) @@ -396,7 +399,7 @@ func (m *StreamKeyRangeResponse) Reset() { *m = StreamKeyRangeResponse{} func (m *StreamKeyRangeResponse) String() string { return proto.CompactTextString(m) } func (*StreamKeyRangeResponse) ProtoMessage() {} func (*StreamKeyRangeResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{3} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{3} } func (m *StreamKeyRangeResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamKeyRangeResponse.Unmarshal(m, b) @@ -440,7 +443,7 @@ func (m *StreamTablesRequest) Reset() { *m = StreamTablesRequest{} } func (m *StreamTablesRequest) String() string { return proto.CompactTextString(m) } func (*StreamTablesRequest) ProtoMessage() {} func (*StreamTablesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{4} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{4} } func (m *StreamTablesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamTablesRequest.Unmarshal(m, b) @@ -493,7 +496,7 @@ func (m *StreamTablesResponse) Reset() { *m = StreamTablesResponse{} } func (m *StreamTablesResponse) String() string { return proto.CompactTextString(m) } func (*StreamTablesResponse) ProtoMessage() {} func (*StreamTablesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{5} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{5} } func (m *StreamTablesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamTablesResponse.Unmarshal(m, b) @@ -538,7 +541,7 @@ func (m *Rule) Reset() { *m = Rule{} } func (m *Rule) String() string { return proto.CompactTextString(m) } func (*Rule) ProtoMessage() {} func (*Rule) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{6} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{6} } func (m *Rule) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Rule.Unmarshal(m, b) @@ -585,7 +588,7 @@ func (m *Filter) Reset() { *m = Filter{} } func (m *Filter) String() string { return proto.CompactTextString(m) } func (*Filter) ProtoMessage() {} func (*Filter) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{7} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{7} } func (m *Filter) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Filter.Unmarshal(m, b) @@ -640,7 +643,7 @@ func (m *BinlogSource) Reset() { *m = BinlogSource{} } func (m *BinlogSource) String() string { return proto.CompactTextString(m) } func (*BinlogSource) ProtoMessage() {} func (*BinlogSource) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{8} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{8} } func (m *BinlogSource) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogSource.Unmarshal(m, b) @@ -722,7 +725,7 @@ func (m *RowChange) Reset() { *m = RowChange{} } func (m *RowChange) String() string { return proto.CompactTextString(m) } func (*RowChange) ProtoMessage() {} func (*RowChange) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{9} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{9} } func (m *RowChange) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RowChange.Unmarshal(m, b) @@ -769,7 +772,7 @@ func (m *RowEvent) Reset() { *m = RowEvent{} } func (m *RowEvent) String() string { return proto.CompactTextString(m) } func (*RowEvent) ProtoMessage() {} func (*RowEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{10} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{10} } func (m *RowEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RowEvent.Unmarshal(m, b) @@ -815,7 +818,7 @@ func (m *FieldEvent) Reset() { *m = FieldEvent{} } func (m *FieldEvent) String() string { return proto.CompactTextString(m) } func (*FieldEvent) ProtoMessage() {} func (*FieldEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{11} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{11} } func (m *FieldEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FieldEvent.Unmarshal(m, b) @@ -851,22 +854,24 @@ func (m *FieldEvent) GetFields() []*query.Field { // VEvent represents a vstream event type VEvent struct { - Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` - Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"` - RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"` - FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` + Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"` + RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"` + FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"` + // current_time specifies the current time to handle clock skew. + CurrentTime int64 `protobuf:"varint,20,opt,name=current_time,json=currentTime,proto3" json:"current_time,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *VEvent) Reset() { *m = VEvent{} } func (m *VEvent) String() string { return proto.CompactTextString(m) } func (*VEvent) ProtoMessage() {} func (*VEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{12} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{12} } func (m *VEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VEvent.Unmarshal(m, b) @@ -928,6 +933,13 @@ func (m *VEvent) GetFieldEvent() *FieldEvent { return nil } +func (m *VEvent) GetCurrentTime() int64 { + if m != nil { + return m.CurrentTime + } + return 0 +} + // VStreamRequest is the payload for VStream type VStreamRequest struct { EffectiveCallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=effective_caller_id,json=effectiveCallerId,proto3" json:"effective_caller_id,omitempty"` @@ -944,7 +956,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} } func (m *VStreamRequest) String() string { return proto.CompactTextString(m) } func (*VStreamRequest) ProtoMessage() {} func (*VStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{13} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{13} } func (m *VStreamRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamRequest.Unmarshal(m, b) @@ -1011,7 +1023,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} } func (m *VStreamResponse) String() string { return proto.CompactTextString(m) } func (*VStreamResponse) ProtoMessage() {} func (*VStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_6d214635eb8c538c, []int{14} + return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{14} } func (m *VStreamResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamResponse.Unmarshal(m, b) @@ -1060,82 +1072,84 @@ func init() { proto.RegisterEnum("binlogdata.BinlogTransaction_Statement_Category", BinlogTransaction_Statement_Category_name, BinlogTransaction_Statement_Category_value) } -func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_6d214635eb8c538c) } - -var fileDescriptor_binlogdata_6d214635eb8c538c = []byte{ - // 1184 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5b, 0x6e, 0xdb, 0x56, - 0x13, 0x8e, 0x44, 0x8a, 0x12, 0x87, 0x8e, 0x4d, 0x1f, 0x5f, 0x7e, 0xc1, 0xf8, 0x03, 0x18, 0x44, - 0xdb, 0xb8, 0x06, 0x2a, 0xa7, 0xea, 0xed, 0xa9, 0x2d, 0x2c, 0x91, 0x71, 0x95, 0xd0, 0x92, 0x73, - 0xcc, 0x24, 0x45, 0x5e, 0x08, 0x9a, 0x3c, 0xb2, 0x09, 0x53, 0xa4, 0x4c, 0x1e, 0xcb, 0xd5, 0x0a, - 0xba, 0x80, 0xbe, 0x76, 0x03, 0xed, 0x42, 0xba, 0x92, 0x76, 0x1f, 0xc5, 0xb9, 0x90, 0x92, 0x1c, - 0xa0, 0x71, 0x1f, 0xfa, 0x36, 0xf7, 0x33, 0xf3, 0xcd, 0x70, 0x86, 0x60, 0x5e, 0xc4, 0x69, 0x92, - 0x5d, 0x46, 0x01, 0x0d, 0x3a, 0xd3, 0x3c, 0xa3, 0x19, 0x82, 0x85, 0x64, 0xcf, 0x98, 0xd1, 0x7c, - 0x1a, 0x0a, 0xc5, 0x9e, 0x71, 0x73, 0x4b, 0xf2, 0xb9, 0x64, 0xd6, 0x69, 0x36, 0xcd, 0x16, 0x5e, - 0xd6, 0x29, 0x34, 0xfb, 0x57, 0x41, 0x5e, 0x10, 0x8a, 0x76, 0x41, 0x0b, 0x93, 0x98, 0xa4, 0xb4, - 0x5d, 0xdb, 0xaf, 0x1d, 0x34, 0xb0, 0xe4, 0x10, 0x02, 0x35, 0xcc, 0xd2, 0xb4, 0x5d, 0xe7, 0x52, - 0x4e, 0x33, 0xdb, 0x82, 0xe4, 0x33, 0x92, 0xb7, 0x15, 0x61, 0x2b, 0x38, 0xeb, 0x2f, 0x05, 0x36, - 0x7b, 0x3c, 0x0f, 0x2f, 0x0f, 0xd2, 0x22, 0x08, 0x69, 0x9c, 0xa5, 0xe8, 0x04, 0xa0, 0xa0, 0x01, - 0x25, 0x13, 0x92, 0xd2, 0xa2, 0x5d, 0xdb, 0x57, 0x0e, 0x8c, 0xee, 0xd3, 0xce, 0x52, 0x05, 0xef, - 0xb9, 0x74, 0xce, 0x4b, 0x7b, 0xbc, 0xe4, 0x8a, 0xba, 0x60, 0x90, 0x19, 0x49, 0xa9, 0x4f, 0xb3, - 0x6b, 0x92, 0xb6, 0xd5, 0xfd, 0xda, 0x81, 0xd1, 0xdd, 0xec, 0x88, 0x02, 0x1d, 0xa6, 0xf1, 0x98, - 0x02, 0x03, 0xa9, 0xe8, 0xbd, 0x3f, 0xea, 0xa0, 0x57, 0xd1, 0x90, 0x0b, 0xad, 0x30, 0xa0, 0xe4, - 0x32, 0xcb, 0xe7, 0xbc, 0xcc, 0xf5, 0xee, 0xb3, 0x07, 0x26, 0xd2, 0xe9, 0x4b, 0x3f, 0x5c, 0x45, - 0x40, 0x9f, 0x41, 0x33, 0x14, 0xe8, 0x71, 0x74, 0x8c, 0xee, 0xd6, 0x72, 0x30, 0x09, 0x2c, 0x2e, - 0x6d, 0x90, 0x09, 0x4a, 0x71, 0x93, 0x70, 0xc8, 0xd6, 0x30, 0x23, 0xad, 0xdf, 0x6a, 0xd0, 0x2a, - 0xe3, 0xa2, 0x2d, 0xd8, 0xe8, 0xb9, 0xfe, 0xeb, 0x21, 0x76, 0xfa, 0xa3, 0x93, 0xe1, 0xe0, 0x9d, - 0x63, 0x9b, 0x8f, 0xd0, 0x1a, 0xb4, 0x7a, 0xae, 0xdf, 0x73, 0x4e, 0x06, 0x43, 0xb3, 0x86, 0x1e, - 0x83, 0xde, 0x73, 0xfd, 0xfe, 0xe8, 0xf4, 0x74, 0xe0, 0x99, 0x75, 0xb4, 0x01, 0x46, 0xcf, 0xf5, - 0xf1, 0xc8, 0x75, 0x7b, 0xc7, 0xfd, 0x97, 0xa6, 0x82, 0x76, 0x60, 0xb3, 0xe7, 0xfa, 0xf6, 0xa9, - 0xeb, 0xdb, 0xce, 0x19, 0x76, 0xfa, 0xc7, 0x9e, 0x63, 0x9b, 0x2a, 0x02, 0xd0, 0x98, 0xd8, 0x76, - 0xcd, 0x86, 0xa4, 0xcf, 0x1d, 0xcf, 0xd4, 0x64, 0xb8, 0xc1, 0xf0, 0xdc, 0xc1, 0x9e, 0xd9, 0x94, - 0xec, 0xeb, 0x33, 0xfb, 0xd8, 0x73, 0xcc, 0x96, 0x64, 0x6d, 0xc7, 0x75, 0x3c, 0xc7, 0xd4, 0x5f, - 0xa8, 0xad, 0xba, 0xa9, 0xbc, 0x50, 0x5b, 0x8a, 0xa9, 0x5a, 0xbf, 0xd4, 0x60, 0xe7, 0x9c, 0xe6, - 0x24, 0x98, 0xbc, 0x24, 0x73, 0x1c, 0xa4, 0x97, 0x04, 0x93, 0x9b, 0x5b, 0x52, 0x50, 0xb4, 0x07, - 0xad, 0x69, 0x56, 0xc4, 0x0c, 0x3b, 0x0e, 0xb0, 0x8e, 0x2b, 0x1e, 0x1d, 0x81, 0x7e, 0x4d, 0xe6, - 0x7e, 0xce, 0xec, 0x25, 0x60, 0xa8, 0x53, 0x0d, 0x64, 0x15, 0xa9, 0x75, 0x2d, 0xa9, 0x65, 0x7c, - 0x95, 0x0f, 0xe3, 0x6b, 0x8d, 0x61, 0xf7, 0x7e, 0x52, 0xc5, 0x34, 0x4b, 0x0b, 0x82, 0x5c, 0x40, - 0xc2, 0xd1, 0xa7, 0x8b, 0xde, 0xf2, 0xfc, 0x8c, 0xee, 0x93, 0x7f, 0x1c, 0x00, 0xbc, 0x79, 0x71, - 0x5f, 0x64, 0xfd, 0x04, 0x5b, 0xe2, 0x1d, 0x2f, 0xb8, 0x48, 0x48, 0xf1, 0x90, 0xd2, 0x77, 0x41, - 0xa3, 0xdc, 0xb8, 0x5d, 0xdf, 0x57, 0x0e, 0x74, 0x2c, 0xb9, 0x7f, 0x5b, 0x61, 0x04, 0xdb, 0xab, - 0x2f, 0xff, 0x27, 0xf5, 0x7d, 0x09, 0x2a, 0xbe, 0x4d, 0x08, 0xda, 0x86, 0xc6, 0x24, 0xa0, 0xe1, - 0x95, 0xac, 0x46, 0x30, 0xac, 0x94, 0x71, 0x9c, 0x50, 0x92, 0xf3, 0x16, 0xea, 0x58, 0x72, 0xd6, - 0x33, 0xd0, 0x9e, 0x73, 0x0a, 0x7d, 0x02, 0x8d, 0xfc, 0x96, 0xd5, 0x2a, 0x3e, 0x75, 0x73, 0x39, - 0x01, 0x16, 0x18, 0x0b, 0xb5, 0xf5, 0x6b, 0x1d, 0xd6, 0x44, 0x42, 0xe7, 0xd9, 0x6d, 0x1e, 0x12, - 0x86, 0xe0, 0x35, 0x99, 0x17, 0xd3, 0x20, 0x24, 0x25, 0x82, 0x25, 0xcf, 0x92, 0x29, 0xae, 0x82, - 0x3c, 0x92, 0xaf, 0x0a, 0x06, 0x7d, 0x05, 0x06, 0x47, 0x92, 0xfa, 0x74, 0x3e, 0x25, 0x1c, 0xc3, - 0xf5, 0xee, 0xf6, 0x62, 0xa8, 0x38, 0x4e, 0xd4, 0x9b, 0x4f, 0x09, 0x06, 0x5a, 0xd1, 0xab, 0x93, - 0xa8, 0x3e, 0x60, 0x12, 0x17, 0xfd, 0x6b, 0xac, 0xf4, 0xef, 0xb0, 0x02, 0x43, 0x93, 0x51, 0x96, - 0x6a, 0x15, 0x70, 0x94, 0x00, 0xa1, 0x0e, 0x68, 0x59, 0xea, 0x47, 0x51, 0xd2, 0x6e, 0xf2, 0x34, - 0xff, 0xb7, 0x6c, 0x3b, 0x4a, 0x6d, 0xdb, 0x3d, 0x16, 0x2d, 0x69, 0x64, 0xa9, 0x1d, 0x25, 0xd6, - 0x2b, 0xd0, 0x71, 0x76, 0xd7, 0xbf, 0xe2, 0x09, 0x58, 0xa0, 0x5d, 0x90, 0x71, 0x96, 0x13, 0xd9, - 0x55, 0x90, 0x5b, 0x0f, 0x67, 0x77, 0x58, 0x6a, 0xd0, 0x3e, 0x34, 0x82, 0x71, 0xd9, 0x98, 0x55, - 0x13, 0xa1, 0xb0, 0x02, 0x68, 0xe1, 0xec, 0x8e, 0x6f, 0x4a, 0xf4, 0x04, 0x04, 0x22, 0x7e, 0x1a, - 0x4c, 0x4a, 0xb8, 0x75, 0x2e, 0x19, 0x06, 0x13, 0x82, 0xbe, 0x06, 0x23, 0xcf, 0xee, 0xfc, 0x90, - 0x3f, 0x2f, 0xc6, 0xd6, 0xe8, 0xee, 0xac, 0xb4, 0xb2, 0x4c, 0x0e, 0x43, 0x5e, 0x92, 0x85, 0xf5, - 0x0a, 0xe0, 0x79, 0x4c, 0x92, 0xe8, 0x41, 0x8f, 0x7c, 0xc4, 0xe0, 0x23, 0x49, 0x54, 0xc6, 0x5f, - 0x93, 0x29, 0xf3, 0x08, 0x58, 0xea, 0xac, 0x3f, 0x6b, 0xa0, 0xbd, 0x11, 0xf1, 0x0e, 0x41, 0xe5, - 0x8d, 0x16, 0xbb, 0x7b, 0x77, 0x39, 0x1d, 0x61, 0xc1, 0x5b, 0xcd, 0x6d, 0xd0, 0xff, 0x41, 0xa7, - 0xf1, 0x84, 0x14, 0x34, 0x98, 0x4c, 0x39, 0x24, 0x0a, 0x5e, 0x08, 0xd8, 0x59, 0xbb, 0xa4, 0x71, - 0xc4, 0x47, 0x46, 0xc7, 0x9c, 0x66, 0x0b, 0x9a, 0xb5, 0x47, 0xe5, 0x22, 0x46, 0xa2, 0xcf, 0x41, - 0x67, 0x28, 0xf0, 0x7b, 0xd2, 0x6e, 0x70, 0x58, 0xb7, 0xef, 0x61, 0xc0, 0x9f, 0xc5, 0xad, 0xbc, - 0xc4, 0xf5, 0x1b, 0x30, 0x78, 0xde, 0xd2, 0x49, 0xcc, 0xc5, 0xee, 0xea, 0x5c, 0x94, 0xf8, 0x60, - 0x18, 0x57, 0xb4, 0xf5, 0x73, 0x1d, 0xd6, 0xdf, 0x88, 0xcf, 0xbb, 0x5c, 0x29, 0xdf, 0xc3, 0x16, - 0x19, 0x8f, 0x49, 0x48, 0xe3, 0x19, 0xf1, 0xc3, 0x20, 0x49, 0x48, 0xee, 0xc7, 0x91, 0x1c, 0x81, - 0x8d, 0x8e, 0x38, 0xf3, 0x7d, 0x2e, 0x1f, 0xd8, 0x78, 0xb3, 0xb2, 0x95, 0xa2, 0x08, 0x39, 0xb0, - 0x15, 0x4f, 0x26, 0x24, 0x8a, 0x03, 0xba, 0x1c, 0x40, 0x0c, 0xc8, 0x8e, 0x44, 0xfb, 0x8d, 0x77, - 0x12, 0x50, 0xb2, 0x08, 0x53, 0x79, 0x54, 0x61, 0x3e, 0x66, 0xe3, 0x9f, 0x5f, 0x56, 0x5b, 0xea, - 0xb1, 0xf4, 0xf4, 0xb8, 0x10, 0x4b, 0xe5, 0xca, 0x06, 0x54, 0xef, 0x6d, 0xc0, 0xc5, 0x97, 0xd2, - 0xf8, 0xd0, 0x97, 0x62, 0x7d, 0x0b, 0x1b, 0x15, 0x10, 0x72, 0xc3, 0x1d, 0x82, 0xc6, 0xf1, 0x2c, - 0x97, 0x0a, 0x7a, 0xbf, 0xf5, 0x58, 0x5a, 0x1c, 0x7e, 0x07, 0xc6, 0xd2, 0xe7, 0xc4, 0x2e, 0xde, - 0xe0, 0x64, 0x38, 0xc2, 0x8e, 0xf9, 0x08, 0xb5, 0x40, 0x3d, 0xf7, 0x46, 0x67, 0x66, 0x8d, 0x51, - 0xce, 0x8f, 0x4e, 0x5f, 0x5c, 0x51, 0x46, 0xf9, 0xd2, 0x48, 0x39, 0xfc, 0xbd, 0x06, 0xb0, 0x98, - 0x26, 0x64, 0x40, 0xf3, 0xf5, 0xf0, 0xe5, 0x70, 0xf4, 0x76, 0x28, 0x02, 0x9c, 0x78, 0x03, 0xdb, - 0xac, 0x21, 0x1d, 0x1a, 0xe2, 0x2c, 0xd7, 0xd9, 0x0b, 0xf2, 0x26, 0x2b, 0xec, 0x60, 0x57, 0x07, - 0x59, 0x45, 0x4d, 0x50, 0xaa, 0xb3, 0x2b, 0xef, 0xac, 0xc6, 0x02, 0x62, 0xe7, 0xcc, 0x3d, 0xee, - 0x3b, 0x66, 0x93, 0x29, 0xaa, 0x8b, 0x0b, 0xa0, 0x95, 0xe7, 0x96, 0x79, 0xb2, 0x23, 0x0d, 0xec, - 0x9d, 0x91, 0xf7, 0x83, 0x83, 0x4d, 0x83, 0xc9, 0xf0, 0xe8, 0xad, 0xb9, 0xc6, 0x64, 0xcf, 0x07, - 0x8e, 0x6b, 0x9b, 0x8f, 0x7b, 0x9f, 0xbe, 0x7b, 0x3a, 0x8b, 0x29, 0x29, 0x8a, 0x4e, 0x9c, 0x1d, - 0x09, 0xea, 0xe8, 0x32, 0x3b, 0x9a, 0xd1, 0x23, 0xfe, 0x87, 0x77, 0xb4, 0x80, 0xe9, 0x42, 0xe3, - 0x92, 0x2f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x68, 0xbd, 0x20, 0x05, 0x3d, 0x0a, 0x00, 0x00, +func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_60517ed2deb82a7b) } + +var fileDescriptor_binlogdata_60517ed2deb82a7b = []byte{ + // 1215 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46, + 0x10, 0x8e, 0x44, 0x8a, 0x12, 0x87, 0x8e, 0x4d, 0xaf, 0x7f, 0x2a, 0x18, 0x0d, 0xe0, 0x12, 0x6d, + 0xe3, 0x1a, 0xa8, 0x9c, 0xaa, 0x7f, 0x4f, 0x6d, 0x21, 0x89, 0x8c, 0xa3, 0x84, 0x96, 0x9c, 0x35, + 0x93, 0x14, 0x79, 0x21, 0x68, 0x72, 0x65, 0x13, 0xa6, 0x48, 0x85, 0x5c, 0xdb, 0xd5, 0x09, 0x7a, + 0x80, 0xbe, 0xf6, 0x02, 0x3d, 0x42, 0x2f, 0xd0, 0x9b, 0xf4, 0x1e, 0xc5, 0xfe, 0x90, 0x92, 0x1c, + 0xa0, 0x71, 0x1f, 0xfa, 0x36, 0xff, 0x3b, 0xf3, 0xcd, 0x70, 0x86, 0x60, 0x9e, 0xc7, 0x69, 0x92, + 0x5d, 0x44, 0x01, 0x0d, 0x3a, 0xb3, 0x3c, 0xa3, 0x19, 0x82, 0x85, 0x64, 0xcf, 0xb8, 0xa1, 0xf9, + 0x2c, 0x14, 0x8a, 0x3d, 0xe3, 0xdd, 0x35, 0xc9, 0xe7, 0x92, 0x59, 0xa7, 0xd9, 0x2c, 0x5b, 0x78, + 0x59, 0x27, 0xd0, 0x1c, 0x5c, 0x06, 0x79, 0x41, 0x28, 0xda, 0x05, 0x2d, 0x4c, 0x62, 0x92, 0xd2, + 0x76, 0x6d, 0xbf, 0x76, 0xd0, 0xc0, 0x92, 0x43, 0x08, 0xd4, 0x30, 0x4b, 0xd3, 0x76, 0x9d, 0x4b, + 0x39, 0xcd, 0x6c, 0x0b, 0x92, 0xdf, 0x90, 0xbc, 0xad, 0x08, 0x5b, 0xc1, 0x59, 0x7f, 0x2b, 0xb0, + 0xd9, 0xe7, 0x79, 0x78, 0x79, 0x90, 0x16, 0x41, 0x48, 0xe3, 0x2c, 0x45, 0xc7, 0x00, 0x05, 0x0d, + 0x28, 0x99, 0x92, 0x94, 0x16, 0xed, 0xda, 0xbe, 0x72, 0x60, 0x74, 0x1f, 0x77, 0x96, 0x2a, 0x78, + 0xcf, 0xa5, 0x73, 0x56, 0xda, 0xe3, 0x25, 0x57, 0xd4, 0x05, 0x83, 0xdc, 0x90, 0x94, 0xfa, 0x34, + 0xbb, 0x22, 0x69, 0x5b, 0xdd, 0xaf, 0x1d, 0x18, 0xdd, 0xcd, 0x8e, 0x28, 0xd0, 0x61, 0x1a, 0x8f, + 0x29, 0x30, 0x90, 0x8a, 0xde, 0xfb, 0xab, 0x0e, 0x7a, 0x15, 0x0d, 0xb9, 0xd0, 0x0a, 0x03, 0x4a, + 0x2e, 0xb2, 0x7c, 0xce, 0xcb, 0x5c, 0xef, 0x3e, 0xb9, 0x67, 0x22, 0x9d, 0x81, 0xf4, 0xc3, 0x55, + 0x04, 0xf4, 0x25, 0x34, 0x43, 0x81, 0x1e, 0x47, 0xc7, 0xe8, 0x6e, 0x2d, 0x07, 0x93, 0xc0, 0xe2, + 0xd2, 0x06, 0x99, 0xa0, 0x14, 0xef, 0x12, 0x0e, 0xd9, 0x1a, 0x66, 0xa4, 0xf5, 0x47, 0x0d, 0x5a, + 0x65, 0x5c, 0xb4, 0x05, 0x1b, 0x7d, 0xd7, 0x7f, 0x35, 0xc2, 0xce, 0x60, 0x7c, 0x3c, 0x1a, 0xbe, + 0x75, 0x6c, 0xf3, 0x01, 0x5a, 0x83, 0x56, 0xdf, 0xf5, 0xfb, 0xce, 0xf1, 0x70, 0x64, 0xd6, 0xd0, + 0x43, 0xd0, 0xfb, 0xae, 0x3f, 0x18, 0x9f, 0x9c, 0x0c, 0x3d, 0xb3, 0x8e, 0x36, 0xc0, 0xe8, 0xbb, + 0x3e, 0x1e, 0xbb, 0x6e, 0xbf, 0x37, 0x78, 0x61, 0x2a, 0x68, 0x07, 0x36, 0xfb, 0xae, 0x6f, 0x9f, + 0xb8, 0xbe, 0xed, 0x9c, 0x62, 0x67, 0xd0, 0xf3, 0x1c, 0xdb, 0x54, 0x11, 0x80, 0xc6, 0xc4, 0xb6, + 0x6b, 0x36, 0x24, 0x7d, 0xe6, 0x78, 0xa6, 0x26, 0xc3, 0x0d, 0x47, 0x67, 0x0e, 0xf6, 0xcc, 0xa6, + 0x64, 0x5f, 0x9d, 0xda, 0x3d, 0xcf, 0x31, 0x5b, 0x92, 0xb5, 0x1d, 0xd7, 0xf1, 0x1c, 0x53, 0x7f, + 0xae, 0xb6, 0xea, 0xa6, 0xf2, 0x5c, 0x6d, 0x29, 0xa6, 0x6a, 0xfd, 0x56, 0x83, 0x9d, 0x33, 0x9a, + 0x93, 0x60, 0xfa, 0x82, 0xcc, 0x71, 0x90, 0x5e, 0x10, 0x4c, 0xde, 0x5d, 0x93, 0x82, 0xa2, 0x3d, + 0x68, 0xcd, 0xb2, 0x22, 0x66, 0xd8, 0x71, 0x80, 0x75, 0x5c, 0xf1, 0xe8, 0x08, 0xf4, 0x2b, 0x32, + 0xf7, 0x73, 0x66, 0x2f, 0x01, 0x43, 0x9d, 0x6a, 0x20, 0xab, 0x48, 0xad, 0x2b, 0x49, 0x2d, 0xe3, + 0xab, 0x7c, 0x18, 0x5f, 0x6b, 0x02, 0xbb, 0x77, 0x93, 0x2a, 0x66, 0x59, 0x5a, 0x10, 0xe4, 0x02, + 0x12, 0x8e, 0x3e, 0x5d, 0xf4, 0x96, 0xe7, 0x67, 0x74, 0x1f, 0xfd, 0xeb, 0x00, 0xe0, 0xcd, 0xf3, + 0xbb, 0x22, 0xeb, 0x17, 0xd8, 0x12, 0xef, 0x78, 0xc1, 0x79, 0x42, 0x8a, 0xfb, 0x94, 0xbe, 0x0b, + 0x1a, 0xe5, 0xc6, 0xed, 0xfa, 0xbe, 0x72, 0xa0, 0x63, 0xc9, 0xfd, 0xd7, 0x0a, 0x23, 0xd8, 0x5e, + 0x7d, 0xf9, 0x7f, 0xa9, 0xef, 0x1b, 0x50, 0xf1, 0x75, 0x42, 0xd0, 0x36, 0x34, 0xa6, 0x01, 0x0d, + 0x2f, 0x65, 0x35, 0x82, 0x61, 0xa5, 0x4c, 0xe2, 0x84, 0x92, 0x9c, 0xb7, 0x50, 0xc7, 0x92, 0xb3, + 0x9e, 0x80, 0xf6, 0x94, 0x53, 0xe8, 0x73, 0x68, 0xe4, 0xd7, 0xac, 0x56, 0xf1, 0xa9, 0x9b, 0xcb, + 0x09, 0xb0, 0xc0, 0x58, 0xa8, 0xad, 0xdf, 0xeb, 0xb0, 0x26, 0x12, 0x3a, 0xcb, 0xae, 0xf3, 0x90, + 0x30, 0x04, 0xaf, 0xc8, 0xbc, 0x98, 0x05, 0x21, 0x29, 0x11, 0x2c, 0x79, 0x96, 0x4c, 0x71, 0x19, + 0xe4, 0x91, 0x7c, 0x55, 0x30, 0xe8, 0x5b, 0x30, 0x38, 0x92, 0xd4, 0xa7, 0xf3, 0x19, 0xe1, 0x18, + 0xae, 0x77, 0xb7, 0x17, 0x43, 0xc5, 0x71, 0xa2, 0xde, 0x7c, 0x46, 0x30, 0xd0, 0x8a, 0x5e, 0x9d, + 0x44, 0xf5, 0x1e, 0x93, 0xb8, 0xe8, 0x5f, 0x63, 0xa5, 0x7f, 0x87, 0x15, 0x18, 0x9a, 0x8c, 0xb2, + 0x54, 0xab, 0x80, 0xa3, 0x04, 0x08, 0x75, 0x40, 0xcb, 0x52, 0x3f, 0x8a, 0x92, 0x76, 0x93, 0xa7, + 0xf9, 0xd1, 0xb2, 0xed, 0x38, 0xb5, 0x6d, 0xb7, 0x27, 0x5a, 0xd2, 0xc8, 0x52, 0x3b, 0x4a, 0xac, + 0x97, 0xa0, 0xe3, 0xec, 0x76, 0x70, 0xc9, 0x13, 0xb0, 0x40, 0x3b, 0x27, 0x93, 0x2c, 0x27, 0xb2, + 0xab, 0x20, 0xb7, 0x1e, 0xce, 0x6e, 0xb1, 0xd4, 0xa0, 0x7d, 0x68, 0x04, 0x93, 0xb2, 0x31, 0xab, + 0x26, 0x42, 0x61, 0x05, 0xd0, 0xc2, 0xd9, 0x2d, 0xdf, 0x94, 0xe8, 0x11, 0x08, 0x44, 0xfc, 0x34, + 0x98, 0x96, 0x70, 0xeb, 0x5c, 0x32, 0x0a, 0xa6, 0x04, 0x7d, 0x07, 0x46, 0x9e, 0xdd, 0xfa, 0x21, + 0x7f, 0x5e, 0x8c, 0xad, 0xd1, 0xdd, 0x59, 0x69, 0x65, 0x99, 0x1c, 0x86, 0xbc, 0x24, 0x0b, 0xeb, + 0x25, 0xc0, 0xd3, 0x98, 0x24, 0xd1, 0xbd, 0x1e, 0xf9, 0x94, 0xc1, 0x47, 0x92, 0xa8, 0x8c, 0xbf, + 0x26, 0x53, 0xe6, 0x11, 0xb0, 0xd4, 0x59, 0xbf, 0xd6, 0x41, 0x7b, 0x2d, 0xe2, 0x1d, 0x82, 0xca, + 0x1b, 0x2d, 0x76, 0xf7, 0xee, 0x72, 0x3a, 0xc2, 0x82, 0xb7, 0x9a, 0xdb, 0xa0, 0x8f, 0x41, 0xa7, + 0xf1, 0x94, 0x14, 0x34, 0x98, 0xce, 0x38, 0x24, 0x0a, 0x5e, 0x08, 0xd8, 0x59, 0xbb, 0xa0, 0x71, + 0xc4, 0x47, 0x46, 0xc7, 0x9c, 0x66, 0x0b, 0x9a, 0xb5, 0x47, 0xe5, 0x22, 0x46, 0xa2, 0xaf, 0x40, + 0x67, 0x28, 0xf0, 0x7b, 0xd2, 0x6e, 0x70, 0x58, 0xb7, 0xef, 0x60, 0xc0, 0x9f, 0xc5, 0xad, 0xbc, + 0xc4, 0xf5, 0x7b, 0x30, 0x78, 0xde, 0xd2, 0x49, 0xcc, 0xc5, 0xee, 0xea, 0x5c, 0x94, 0xf8, 0x60, + 0x98, 0x2c, 0xb0, 0xfa, 0x04, 0xd6, 0xc2, 0xeb, 0x3c, 0xe7, 0xf7, 0x2d, 0x9e, 0x92, 0xf6, 0x36, + 0x4f, 0xd9, 0x90, 0x32, 0x2f, 0x9e, 0x12, 0x86, 0xc4, 0xfa, 0x6b, 0xb1, 0x01, 0xca, 0xad, 0xf3, + 0x13, 0x6c, 0x91, 0xc9, 0x84, 0x84, 0x34, 0xbe, 0x21, 0x7e, 0x18, 0x24, 0x09, 0xc9, 0xfd, 0x38, + 0x92, 0x53, 0xb2, 0xd1, 0x11, 0x7f, 0x02, 0x03, 0x2e, 0x1f, 0xda, 0x78, 0xb3, 0xb2, 0x95, 0xa2, + 0x08, 0x39, 0xb0, 0x15, 0x4f, 0xa7, 0x24, 0x8a, 0x03, 0xba, 0x1c, 0x40, 0xcc, 0xd0, 0x8e, 0x6c, + 0xc8, 0x6b, 0xef, 0x38, 0xa0, 0x64, 0x11, 0xa6, 0xf2, 0xa8, 0xc2, 0x7c, 0xc6, 0xbe, 0x90, 0xfc, + 0xa2, 0x5a, 0x64, 0x0f, 0xa5, 0xa7, 0xc7, 0x85, 0x58, 0x2a, 0x57, 0x96, 0xa4, 0x7a, 0x67, 0x49, + 0x2e, 0x3e, 0xa6, 0xc6, 0x87, 0x3e, 0x26, 0xeb, 0x07, 0xd8, 0xa8, 0x80, 0x90, 0x4b, 0xf0, 0x10, + 0x34, 0x0e, 0x79, 0xb9, 0x77, 0xd0, 0xfb, 0xd3, 0x81, 0xa5, 0xc5, 0xe1, 0x8f, 0x60, 0x2c, 0x7d, + 0x71, 0xec, 0x28, 0x0e, 0x8f, 0x47, 0x63, 0xec, 0x98, 0x0f, 0x50, 0x0b, 0xd4, 0x33, 0x6f, 0x7c, + 0x6a, 0xd6, 0x18, 0xe5, 0xfc, 0xec, 0x0c, 0xc4, 0xa1, 0x65, 0x94, 0x2f, 0x8d, 0x94, 0xc3, 0x3f, + 0x6b, 0x00, 0x8b, 0x81, 0x43, 0x06, 0x34, 0x5f, 0x8d, 0x5e, 0x8c, 0xc6, 0x6f, 0x46, 0x22, 0xc0, + 0xb1, 0x37, 0xb4, 0xcd, 0x1a, 0xd2, 0xa1, 0x21, 0x2e, 0x77, 0x9d, 0xbd, 0x20, 0xcf, 0xb6, 0xc2, + 0x6e, 0x7a, 0x75, 0xb3, 0x55, 0xd4, 0x04, 0xa5, 0xba, 0xcc, 0xf2, 0x14, 0x6b, 0x2c, 0x20, 0x76, + 0x4e, 0xdd, 0xde, 0xc0, 0x31, 0x9b, 0x4c, 0x51, 0x1d, 0x65, 0x00, 0xad, 0xbc, 0xc8, 0xcc, 0x93, + 0xdd, 0x71, 0x60, 0xef, 0x8c, 0xbd, 0x67, 0x0e, 0x36, 0x0d, 0x26, 0xc3, 0xe3, 0x37, 0xe6, 0x1a, + 0x93, 0x3d, 0x1d, 0x3a, 0xae, 0x6d, 0x3e, 0x64, 0x87, 0xfc, 0x99, 0xd3, 0xc3, 0x5e, 0xdf, 0xe9, + 0x79, 0xe6, 0x7a, 0xff, 0x8b, 0xb7, 0x8f, 0x6f, 0x62, 0x4a, 0x8a, 0xa2, 0x13, 0x67, 0x47, 0x82, + 0x3a, 0xba, 0xc8, 0x8e, 0x6e, 0xe8, 0x11, 0xff, 0x27, 0x3c, 0x5a, 0xa0, 0x76, 0xae, 0x71, 0xc9, + 0xd7, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x0e, 0xe3, 0x8a, 0xa5, 0x6f, 0x0a, 0x00, 0x00, } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index f8a0520d868..a181bcf8569 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -39,7 +39,10 @@ import ( ) var ( - idleTimeout = 1 * time.Second + // idleTimeout is set to slightly above 1s, compared to heartbeatTime + // set by VStreamer at slightly below 1s. This minimizes conflicts + // between the two timeouts. + idleTimeout = 1100 * time.Millisecond dbLockRetryDelay = 1 * time.Second relayLogMaxSize = 10000 relayLogMaxItems = 1000 @@ -61,7 +64,11 @@ type vplayer struct { unsavedGTID *binlogdatapb.VEvent // timeLastSaved is set every time a GTID is saved. timeLastSaved time.Time - stopPos mysql.Position + // lastTimestampNs is the last timestamp seen so far. + lastTimestampNs int64 + // timeOffsetNs keeps track of the time offset w.r.t. source tablet. + timeOffsetNs int64 + stopPos mysql.Position // pplan is built based on the source Filter at the beginning. pplan *PlayerPlan @@ -197,6 +204,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if err != nil { return err } + // No events were received. Update SecondsBehindMaster. + if len(items) == 0 { + behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs + vp.stats.SecondsBehindMaster.Set(behind / 1e9) + } // Filtered replication often ends up receiving a large number of empty transactions. // This is required because the player needs to know the latest position of the source. // This allows it to stop at that position if requested. @@ -221,6 +233,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } for i, events := range items { for j, event := range events { + if event.Timestamp != 0 { + vp.lastTimestampNs = event.Timestamp * 1e9 + vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime + vp.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp) + } mustSave := false switch event.Type { case binlogdatapb.VEventType_COMMIT: @@ -354,6 +371,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return err } } + case binlogdatapb.VEventType_HEARTBEAT: + // No-op: heartbeat timings are calculated in outer loop. } return nil } @@ -444,9 +463,6 @@ func (vp *vplayer) updatePos(ts int64) error { vp.unsavedGTID = nil vp.timeLastSaved = time.Now() vp.stats.SetLastPosition(vp.pos) - if ts != 0 { - vp.stats.SecondsBehindMaster.Set(vp.timeLastSaved.Unix() - ts) - } return nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 4bf378d9f81..6923e0c8b04 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "io" + "time" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -36,6 +37,11 @@ import ( var packetSize = flag.Int("vstream_packet_size", 10000, "Suggested packet size for VReplication streamer. This is used only as a recommendation. The actual packet size may be more or less than this amount.") +// heartbeatTime is set to slightly below 1s, compared to idleTimeout +// set by VPlayer at slightly above 1s. This minimizes conflicts +// between the two timeouts. +var heartbeatTime = 900 * time.Millisecond + type vstreamer struct { ctx context.Context cancel func() @@ -132,9 +138,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: // We never have to send GTID, BEGIN or FIELD events on their own. bufferedEvents = append(bufferedEvents, vevent) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL: - // COMMIT and DDL are terminal. There may be no more events after - // these for a long time. So, we have to send whatever we have. + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT: + // COMMIT, DDL and HEARTBEAT must be immediately sent. bufferedEvents = append(bufferedEvents, vevent) vevents := bufferedEvents bufferedEvents = nil @@ -167,7 +172,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } // Main loop: calls bufferAndTransmit as events arrive. + timer := time.NewTimer(heartbeatTime) + defer timer.Stop() for { + timer.Reset(heartbeatTime) + // Drain event if timer fired before reset. + select { + case <-timer.C: + default: + } + select { case ev, ok := <-events: if !ok { @@ -196,6 +210,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } case <-ctx.Done(): return nil + case <-timer.C: + now := time.Now().UnixNano() + if err := bufferAndTransmit(&binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_HEARTBEAT, + Timestamp: now / 1e9, + CurrentTime: now, + }); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("error sending event: %v", err) + } } } } @@ -392,6 +418,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } for _, vevent := range vevents { vevent.Timestamp = int64(ev.Timestamp()) + vevent.CurrentTime = time.Now().UnixNano() } return vevents, nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 517e19534ac..2aac52449b5 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -919,6 +919,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ t.Fatalf("%v: evs\n%v, want\n%v", input, evs, wantset) } for i, want := range wantset { + // CurrentTime is not testable. + evs[i].CurrentTime = 0 switch want { case "gtid|begin": if evs[i].Type != binlogdatapb.VEventType_GTID && evs[i].Type != binlogdatapb.VEventType_BEGIN { diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index a1471e44f5c..4942198823a 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -185,6 +185,7 @@ enum VEventType { OTHER = 11; ROW = 12; FIELD = 13; + HEARTBEAT = 14; } // RowChange represents one row change @@ -212,6 +213,8 @@ message VEvent { string ddl = 4; RowEvent row_event = 5; FieldEvent field_event = 6; + // current_time specifies the current time to handle clock skew. + int64 current_time = 20; } // VStreamRequest is the payload for VStream diff --git a/py/vtproto/binlogdata_pb2.py b/py/vtproto/binlogdata_pb2.py index e586658b98d..fe5c3d42e8c 100644 --- a/py/vtproto/binlogdata_pb2.py +++ b/py/vtproto/binlogdata_pb2.py @@ -23,7 +23,7 @@ package='binlogdata', syntax='proto3', serialized_options=_b('Z\'vitess.io/vitess/go/vt/proto/binlogdata'), - serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bvtrpc.proto\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xde\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\x12\'\n\x06on_ddl\x18\x07 \x01(\x0e\x32\x17.binlogdata.OnDDLAction\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\"\xb2\x01\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x04 \x01(\t\x12\'\n\trow_event\x18\x05 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x06 \x01(\x0b\x32\x16.binlogdata.FieldEvent\"\xc7\x01\n\x0eVStreamRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\x10\n\x08position\x18\x04 \x01(\t\x12\"\n\x06\x66ilter\x18\x05 \x01(\x0b\x32\x12.binlogdata.Filter\"5\n\x0fVStreamResponse\x12\"\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*>\n\x0bOnDDLAction\x12\n\n\x06IGNORE\x10\x00\x12\x08\n\x04STOP\x10\x01\x12\x08\n\x04\x45XEC\x10\x02\x12\x0f\n\x0b\x45XEC_IGNORE\x10\x03*\xaa\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\rB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3') + serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bvtrpc.proto\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xde\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\x12\'\n\x06on_ddl\x18\x07 \x01(\x0e\x32\x17.binlogdata.OnDDLAction\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\"\xc8\x01\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x04 \x01(\t\x12\'\n\trow_event\x18\x05 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x06 \x01(\x0b\x32\x16.binlogdata.FieldEvent\x12\x14\n\x0c\x63urrent_time\x18\x14 \x01(\x03\"\xc7\x01\n\x0eVStreamRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\x10\n\x08position\x18\x04 \x01(\t\x12\"\n\x06\x66ilter\x18\x05 \x01(\x0b\x32\x12.binlogdata.Filter\"5\n\x0fVStreamResponse\x12\"\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*>\n\x0bOnDDLAction\x12\n\n\x06IGNORE\x10\x00\x12\x08\n\x04STOP\x10\x01\x12\x08\n\x04\x45XEC\x10\x02\x12\x0f\n\x0b\x45XEC_IGNORE\x10\x03*\xb9\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\r\x12\r\n\tHEARTBEAT\x10\x0e\x42)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3') , dependencies=[vtrpc__pb2.DESCRIPTOR,query__pb2.DESCRIPTOR,topodata__pb2.DESCRIPTOR,]) @@ -52,8 +52,8 @@ ], containing_type=None, serialized_options=None, - serialized_start=1907, - serialized_end=1969, + serialized_start=1929, + serialized_end=1991, ) _sym_db.RegisterEnumDescriptor(_ONDDLACTION) @@ -120,11 +120,15 @@ name='FIELD', index=13, number=13, serialized_options=None, type=None), + _descriptor.EnumValueDescriptor( + name='HEARTBEAT', index=14, number=14, + serialized_options=None, + type=None), ], containing_type=None, serialized_options=None, - serialized_start=1972, - serialized_end=2142, + serialized_start=1994, + serialized_end=2179, ) _sym_db.RegisterEnumDescriptor(_VEVENTTYPE) @@ -147,6 +151,7 @@ OTHER = 11 ROW = 12 FIELD = 13 +HEARTBEAT = 14 _BINLOGTRANSACTION_STATEMENT_CATEGORY = _descriptor.EnumDescriptor( @@ -789,6 +794,13 @@ message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='current_time', full_name='binlogdata.VEvent.current_time', index=6, + number=20, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -802,7 +814,7 @@ oneofs=[ ], serialized_start=1470, - serialized_end=1648, + serialized_end=1670, ) @@ -860,8 +872,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1651, - serialized_end=1850, + serialized_start=1673, + serialized_end=1872, ) @@ -891,8 +903,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1852, - serialized_end=1905, + serialized_start=1874, + serialized_end=1927, ) _BINLOGTRANSACTION_STATEMENT.fields_by_name['category'].enum_type = _BINLOGTRANSACTION_STATEMENT_CATEGORY From bec54fd6f243f9b73d3d701762bea2c6f17995e9 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 3 Mar 2019 11:36:03 -0800 Subject: [PATCH 2/2] vplayer: address review comments Signed-off-by: Sugu Sougoumarane --- go/vt/vttablet/tabletmanager/vreplication/relaylog.go | 1 - go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go index f0714390d2e..489fd8e90c3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go @@ -38,7 +38,6 @@ type relayLog struct { curSize int items [][]*binlogdatapb.VEvent timedout bool - err error // canAccept is true if: curSize<=maxSize, len(items)0, ctx is not Done, and interuptFetch is false. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index a181bcf8569..53efc39cdba 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -66,7 +66,7 @@ type vplayer struct { timeLastSaved time.Time // lastTimestampNs is the last timestamp seen so far. lastTimestampNs int64 - // timeOffsetNs keeps track of the time offset w.r.t. source tablet. + // timeOffsetNs keeps track of the clock difference with respect to source tablet. timeOffsetNs int64 stopPos mysql.Position @@ -204,7 +204,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if err != nil { return err } - // No events were received. Update SecondsBehindMaster. + // No events were received. This likely means that there's a network partition. + // So, we should assume we're falling behind. if len(items) == 0 { behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs vp.stats.SecondsBehindMaster.Set(behind / 1e9)