Skip to content

Commit b4388e4

Browse files
author
Peiyu Wu
committed
Add bootstarpServers and headers to Kafka event
1 parent 99b35f2 commit b4388e4

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

events/kafka.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@
33
package events
44

55
type KafkaEvent struct {
6-
EventSource string `json:"eventSource"`
7-
EventSourceARN string `json:"eventSourceArn"`
8-
Records map[string][]KafkaRecord `json:"records"`
6+
EventSource string `json:"eventSource"`
7+
EventSourceARN string `json:"eventSourceArn"`
8+
Records map[string][]KafkaRecord `json:"records"`
9+
BootstrapSevers string `json:"bootstrapSevers"`
910
}
1011

1112
type KafkaRecord struct {
12-
Topic string `json:"topic"`
13-
Partition int64 `json:"partition"`
14-
Offset int64 `json:"offset"`
15-
Timestamp MilliSecondsEpochTime `json:"timestamp"`
16-
TimestampType string `json:"timestampType"`
17-
Key string `json:"key,omitempty"`
18-
Value string `json:"value,omitempty"`
13+
Topic string `json:"topic"`
14+
Partition int64 `json:"partition"`
15+
Offset int64 `json:"offset"`
16+
Timestamp MilliSecondsEpochTime `json:"timestamp"`
17+
TimestampType string `json:"timestampType"`
18+
Key string `json:"key,omitempty"`
19+
Value string `json:"value,omitempty"`
20+
Headers []map[string][]byte `json:"headers"`
1921
}

events/kafka_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,22 @@ func TestKafkaEventMarshaling(t *testing.T) {
2020
t.Errorf("could not unmarshal event. details: %v", err)
2121
}
2222

23+
assert.Equal(t, inputEvent.BootstrapSevers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
24+
assert.Equal(t, inputEvent.EventSource, "aws:kafka")
25+
assert.Equal(t, EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4")
2326
for _, records := range inputEvent.Records {
2427
for _, record := range records {
2528
utc := record.Timestamp.UTC()
2629
assert.Equal(t, 2020, utc.Year())
30+
assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")
31+
assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")
32+
for header := range record.Headers {
33+
for key, value := range header {
34+
assert.Equal(t, key, "headerKey")
35+
var headerValue String := string(value)
36+
assert.Equal(t, headerValue, "headerValue")
37+
}
38+
}
2739
}
2840
}
2941

events/testdata/kafka-event.json

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"eventSource": "aws:kafka",
33
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
4+
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
45
"records": {
56
"AWSKafkaTopic-0": [
67
{
@@ -10,7 +11,24 @@
1011
"timestamp": 1595035749700,
1112
"timestampType": "CREATE_TIME",
1213
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
13-
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
14+
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
15+
"headers": [
16+
{
17+
"headerKey": [
18+
104,
19+
101,
20+
97,
21+
100,
22+
101,
23+
114,
24+
86,
25+
97,
26+
108,
27+
117,
28+
101
29+
]
30+
}
31+
]
1432
}
1533
]
1634
}

0 commit comments

Comments
 (0)