Skip to content

Commit ed80c61

Browse files
committed
Make timestamp fields of type time.Time and time.Duration
1 parent 5959a18 commit ed80c61

File tree

6 files changed

+106
-60
lines changed

6 files changed

+106
-60
lines changed

consumer.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -532,20 +532,14 @@ func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*Cons
532532
}
533533
prelude = false
534534

535-
millis := batch.FirstTimestamp + rec.TimestampDelta
536-
timestamp := time.Time{}
537-
if millis >= 0 {
538-
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
539-
}
540-
541535
if offset >= child.offset {
542536
messages = append(messages, &ConsumerMessage{
543537
Topic: child.topic,
544538
Partition: child.partition,
545539
Key: rec.Key,
546540
Value: rec.Value,
547541
Offset: offset,
548-
Timestamp: timestamp,
542+
Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
549543
Headers: rec.Headers,
550544
})
551545
child.offset = offset + 1

message.go

+3-19
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,9 @@ func (m *Message) encode(pe packetEncoder) error {
4545
pe.putInt8(attributes)
4646

4747
if m.Version >= 1 {
48-
timestamp := int64(-1)
49-
50-
if !m.Timestamp.Before(time.Unix(0, 0)) {
51-
timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond)
52-
} else if !m.Timestamp.IsZero() {
53-
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)}
48+
if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
49+
return err
5450
}
55-
56-
pe.putInt64(timestamp)
5751
}
5852

5953
err := pe.putBytes(m.Key)
@@ -133,19 +127,9 @@ func (m *Message) decode(pd packetDecoder) (err error) {
133127
m.Codec = CompressionCodec(attribute & compressionCodecMask)
134128

135129
if m.Version == 1 {
136-
millis, err := pd.getInt64()
137-
if err != nil {
130+
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
138131
return err
139132
}
140-
141-
// negative timestamps are invalid, in these cases we should return
142-
// a zero time
143-
timestamp := time.Time{}
144-
if millis >= 0 {
145-
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
146-
}
147-
148-
m.Timestamp = timestamp
149133
}
150134

151135
m.Key, err = pd.getBytes()

record.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package sarama
22

3+
import "time"
4+
35
const (
46
controlMask = 0x20
57
)
@@ -29,7 +31,7 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) {
2931

3032
type Record struct {
3133
Attributes int8
32-
TimestampDelta int64
34+
TimestampDelta time.Duration
3335
OffsetDelta int64
3436
Key []byte
3537
Value []byte
@@ -41,7 +43,7 @@ type Record struct {
4143
func (r *Record) encode(pe packetEncoder) error {
4244
pe.push(&r.length)
4345
pe.putInt8(r.Attributes)
44-
pe.putVarint(r.TimestampDelta)
46+
pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
4547
pe.putVarint(r.OffsetDelta)
4648
if err := pe.putVarintBytes(r.Key); err != nil {
4749
return err
@@ -69,9 +71,11 @@ func (r *Record) decode(pd packetDecoder) (err error) {
6971
return err
7072
}
7173

72-
if r.TimestampDelta, err = pd.getVarint(); err != nil {
74+
timestamp, err := pd.getVarint()
75+
if err != nil {
7376
return err
7477
}
78+
r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
7579

7680
if r.OffsetDelta, err = pd.getVarint(); err != nil {
7781
return err

record_batch.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"compress/gzip"
66
"fmt"
77
"io/ioutil"
8+
"time"
89

910
"github.com/eapache/go-xerial-snappy"
1011
"github.com/pierrec/lz4"
@@ -41,8 +42,8 @@ type RecordBatch struct {
4142
Codec CompressionCodec
4243
Control bool
4344
LastOffsetDelta int32
44-
FirstTimestamp int64
45-
MaxTimestamp int64
45+
FirstTimestamp time.Time
46+
MaxTimestamp time.Time
4647
ProducerID int64
4748
ProducerEpoch int16
4849
FirstSequence int32
@@ -64,8 +65,15 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
6465
pe.push(newCRC32Field(crcCastagnoli))
6566
pe.putInt16(b.computeAttributes())
6667
pe.putInt32(b.LastOffsetDelta)
67-
pe.putInt64(b.FirstTimestamp)
68-
pe.putInt64(b.MaxTimestamp)
68+
69+
if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
70+
return err
71+
}
72+
73+
if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
74+
return err
75+
}
76+
6977
pe.putInt64(b.ProducerID)
7078
pe.putInt16(b.ProducerEpoch)
7179
pe.putInt32(b.FirstSequence)
@@ -122,11 +130,11 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
122130
return err
123131
}
124132

125-
if b.FirstTimestamp, err = pd.getInt64(); err != nil {
133+
if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
126134
return err
127135
}
128136

129-
if b.MaxTimestamp, err = pd.getInt64(); err != nil {
137+
if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
130138
return err
131139
}
132140

record_test.go

+41-25
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strconv"
77
"strings"
88
"testing"
9+
"time"
910

1011
"github.com/davecgh/go-spew/spew"
1112
)
@@ -17,8 +18,13 @@ var recordBatchTestCases = []struct {
1718
oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
1819
}{
1920
{
20-
name: "empty record",
21-
batch: RecordBatch{Version: 2, Records: []*Record{}},
21+
name: "empty record",
22+
batch: RecordBatch{
23+
Version: 2,
24+
FirstTimestamp: time.Unix(0, 0),
25+
MaxTimestamp: time.Unix(0, 0),
26+
Records: []*Record{},
27+
},
2228
encoded: []byte{
2329
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
2430
0, 0, 0, 49, // Length
@@ -36,8 +42,14 @@ var recordBatchTestCases = []struct {
3642
},
3743
},
3844
{
39-
name: "control batch",
40-
batch: RecordBatch{Version: 2, Control: true, Records: []*Record{}},
45+
name: "control batch",
46+
batch: RecordBatch{
47+
Version: 2,
48+
Control: true,
49+
FirstTimestamp: time.Unix(0, 0),
50+
MaxTimestamp: time.Unix(0, 0),
51+
Records: []*Record{},
52+
},
4153
encoded: []byte{
4254
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
4355
0, 0, 0, 49, // Length
@@ -58,9 +70,10 @@ var recordBatchTestCases = []struct {
5870
name: "uncompressed record",
5971
batch: RecordBatch{
6072
Version: 2,
61-
FirstTimestamp: 10,
73+
FirstTimestamp: time.Unix(1479847795, 0),
74+
MaxTimestamp: time.Unix(0, 0),
6275
Records: []*Record{{
63-
TimestampDelta: 5,
76+
TimestampDelta: 5 * time.Millisecond,
6477
Key: []byte{1, 2, 3, 4},
6578
Value: []byte{5, 6, 7},
6679
Headers: []*RecordHeader{{
@@ -74,10 +87,10 @@ var recordBatchTestCases = []struct {
7487
0, 0, 0, 70, // Length
7588
0, 0, 0, 0, // Partition Leader Epoch
7689
2, // Version
77-
219, 71, 20, 201, // CRC
90+
84, 121, 97, 253, // CRC
7891
0, 0, // Attributes
7992
0, 0, 0, 0, // Last Offset Delta
80-
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
93+
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
8194
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
8295
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
8396
0, 0, // Producer Epoch
@@ -103,9 +116,10 @@ var recordBatchTestCases = []struct {
103116
batch: RecordBatch{
104117
Version: 2,
105118
Codec: CompressionGZIP,
106-
FirstTimestamp: 10,
119+
FirstTimestamp: time.Unix(1479847795, 0),
120+
MaxTimestamp: time.Unix(0, 0),
107121
Records: []*Record{{
108-
TimestampDelta: 5,
122+
TimestampDelta: 5 * time.Millisecond,
109123
Key: []byte{1, 2, 3, 4},
110124
Value: []byte{5, 6, 7},
111125
Headers: []*RecordHeader{{
@@ -118,11 +132,11 @@ var recordBatchTestCases = []struct {
118132
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
119133
0, 0, 0, 94, // Length
120134
0, 0, 0, 0, // Partition Leader Epoch
121-
2, // Version
122-
15, 156, 184, 78, // CRC
135+
2, // Version
136+
159, 236, 182, 189, // CRC
123137
0, 1, // Attributes
124138
0, 0, 0, 0, // Last Offset Delta
125-
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
139+
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
126140
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
127141
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
128142
0, 0, // Producer Epoch
@@ -136,10 +150,10 @@ var recordBatchTestCases = []struct {
136150
0, 0, 0, 94, // Length
137151
0, 0, 0, 0, // Partition Leader Epoch
138152
2, // Version
139-
144, 168, 0, 33, // CRC
153+
0, 216, 14, 210, // CRC
140154
0, 1, // Attributes
141155
0, 0, 0, 0, // Last Offset Delta
142-
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
156+
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
143157
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
144158
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
145159
0, 0, // Producer Epoch
@@ -154,9 +168,10 @@ var recordBatchTestCases = []struct {
154168
batch: RecordBatch{
155169
Version: 2,
156170
Codec: CompressionSnappy,
157-
FirstTimestamp: 10,
171+
FirstTimestamp: time.Unix(1479847795, 0),
172+
MaxTimestamp: time.Unix(0, 0),
158173
Records: []*Record{{
159-
TimestampDelta: 5,
174+
TimestampDelta: 5 * time.Millisecond,
160175
Key: []byte{1, 2, 3, 4},
161176
Value: []byte{5, 6, 7},
162177
Headers: []*RecordHeader{{
@@ -169,11 +184,11 @@ var recordBatchTestCases = []struct {
169184
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
170185
0, 0, 0, 72, // Length
171186
0, 0, 0, 0, // Partition Leader Epoch
172-
2, // Version
173-
95, 173, 35, 17, // CRC
187+
2, // Version
188+
21, 0, 159, 97, // CRC
174189
0, 2, // Attributes
175190
0, 0, 0, 0, // Last Offset Delta
176-
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
191+
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
177192
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
178193
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
179194
0, 0, // Producer Epoch
@@ -187,9 +202,10 @@ var recordBatchTestCases = []struct {
187202
batch: RecordBatch{
188203
Version: 2,
189204
Codec: CompressionLZ4,
190-
FirstTimestamp: 10,
205+
FirstTimestamp: time.Unix(1479847795, 0),
206+
MaxTimestamp: time.Unix(0, 0),
191207
Records: []*Record{{
192-
TimestampDelta: 5,
208+
TimestampDelta: 5 * time.Millisecond,
193209
Key: []byte{1, 2, 3, 4},
194210
Value: []byte{5, 6, 7},
195211
Headers: []*RecordHeader{{
@@ -202,11 +218,11 @@ var recordBatchTestCases = []struct {
202218
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
203219
0, 0, 0, 89, // Length
204220
0, 0, 0, 0, // Partition Leader Epoch
205-
2, // Version
206-
129, 238, 43, 82, // CRC
221+
2, // Version
222+
169, 74, 119, 197, // CRC
207223
0, 3, // Attributes
208224
0, 0, 0, 0, // Last Offset Delta
209-
0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
225+
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
210226
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
211227
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
212228
0, 0, // Producer Epoch

timestamp.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package sarama
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
type Timestamp struct {
9+
*time.Time
10+
}
11+
12+
func (t Timestamp) encode(pe packetEncoder) error {
13+
timestamp := int64(-1)
14+
15+
if !t.Before(time.Unix(0, 0)) {
16+
timestamp = t.UnixNano() / int64(time.Millisecond)
17+
} else if !t.IsZero() {
18+
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", t)}
19+
}
20+
21+
pe.putInt64(timestamp)
22+
return nil
23+
}
24+
25+
func (t Timestamp) decode(pd packetDecoder) error {
26+
millis, err := pd.getInt64()
27+
if err != nil {
28+
return err
29+
}
30+
31+
// negative timestamps are invalid, in these cases we should return
32+
// a zero time
33+
timestamp := time.Time{}
34+
if millis >= 0 {
35+
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
36+
}
37+
38+
*t.Time = timestamp
39+
return nil
40+
}

0 commit comments

Comments
 (0)