Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions events/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
package events

type KafkaEvent struct {
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
Records map[string][]KafkaRecord `json:"records"`
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
Records map[string][]KafkaRecord `json:"records"`
BootstrapServers string `json:"bootstrapServers"`
}

type KafkaRecord struct {
Expand All @@ -16,4 +17,5 @@ type KafkaRecord struct {
TimestampType string `json:"timestampType"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Headers []map[string][]byte `json:"headers"`
}
22 changes: 13 additions & 9 deletions events/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@ func TestKafkaEventMarshaling(t *testing.T) {
t.Errorf("could not unmarshal event. details: %v", err)
}

assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
assert.Equal(t, inputEvent.EventSource, "aws:kafka")
assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4")
for _, records := range inputEvent.Records {
for _, record := range records {
utc := record.Timestamp.UTC()
assert.Equal(t, 2020, utc.Year())
assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")
assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")

for _, header := range record.Headers {
for key, value := range header {
assert.Equal(t, key, "headerKey")
var headerValue string = string(value)
assert.Equal(t, headerValue, "headerValue")
}
}
}
}

// 3. serialize to JSON
outputJson, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJson), string(outputJson))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Kafka header is a list of map between string and []byte.
According to Marshal JSON

Array and slice values encode as JSON arrays, except that []byte encodes as a base64-encoded string, and a nil slice encodes as the null JSON value.

This assertion won't be true anymore.

}

func TestKafkaMarshalingMalformedJson(t *testing.T) {
Expand Down
20 changes: 19 additions & 1 deletion events/testdata/kafka-event.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"AWSKafkaTopic-0": [
{
Expand All @@ -10,7 +11,24 @@
"timestamp": 1595035749700,
"timestampType": "CREATE_TIME",
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"headers": [
{
"headerKey": [
104,
101,
97,
100,
101,
114,
86,
97,
108,
117,
101
]
}
]
}
]
}
Expand Down