Skip to content

Commit

Permalink
Fix replication of TIMESTAMP on non-UTC machines (#295)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu authored and siddontang committed Aug 2, 2018
1 parent 29d34f1 commit 58848a7
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 19 deletions.
15 changes: 15 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ type BinlogSyncerConfig struct {
// We will use Local location for timestamp and UTC location for datatime.
ParseTime bool

// If ParseTime is false, convert TIMESTAMP into this specified timezone. If
// ParseTime is true, this option will have no effect and TIMESTAMP data will
// be parsed into the local timezone and a full time.Time struct will be
// returned.
//
// Note that MySQL TIMESTAMP columns are offset from the machine local
// timezone while DATETIME columns are offset from UTC. This is consistent
// with documented MySQL behaviour as it return TIMESTAMP in local timezone
// and DATETIME in UTC.
//
// Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time
// strings obtained from MySQL.
TimestampStringLocation *time.Location

// Use decimal.Decimal structure for decimals.
UseDecimal bool

Expand Down Expand Up @@ -124,6 +138,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b.parser = NewBinlogParser()
b.parser.SetRawMode(b.cfg.RawModeEnabled)
b.parser.SetParseTime(b.cfg.ParseTime)
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
b.parser.SetUseDecimal(b.cfg.UseDecimal)
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
b.running = false
Expand Down
9 changes: 8 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"os"
"sync/atomic"
"time"

"github.com/juju/errors"
)
Expand All @@ -25,7 +26,8 @@ type BinlogParser struct {
// for rawMode, we only parse FormatDescriptionEvent and RotateEvent
rawMode bool

parseTime bool
parseTime bool
timestampStringLocation *time.Location

// used to start/stop processing
stopProcessing uint32
Expand Down Expand Up @@ -183,6 +185,10 @@ func (p *BinlogParser) SetParseTime(parseTime bool) {
p.parseTime = parseTime
}

func (p *BinlogParser) SetTimestampStringLocation(timestampStringLocation *time.Location) {
p.timestampStringLocation = timestampStringLocation
}

func (p *BinlogParser) SetUseDecimal(useDecimal bool) {
p.useDecimal = useDecimal
}
Expand Down Expand Up @@ -347,6 +353,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
e.needBitmap2 = false
e.tables = p.tables
e.parseTime = p.parseTime
e.timestampStringLocation = p.timestampStringLocation
e.useDecimal = p.useDecimal

switch h.EventType {
Expand Down
47 changes: 32 additions & 15 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,9 @@ type RowsEvent struct {
//rows: invalid: int64, float64, bool, []byte, string
Rows [][]interface{}

parseTime bool
useDecimal bool
parseTime bool
timestampStringLocation *time.Location
useDecimal bool
}

func (e *RowsEvent) Decode(data []byte) error {
Expand Down Expand Up @@ -422,10 +423,14 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
if t == 0 {
v = formatZeroTime(0, 0)
} else {
v = e.parseFracTime(fracTime{time.Unix(int64(t), 0), 0})
v = e.parseFracTime(fracTime{
Time: time.Unix(int64(t), 0),
Dec: 0,
timestampStringLocation: e.timestampStringLocation,
})
}
case MYSQL_TYPE_TIMESTAMP2:
v, n, err = decodeTimestamp2(data, meta)
v, n, err = decodeTimestamp2(data, meta, e.timestampStringLocation)
v = e.parseFracTime(v)
case MYSQL_TYPE_DATETIME:
n = 8
Expand All @@ -435,14 +440,19 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
} else {
d := i64 / 1000000
t := i64 % 1000000
v = e.parseFracTime(fracTime{time.Date(int(d/10000),
time.Month((d%10000)/100),
int(d%100),
int(t/10000),
int((t%10000)/100),
int(t%100),
0,
time.UTC), 0})
v = e.parseFracTime(fracTime{
Time: time.Date(
int(d/10000),
time.Month((d%10000)/100),
int(d%100),
int(t/10000),
int((t%10000)/100),
int(t%100),
0,
time.UTC,
),
Dec: 0,
})
}
case MYSQL_TYPE_DATETIME2:
v, n, err = decodeDatetime2(data, meta)
Expand Down Expand Up @@ -640,7 +650,7 @@ func decodeBit(data []byte, nbits int, length int) (value int64, err error) {
return
}

func decodeTimestamp2(data []byte, dec uint16) (interface{}, int, error) {
func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Location) (interface{}, int, error) {
//get timestamp binary length
n := int(4 + (dec+1)/2)
sec := int64(binary.BigEndian.Uint32(data[0:4]))
Expand All @@ -658,7 +668,11 @@ func decodeTimestamp2(data []byte, dec uint16) (interface{}, int, error) {
return formatZeroTime(int(usec), int(dec)), n, nil
}

return fracTime{time.Unix(sec, usec*1000), int(dec)}, n, nil
return fracTime{
Time: time.Unix(sec, usec*1000),
Dec: int(dec),
timestampStringLocation: timestampStringLocation,
}, n, nil
}

const DATETIMEF_INT_OFS int64 = 0x8000000000
Expand Down Expand Up @@ -704,7 +718,10 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
minute := int((hms >> 6) % (1 << 6))
hour := int((hms >> 12))

return fracTime{time.Date(year, time.Month(month), day, hour, minute, second, int(frac*1000), time.UTC), int(dec)}, n, nil
return fracTime{
Time: time.Date(year, time.Month(month), day, hour, minute, second, int(frac*1000), time.UTC),
Dec: int(dec),
}, n, nil
}

const TIMEF_OFS int64 = 0x800000000000
Expand Down
8 changes: 7 additions & 1 deletion replication/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ type fracTime struct {

// Dec must in [0, 6]
Dec int

timestampStringLocation *time.Location
}

func (t fracTime) String() string {
return t.Format(fracTimeFormat[t.Dec])
tt := t.Time
if t.timestampStringLocation != nil {
tt = tt.In(t.timestampStringLocation)
}
return tt.Format(fracTimeFormat[t.Dec])
}

func formatZeroTime(frac int, dec int) string {
Expand Down
23 changes: 21 additions & 2 deletions replication/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type testTimeSuite struct{}

var _ = Suite(&testTimeSuite{})

func (s *testSyncerSuite) TestTime(c *C) {
func (s *testTimeSuite) TestTime(c *C) {
tbls := []struct {
year int
month int
Expand All @@ -28,7 +28,7 @@ func (s *testSyncerSuite) TestTime(c *C) {
}

for _, t := range tbls {
t1 := fracTime{time.Date(t.year, time.Month(t.month), t.day, t.hour, t.min, t.sec, t.microSec*1000, time.UTC), t.frac}
t1 := fracTime{time.Date(t.year, time.Month(t.month), t.day, t.hour, t.min, t.sec, t.microSec*1000, time.UTC), t.frac, nil}
c.Assert(t1.String(), Equals, t.expected)
}

Expand All @@ -49,3 +49,22 @@ func (s *testSyncerSuite) TestTime(c *C) {
c.Assert(formatZeroTime(t.frac, t.dec), Equals, t.expected)
}
}

func (s *testTimeSuite) TestTimeStringLocation(c *C) {
t := fracTime{
time.Date(2018, time.Month(7), 30, 10, 0, 0, 0, time.FixedZone("EST", -5*3600)),
0,
nil,
}

c.Assert(t.String(), Equals, "2018-07-30 10:00:00")

t = fracTime{
time.Date(2018, time.Month(7), 30, 10, 0, 0, 0, time.FixedZone("EST", -5*3600)),
0,
time.UTC,
}
c.Assert(t.String(), Equals, "2018-07-30 15:00:00")
}

var _ = Suite(&testTimeSuite{})

0 comments on commit 58848a7

Please sign in to comment.