Skip to content

Commit 9c46320

Browse files
Add support to deliver messages in batches
1 parent ab5421d commit 9c46320

File tree

5 files changed

+22
-0
lines changed

5 files changed

+22
-0
lines changed

lib/streamy/dispatcher.rb

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ def dispatch
88
Streamy.message_bus.deliver(**message_params)
99
end
1010

11+
def self.dispatch_many(events)
12+
Streamy.message_bus.deliver_many(events.map(&:to_message))
13+
end
14+
1115
private
1216

1317
attr_reader :event

lib/streamy/event.rb

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ def self.publish(**args)
1616
new(**args).publish
1717
end
1818

19+
def self.publish_many(events)
20+
Streamy.dispatcher.dispatch_many(events)
21+
end
22+
1923
priority :standard
2024

2125
def publish

lib/streamy/message_buses/kafka_message_bus.rb

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ def deliver(key:, topic:, payload:, priority:)
2323
end
2424
end
2525

26+
def deliver_many(messages)
27+
messages = messages.map { |message| message.except(:priority) }
28+
sync_producer.produce_many_sync(messages)
29+
end
30+
2631
def shutdown
2732
async_producer.close if async_producer?
2833
sync_producers.map(&:close)

lib/streamy/message_buses/message_bus.rb

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ class MessageBus
44
def deliver(key:, topic:, payload:, priority:)
55
# NOOP: Implement delivery logic
66
end
7+
8+
def deliver_many(messages)
9+
# NOOP: Implement delivery logic
10+
end
711
end
812
end
913
end

lib/streamy/test_dispatcher.rb

+5
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,10 @@ def dispatch
1212
events << event_params
1313
messages << message_params
1414
end
15+
16+
def self.dispatch_many(events)
17+
self.events += events.map(&:to_params)
18+
self.messages += events.map(&:to_message)
19+
end
1520
end
1621
end

0 commit comments

Comments
 (0)