Skip to content

Commit

Permalink
place headers under meta
Browse files Browse the repository at this point in the history
Signed-off-by: clyang82 <[email protected]>
  • Loading branch information
clyang82 committed May 31, 2024
1 parent 74894f0 commit a4c8742
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 3 deletions.
3 changes: 2 additions & 1 deletion extensions/eda/plugins/event_source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ async def receive_msg(
# Process headers
try:
headers = {header[0]: header[1].decode(encoding) for header in msg.headers}
event["headers"] = headers
event["meta"] = {}
event["meta"]["headers"] = headers
except UnicodeError:
logger.exception("Unicode error while decoding headers")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
encoding: ascii
rules:
- name: match kafka event
condition: event.headers.foo == "bar"
condition: event.meta.headers.foo == "bar"
action:
debug:
msg: "Rule fired successfully with headers"
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/event_source/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,8 @@ def test_receive_from_kafka_place_in_queue(myqueue):
},
)
)
assert myqueue.queue[0] == {"body": {"i": 0}, "headers": {"foo": "bar"}}
assert myqueue.queue[0] == {
"body": {"i": 0},
"meta": {"headers": {"foo": "bar"}},
}
assert len(myqueue.queue) == 2

0 comments on commit a4c8742

Please sign in to comment.