Skip to content

Commit

Permalink
fix error checking and rawData in parseSingleEvent function (#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
holys authored and siddontang committed Aug 2, 2018
1 parent 58848a7 commit 8939dbe
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 24 deletions.
2 changes: 1 addition & 1 deletion replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
)

type BinlogEvent struct {
// raw binlog data, including crc32 checksum if exists
// raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists
RawData []byte

Header *EventHeader
Expand Down
40 changes: 19 additions & 21 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,48 +106,46 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool,
var err error
var n int64

headBuf := make([]byte, EventHeaderSize)

if _, err = io.ReadFull(r, headBuf); err == io.EOF {
var buf bytes.Buffer
if n, err = io.CopyN(&buf, r, EventHeaderSize); err == io.EOF {
return true, nil
} else if err != nil {
return false, errors.Trace(err)
return false, errors.Errorf("get event header err %v, need %d but got %d", err, EventHeaderSize, n)
}

var h *EventHeader
h, err = p.parseHeader(headBuf)
h, err = p.parseHeader(buf.Bytes())
if err != nil {
return false, errors.Trace(err)
}

if h.EventSize <= uint32(EventHeaderSize) {
return false, errors.Errorf("invalid event header, event size is %d, too small", h.EventSize)
}

var buf bytes.Buffer
if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(EventHeaderSize)); err != nil {
return false, errors.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, EventHeaderSize, n)
if n, err = io.CopyN(&buf, r, int64(h.EventSize-EventHeaderSize)); err != nil {
return false, errors.Errorf("get event err %v, need %d but got %d", err, h.EventSize, n)
}
if buf.Len() != int(h.EventSize) {
return false, errors.Errorf("invalid raw data size in event %s, need %d but got %d", h.EventType, h.EventSize, buf.Len())
}

data := buf.Bytes()
rawData := data

eventLen := int(h.EventSize) - EventHeaderSize

if len(data) != eventLen {
return false, errors.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
rawData := buf.Bytes()
bodyLen := int(h.EventSize) - EventHeaderSize
body := rawData[EventHeaderSize:]
if len(body) != bodyLen {
return false, errors.Errorf("invalid body data size in event %s, need %d but got %d", h.EventType, bodyLen, len(body))
}

var e Event
e, err = p.parseEvent(h, data, rawData)
e, err = p.parseEvent(h, body, rawData)
if err != nil {
if _, ok := err.(errMissingTableMapEvent); ok {
if err == errMissingTableMapEvent {
return false, nil
}
return false, errors.Trace(err)
}

if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil {
if err = onEvent(&BinlogEvent{RawData: rawData, Header: h, Event: e}); err != nil {
return false, errors.Trace(err)
}

Expand All @@ -163,7 +161,7 @@ func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {

done, err := p.parseSingleEvent(r, onEvent)
if err != nil {
if _, ok := err.(errMissingTableMapEvent); ok {
if err == errMissingTableMapEvent {
continue
}
return errors.Trace(err)
Expand Down Expand Up @@ -319,7 +317,7 @@ func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) {
return nil, err
}

return &BinlogEvent{rawData, h, e}, nil
return &BinlogEvent{RawData: rawData, Header: h, Event: e}, nil
}

func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error {
Expand Down
1 change: 1 addition & 0 deletions replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec(c *C) {
c.Assert(err, IsNil)

p := NewBinlogParser()
p.SetVerifyChecksum(true)

f := func(e *BinlogEvent) error {
if *testOutputLogs {
Expand Down
4 changes: 2 additions & 2 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/siddontang/go/hack"
)

type errMissingTableMapEvent error
var errMissingTableMapEvent = errors.New("invalid table id, no corresponding table map event")

type TableMapEvent struct {
tableIDSize int
Expand Down Expand Up @@ -267,7 +267,7 @@ func (e *RowsEvent) Decode(data []byte) error {
if len(e.tables) > 0 {
return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
} else {
return errMissingTableMapEvent(errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID))
return errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
}
}

Expand Down

0 comments on commit 8939dbe

Please sign in to comment.