D★Stream is a set of extensions for writing stream-processing code in Ruby.
CAUTION: D★Stream is a work in progress, of pre-alpha quality.
The following example takes a sequence of events for a given ticket, and calculates the history for that ticket, using slowly changing dimensions:
```ruby
events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

history_builder =
  S.compose(
    # calculate new state
    S.scan({}, &:merge),

    # add `version`
    S.zip(1..),
    S.map { |(e, i)| e.merge(version: i) },

    # remove `id`
    S.map { |e| e.except(:id) },

    # add `valid_to` and `valid_from`, and remove `at`
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.except(:at) },

    # add `row_is_current`
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history = history_builder.call(events)
history.each { |e| p e }
```
The output is as follows:
```
{
  :status=>"new",
  :valid_from=>2017-05-05 20:18:14 +0200,
  :valid_to=>2017-05-05 20:19:54 +0200,
  :version=>1,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :valid_from=>2017-05-05 20:19:54 +0200,
  :valid_to=>2017-05-05 20:20:44 +0200,
  :version=>2,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:20:44 +0200,
  :valid_to=>2017-05-05 20:23:14 +0200,
  :version=>3,
  :row_is_current=>false
}
{
  :status=>"solved",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:23:14 +0200,
  :valid_to=>nil,
  :version=>4,
  :row_is_current=>true
}
```
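For readers unfamiliar with D★Stream's combinators, the same transformation can be approximated with plain `Enumerable` methods. This is only a sketch for comparison, not D★Stream code; the fixed timestamps are invented stand-ins for `Time.now`:

```ruby
# Plain-Ruby approximation of the pipeline above (no D★Stream).
t0 = Time.utc(2017, 5, 5, 20, 0, 0)
events = [
  { id: 40562348, at: t0 + 100, status: 'new' },
  { id: 40564682, at: t0 + 200, assignee_id: 2 },
  { id: 40565795, at: t0 + 250, priority: 'high' },
  { id: 40569932, at: t0 + 400, status: 'solved' },
]

# scan({}, &:merge): running merge of all events so far
states = events.each_with_object([]) do |e, acc|
  acc << (acc.last || {}).merge(e)
end

# zip(1..) + merge(version:), then except(:id)
versioned = states.each_with_index.map do |s, i|
  s.merge(version: i + 1).except(:id)
end

# with_next: pair each row with its successor (nil for the last one)
rows = versioned.zip(versioned.drop(1) + [nil]).map do |(a, b)|
  a.merge(
    valid_from: a.fetch(:at),
    valid_to: b && b.fetch(:at),
    row_is_current: b.nil?,
  ).except(:at)
end

rows.each { |r| p r }
```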
The following example is functionally identical to the one above, but uses `S.compose` to build named intermediate processors, making the final processor, `history_builder`, easier to understand:
```ruby
events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

merge =
  S.scan({}, &:merge)

add_version =
  S.compose(
    S.zip(1..),
    S.map { |(e, i)| e.merge(version: i) },
  )

remove_id =
  S.map { |e| e.except(:id) }

add_valid_dates =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.except(:at) },
  )

add_row_is_current =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history_builder =
  S.compose(
    merge,
    add_version,
    remove_id,
    add_valid_dates,
    add_row_is_current,
  )

history = history_builder.call(events)
history.each { |h| p h }
```
The following functions create individual processors:
- `map(&block)` (similar to `Enumerable#map`):

  ```ruby
  S.map(&:odd?).call(1..5).to_a # => [true, false, true, false, true]
  ```

- `select(&block)` (similar to `Enumerable#select`):

  ```ruby
  S.select(&:odd?).call(1..5).to_a # => [1, 3, 5]
  ```

- `reduce(&block)` (similar to `Enumerable#reduce`):

  ```ruby
  S.reduce(&:+).call(1..5) # => 15
  ```

- `take(n)` (similar to `Enumerable#take`):

  ```ruby
  S.take(3).call(1..10).to_a # => [1, 2, 3]
  ```

- `zip(other)` (similar to `Enumerable#zip`):

  ```ruby
  S.zip((10..13)).call(1..3).to_a # => [[1, 10], [2, 11], [3, 12]]
  ```

- `buffer(size)` yields each stream element, but keeps an internal buffer of not-yet-yielded stream elements. This is useful when reading from a slow and bursty data source, such as a paginated HTTP API.

- `with_next` yields an array containing each stream element and the next stream element, with `nil` in place of the next element when the end of the stream is reached:

  ```ruby
  S.with_next.call(1..5).to_a # => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
  ```

- `scan(init, &block)` is similar to `reduce`, but rather than returning a single aggregated value, returns all intermediate aggregated values:

  ```ruby
  S.scan(0, &:+).call(1..5).to_a # => [1, 3, 6, 10, 15]
  ```

- `flatten2` yields the stream element if it is not an array, otherwise yields the stream element array's contents:

  ```ruby
  S.compose(S.with_next, S.flatten2).call(1..5).to_a # => [1, 2, 2, 3, 3, 4, 4, 5, 5, nil]
  ```
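To make the semantics of these processors concrete, here is a hypothetical sketch of `scan` and `with_next` as plain lambdas over lazy enumerators. This is not D★Stream's actual implementation, only an illustration of the shape such processors can take (a function from one stream to another):

```ruby
# scan(init, &block): emit every intermediate reduction, not just the last
s_scan = lambda do |init, &blk|
  lambda do |stream|
    acc = init
    stream.lazy.map { |e| acc = blk.call(acc, e) }
  end
end

# with_next: pair each element with its successor (nil at the end)
s_with_next = lambda do |stream|
  Enumerator.new do |y|
    prev = nil
    started = false
    stream.each do |e|
      y << [prev, e] if started
      prev = e
      started = true
    end
    y << [prev, nil] if started
  end.lazy
end

p s_scan.call(0) { |a, b| a + b }.call(1..5).to_a
# => [1, 3, 6, 10, 15]
p s_with_next.call(1..5).to_a
# => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
```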
To apply a processor to a stream, use `#call`:

```ruby
S = DStream

stream = ['hi', 'hello']
S.map(&:upcase).call(stream).to_a
# => ["HI", "HELLO"]
```
To combine processors into a single processor, use `.compose`:

```ruby
S = DStream

stream = ['hi', 'hello']
processor = S.compose(
  S.map(&:upcase),
  S.map(&:reverse),
)
processor.call(stream).to_a
# => ["IH", "OLLEH"]
```
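Composition here is ordinary left-to-right function composition: each processor's output stream feeds the next processor's input. A hypothetical plain-Ruby equivalent (not D★Stream's code) can be sketched with `Proc#>>`:

```ruby
# Hypothetical compose: fold processors left to right with Proc#>>,
# so compose(f, g).call(stream) == g.call(f.call(stream)).
compose = ->(*procs) { procs.reduce(:>>) }

upcase  = ->(stream) { stream.lazy.map(&:upcase) }
reverse = ->(stream) { stream.lazy.map(&:reverse) }

processor = compose.call(upcase, reverse)
p processor.call(['hi', 'hello']).to_a
# => ["IH", "OLLEH"]
```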