
Add conflate operations to Stream #3401

Merged
merged 6 commits into from Mar 8, 2024
Conversation

@mpilquist (Member) commented Feb 28, 2024

Similar to conflate/conflateWithSeed from akka-streams.

This PR adds:

  • conflateChunks
  • conflate
  • conflate1
  • conflateSemigroup
  • conflateMap

Recommended by @seigert on r/scala here: https://www.reddit.com/r/scala/comments/1ayqcx0/comment/krx6nr6/

@@ -568,6 +568,56 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Stream.eval(fstream)
}

def conflate[F2[x] >: F[x], O2 >: O](implicit
@mpilquist (Member, Author):
I don't love this signature. Feels very adhoc.

Contributor:

I was thinking about the Channel-based implementation you suggested on Reddit and came up with some method signatures that I think would feel better:

  1. def conflateChunks(implicit F: Concurrent[F]): Stream[F, Chunk[O]] -- the base method for all the others.
    As Channel.stream emits all collected data in a single chunk, it feels natural to give the user
    access to those chunks without any additional operations on elements.
    It's also worth doing even for the current implementation, as pulling chunks is cheaper
    than pulling elements and chunk concatenation is O(1).
  2. def conflate[O2](zero: O2)(f: (O2, O) => O2)(implicit F: Concurrent[F]): Stream[F, O2] -- I think
    the conflate series of methods should mirror the signatures of fold/scan.
  3. def conflate1(f: (O, O) => O)(implicit F: Concurrent[F]): Stream[F, O] -- again, there are fold1/scan1,
    and we have simply 'swapped' Akka's conflate/conflateWithSeed.
  4. def conflateSemigroup(implicit F: Concurrent[F], O: Semigroup[O]): Stream[F, O] -- once more,
    like foldMonoid/scanMonoid, except that we don't need an empty value.
  5. def conflateMap -- I'm not sure about that one, but why not?
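To make the relationship between these variants concrete, here is a plain-Scala sketch (no fs2; all helper names are hypothetical) that models one conflated batch -- the elements that accumulated while downstream was busy -- as a List, and shows how each proposed method would collapse it:

```scala
// Model: one "conflated batch" is the list of elements that piled up
// while downstream was busy. Each proposed method collapses it differently.

// conflate(zero)(f): fold the batch from a seed, mirroring fold/scan
def conflateBatch[O, O2](batch: List[O])(zero: O2)(f: (O2, O) => O2): O2 =
  batch.foldLeft(zero)(f)

// conflate1(f): combine pairwise, mirroring fold1/scan1 (batch must be non-empty)
def conflate1Batch[O](batch: List[O])(f: (O, O) => O): O =
  batch.reduceLeft(f)

// conflateSemigroup: conflate1 specialized to a Semigroup's combine
def conflateSemigroupBatch[O](batch: List[O])(combine: (O, O) => O): O =
  conflate1Batch(batch)(combine)
```

This is only a model of the combining step; the real methods also have to interleave pulling from the source with downstream demand.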

On a side note, the Cambridge Dictionary suggests 'combine', 'fuse', 'meld', and 'merge' as synonyms. 'Merge' is taken, 'fuse' and 'meld' are meh, but 'combine' is a-ok, especially because we use Semigroup. On the other hand, conflate would be better known to people with Akka/Reactive Streams experience, much like switchMap.

@mpilquist (Member, Author):

If we do something like conflateChunks, we're getting very close to the behavior of prefetchN -- which might be a sign that we should just offer a prefetchN variant that combines all chunks when dequeuing. Using conflateWithSeed as the base operation allows the conflation to drop/summarize data instead of storing each element. On the other hand, conflate suffers from loss of backpressure as-is -- there's no limit to how much is pulled from the source stream, so if downstream isn't expedient, we could run out of memory. prefetchN doesn't have this issue, as it uses a bounded channel to transfer elements.

@mpilquist (Member, Author):

On second thought, prefetchN already does the right thing -- it accumulates chunks that arrive while downstream is processing. So conflate is as simple as:

  def conflate[F2[x] >: F[x], O2 >: O](implicit
      F: Concurrent[F2],
      O: Semigroup[O2]
  ): Stream[F2, O2] =
    prefetchN(Int.MaxValue).chunks.map(_.combineAll)

@mpilquist (Member, Author) commented Feb 29, 2024:

I pushed some new implementations using your suggested signatures plus the addition of a chunkLimit param on each. Please take a look.

While Channel does the right thing, prefetchN doesn't, as it maintains the source chunk structure. The version of conflateChunks I just pushed uses a Channel directly and inlines the chunk conflation logic using unconsFlatMap (edit: removed the micro optimization).
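The behavior described here can be modeled without fs2. The toy class below (hypothetical name, not the actual implementation) captures the two essentials: chunks accumulate up to a bound while the consumer is busy, and the consumer drains everything at once as a single concatenated chunk, so the source chunk structure is not preserved:

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model (not the fs2 implementation) of Channel-based conflateChunks:
// a producer appends chunks to a buffer bounded by chunkLimit; the consumer
// drains everything present at once and concatenates, so a slow consumer
// observes fewer, larger chunks instead of the source chunk structure.
class ConflatingBuffer[O](chunkLimit: Int) {
  private val buf = ArrayBuffer.empty[List[O]]

  // Returns false when the bound is reached, modeling backpressure on the producer.
  def offer(chunk: List[O]): Boolean =
    if (buf.size >= chunkLimit) false
    else { buf += chunk; true }

  // Drain all accumulated chunks as one concatenated chunk.
  def drain(): List[O] = {
    val out = buf.toList.flatten
    buf.clear()
    out
  }
}
```

The `offer` bound plays the role of chunkLimit in the PR: it caps the number of source chunks buffered, though not the number of elements inside them.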

Contributor:

I like it very much!

One nitpick -- I'm not sure about chunkLimit; maybe it's easier to use Channel.unbounded? We still have no control over the number of elements in each conflated chunk, so it's not as if we have any control over consumed memory.

@mpilquist (Member, Author):

I'd prefer to force folks to make a decision to use Int.MaxValue here instead of, by default, allowing unbounded memory usage. It's a bit of a speed bump to get folks to think about memory usage. Not sure chunkLimit is the right name, given it's really a limit on the number of chunks pulled from the source stream and the max size of chunks emitted downstream.

def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](chunkLimit: Int)(
    f: O => O2
): Stream[F2, O2] =
  map(f).conflateSemigroup[F2, O2](chunkLimit)
Contributor:

Maybe move the mapping function application to after the conflated chunks are pulled?

def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](chunkLimit: Int)(
    f: O => O2
): Stream[F2, O2] =
  conflateChunks[F2](chunkLimit).map { c => 
    c.drop(1).foldLeft(f(c(0)))((x, y) => Semigroup[O2].combine(x, f(y)))
  }
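The two formulations produce the same result for any semigroup, since combining mapped elements pairwise is the same as mapping first and then combining. A quick plain-Scala check (hypothetical helper names; Int addition as the semigroup):

```scala
// conflateMap as in the PR: map every element first, then combine the batch
def mapThenCombine[O, O2](batch: List[O])(f: O => O2)(combine: (O2, O2) => O2): O2 =
  batch.map(f).reduceLeft(combine)

// The suggested alternative: apply f element-by-element while folding
// the pulled chunk, mirroring c.drop(1).foldLeft(f(c(0)))(...)
def combineWhileFolding[O, O2](batch: List[O])(f: O => O2)(combine: (O2, O2) => O2): O2 =
  batch.drop(1).foldLeft(f(batch.head))((acc, o) => combine(acc, f(o)))
```

The difference is purely operational: where and when f runs, not what it produces.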

@mpilquist (Member, Author):

Any rationale? No strong preference either way, just curious about the motivation.

Contributor:

Just that this way we apply f only when data has actually been pulled. Thus, if downstream decides to stop/cancel, we haven't done any unnecessary transformations, consuming CPU and memory for allocations.
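This laziness argument can be demonstrated outside fs2: when batches are consumed lazily, mapping after the pull means f never runs for batches downstream abandons (toy model; the counter and names are illustrative only):

```scala
// Count how many times f runs when only the first of three batches is consumed.
var applied = 0
val f: Int => Int = x => { applied += 1; x * 2 }

val batches = List(List(1, 2), List(3, 4), List(5, 6))

// Map-before-conflate: f runs eagerly on every element of every batch.
val eager = batches.map(_.map(f).sum)
val eagerCount = applied

applied = 0
// Map-after-pull: an iterator defers work, so f runs only for batches
// the downstream actually pulls before cancelling.
val lazyOut = batches.iterator.map(_.map(f).sum)
val first = lazyOut.next() // downstream stops after one batch
val lazyCount = applied
```

Here the eager formulation applies f six times, while the lazy one applies it only twice before the "cancellation".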

@mpilquist mpilquist marked this pull request as ready for review February 29, 2024 21:18
@mpilquist mpilquist changed the title Add initial implementation of conflate and conflateWithSeed Add conflate operations to Stream Feb 29, 2024
@mpilquist mpilquist merged commit e0cdf07 into main Mar 8, 2024
31 checks passed
@mpilquist mpilquist deleted the topic/conflate branch March 8, 2024 12:48
@Jasper-M (Contributor):
Maybe also interesting to know that this operation was called bufferIntrospective in Monix.
