Skip to content

Commit fcee23b

Browse files
Add support to deliver messages in batches
1 parent ab5421d commit fcee23b

File tree

4 files changed

+16
-0
lines changed

4 files changed

+16
-0
lines changed

lib/streamy/dispatcher.rb

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ def dispatch
88
Streamy.message_bus.deliver(**message_params)
99
end
1010

11+
def self.dispatch_many(events)
12+
Streamy.message_bus.deliver_many(events.map(&:to_message))
13+
end
14+
1115
private
1216

1317
attr_reader :event

lib/streamy/event.rb

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ def self.publish(**args)
1616
new(**args).publish
1717
end
1818

19+
def self.publish_many(events)
20+
Streamy.dispatcher.dispatch_many(events)
21+
end
22+
1923
priority :standard
2024

2125
def publish

lib/streamy/message_buses/kafka_message_bus.rb

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def deliver(key:, topic:, payload:, priority:)
2323
end
2424
end
2525

26+
def deliver_many(messages)
27+
sync_producer.produce_many_sync(messages.except(:priority))
28+
end
29+
2630
def shutdown
2731
async_producer.close if async_producer?
2832
sync_producers.map(&:close)

lib/streamy/message_buses/message_bus.rb

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ class MessageBus
44
def deliver(key:, topic:, payload:, priority:)
55
# NOOP: Implement delivery logic
66
end
7+
8+
def deliver_many(messages)
9+
# NOOP: Implement delivery logic
10+
end
711
end
812
end
913
end

0 commit comments

Comments
 (0)