-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add kafka topic/partition/offset to the extension of event #896
Conversation
Signed-off-by: myan <[email protected]>
Can you please update the PR to include topic? nit: you could slightly optimize this existing call to avoid additional allocations if current headers is headers := make(map[string][]byte, 3+len(cm.Headers)) |
Signed-off-by: myan <[email protected]>
Thanks @embano1 ! The kafka topic has been added the in this pr and also optimization of the header initialization is done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm cc/ @duglin
@yanmxa can you look at the test failures? |
it might also be good to add some new ones |
Signed-off-by: myan <[email protected]>
Error: Received unexpected error:
kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes Probably not related to this PR, but worth taking a look? |
I'm going to kick off the tests again, but here's a link to the current test run for reference: https://github.com/cloudevents/sdk-go/actions/runs/4986878143/jobs/8928440318?pr=896 |
Still seeming failures due to the new fields: |
Signed-off-by: myan <[email protected]>
From the error message, it is caused by the Sarama binding integration test, and I add some code to fix the error. |
@embano1 @lionelvillard for another round of reviews. |
Thanks @duglin @embano1 @lionelvillard! PTAL~ |
eventIn.SetExtension(KAFKA_OFFSET, got.Extensions()[KAFKA_OFFSET]) | ||
eventIn.SetExtension(KAFKA_PARTITION, got.Extensions()[KAFKA_PARTITION]) | ||
eventIn.SetExtension(KAFKA_TOPIC, got.Extensions()[KAFKA_TOPIC]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't looked deeper at the different test.<>
functions, but was wondering what this is supposed to do here? You're setting extensions on the input event based on got
and then asserting. Is this correctly exercising and asserting the changes here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is another way we can remove the KAFKA_OFFSET
, KAFKA_PARTITION
, and KAFKA_TOPIC
from got
. Is this way better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, how do we remove extension attributes from a CE? I didn't see that option in here: https://github.com/cloudevents/sdk-go/blob/main/v2/event/event_writer.go
I'm ok with copying things from got
into eventIn
so that the Assert works (or removing them from got
if that's possible), but it seems to me that there should be a check of the values in got
to make sure they're not blank - even if we need to do those manually instead via a compare with eventIn
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@duglin Thanks for your advice! I have added the empty check for the event extensions.
Signed-off-by: myan <[email protected]>
extensions := got.Extensions() | ||
require.NotEmpty(t, extensions[KAFKA_OFFSET]) | ||
require.NotEmpty(t, extensions[KAFKA_PARTITION]) | ||
require.NotEmpty(t, extensions[KAFKA_TOPIC]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we check their values? Should they match the constants defined above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure! But we can only verify the topic defined above. And the partition/offset isn't defined here.
Signed-off-by: myan <[email protected]>
func TestSendEvent(t *testing.T) { | ||
test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) { | ||
eventIn = test.ConvertEventExtensionsToString(t, eventIn) | ||
clienttest.SendReceive(t, func() interface{} { | ||
return protocolFactory(t) | ||
}, eventIn, func(e event.Event) { | ||
test.AssertEventEquals(t, eventIn, test.ConvertEventExtensionsToString(t, e)) | ||
got := test.ConvertEventExtensionsToString(t, e) | ||
|
||
require.Equal(t, TopicName, got.Extensions()[KAFKA_TOPIC]) | ||
require.NotNil(t, got.Extensions()[KAFKA_PARTITION]) | ||
require.NotNil(t, got.Extensions()[KAFKA_OFFSET]) | ||
|
||
eventIn.SetExtension(KAFKA_OFFSET, got.Extensions()[KAFKA_OFFSET]) | ||
eventIn.SetExtension(KAFKA_PARTITION, got.Extensions()[KAFKA_PARTITION]) | ||
eventIn.SetExtension(KAFKA_TOPIC, TopicName) | ||
test.AssertEventEquals(t, eventIn, got) | ||
}) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggesting the following changes for more clarity of the test (also for the other integration test):
func TestSendEvent(t *testing.T) {
test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) {
eventIn = test.ConvertEventExtensionsToString(t, eventIn)
clienttest.SendReceive(t, func() interface{} {
return protocolFactory(t)
}, eventIn, func(eventOut event.Event) {
eventOut = test.ConvertEventExtensionsToString(t, eventOut)
require.Equal(t, TopicName, eventOut.Extensions()[KAFKA_TOPIC])
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])
test.AllOf(
test.HasExactlyAttributesEqualTo(eventIn.Context),
test.HasData(eventIn.Data()),
)
})
})
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed the variable names for more clarity and dropped extension comparison since we're not setting specific extensions here and have require...
to assert on the specific extensions we want. Ideally we'd have a custom test.EventMatcher
to assert on extensions but skip specific ones which get added at a later stage. I'll create tracking issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
Thank @embano1!
Signed-off-by: myan <[email protected]>
@embano1 you good with the latest? |
LGTM, thank you! |
@yanmxa thanks for your patience. |
LGTM |
Signed-off-by: myan [email protected]
Refers: #846