Inconsistent behavior when parallel=1 #461

Closed
cykl opened this issue Feb 27, 2020 · 9 comments
Labels
bug Something isn't working

@cykl

cykl commented Feb 27, 2020

I wrote the following code to parallelize some side effects, and it works as expected on my local machine:

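    import java.util.List;
    import java.util.concurrent.Executors;

    import com.pivovarit.collectors.ParallelCollectors;

    public class TestCase {

        public static void main(String[] args) {
            var executor = Executors.newFixedThreadPool(10);
            try {
                var list = List.of("A", "B");

                // Map each element asynchronously on the executor, then block until done
                list.stream()
                        .collect(ParallelCollectors.parallel(TestCase::processRecord, executor))
                        .join();
            } finally {
                executor.shutdown();
            }
        }

        // Prints the name of the thread the mapping function runs on
        private static String processRecord(String record) {
            System.out.println(Thread.currentThread().getName());
            return record;
        }
    }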
$ docker run --rm testcase
pool-1-thread-2
pool-1-thread-1

However, it took me a while to figure out why it doesn't work when deployed on our cluster (Mesos & K8S). Eventually, I found that since release 2.3.0, parallel-collectors behaves inconsistently depending on whether parallel=1 or parallel>1, where parallel is the parallelism the library derives from the number of available CPUs.

$ docker run --rm --cpuset-cpus=0 testcase
$ docker run --rm --cpuset-cpus=0,1 testcase
$ docker run --rm --cpuset-cpus=0,1,2 testcase
pool-1-thread-2
pool-1-thread-1

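When parallel=1, the mapping function is only invoked when the stream returned by parallel().join() is consumed by a terminal operation, and it then runs on the consuming thread rather than on the executor. Some terminal operations, such as count(), won't invoke it at all, because the mapping step can be optimized away.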
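To see this laziness in isolation, here is a minimal plain-JDK sketch with no parallel-collectors involved (the class and method names are hypothetical). It counts how often a mapping function runs: merely building the stream runs it zero times, count() on a source of known size is permitted by the Stream#count() contract to skip the pipeline entirely, and collect(Collectors.toList()) runs it once per element.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyMappingDemo {

    // Counts how many times the mapping function actually runs
    static final AtomicInteger invocations = new AtomicInteger();

    static String process(String record) {
        invocations.incrementAndGet();
        return record;
    }

    // Builds the pipeline only: map() is recorded, not executed
    static Stream<String> lazyStream() {
        return List.of("A", "B").stream().map(LazyMappingDemo::process);
    }

    public static void main(String[] args) {
        lazyStream(); // no terminal operation, so process() is never called
        System.out.println("after build: " + invocations.get());

        // The source size is known, so count() may be computed without running map()
        long n = lazyStream().count();
        System.out.println("after count: " + invocations.get() + " (count=" + n + ")");

        // A consuming terminal operation runs process() once per element
        List<String> out = lazyStream().collect(Collectors.toList());
        System.out.println("after toList: " + invocations.get() + " " + out);
    }
}
```

Whether count() actually skips the mapping is an implementation choice of the JDK, which is exactly why relying on a side effect inside a lazily consumed stream is fragile.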

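    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.stream.Collectors;

    import com.pivovarit.collectors.ParallelCollectors;

    public class TestCase {

        public static void main(String[] args) {
            var executor = Executors.newFixedThreadPool(10);
            try {
                var list = List.of("A", "B");

                // With parallel=1, count() may be computed without invoking processRecord at all
                System.out.println("count");
                list.stream()
                        .collect(ParallelCollectors.parallel(TestCase::processRecord, executor))
                        .join()
                        .count();

                // With parallel=1, toList() consumes the stream, so processRecord runs here, on the calling thread
                System.out.println("toList");
                list.stream()
                        .collect(ParallelCollectors.parallel(TestCase::processRecord, executor))
                        .join()
                        .collect(Collectors.toList());
            } finally {
                executor.shutdown();
            }
        }

        private static String processRecord(String record) {
            System.out.println(Thread.currentThread().getName());
            return record;
        }
    }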
$ docker run --cpuset-cpus=0  --rm testcase
count
toList
main
main

When parallel>1, the mapping is always performed eagerly on the provided executor, whether or not the resulting stream is consumed.

This looks like a bug: the Javadoc says that the parallel computation is performed on the provided custom executor, which is not true in this case. It is also dangerous, as it seems too easy to write the same buggy code as I did.

I would propose reverting to the 2.2.0 behaviour.

@cykl cykl added the bug Something isn't working label Feb 27, 2020
@pivovarit
Owner

Huh, great point. Will fix ASAP

@cykl
Author

cykl commented Feb 27, 2020

The parallel=1 behaviour comes from the following code, which was added in 2.3.0:

    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) {
        // map(mapper) is lazy: if finisher just returns the stream, mapper runs later, on the consuming thread
        return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper)), executor));
    }

The future is completed on the executor, but the finisher only receives a lazy stream, so the mapping will be executed on the consuming thread.
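The mechanism can be reproduced with plain CompletableFuture and Stream, assuming that for parallel() the finisher simply returns the stream unchanged. In this sketch, lazyAsync is a hypothetical stand-in for asyncCollector: the supplier does run on the executor, but all it does there is build a lazy Stream, so the mapper runs on whichever thread later consumes that stream.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyAsyncPitfall {

    // Thread names observed by the mapper (thread-safe list)
    static final List<String> seenThreads = new CopyOnWriteArrayList<>();

    static String process(String record) {
        seenThreads.add(Thread.currentThread().getName());
        return record;
    }

    // Hypothetical helper with the same shape as the 2.3.0 code and an identity finisher:
    // supplyAsync only *builds* the lazy Stream on the executor
    static <T, R> CompletableFuture<Stream<R>> lazyAsync(List<T> list, Function<T, R> mapper, Executor executor) {
        return CompletableFuture.supplyAsync(() -> list.stream().map(mapper), executor);
    }

    static List<String> run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Stream<String> result = lazyAsync(List.of("A", "B"), LazyAsyncPitfall::process, executor).join();
            // The mapper runs here, during the terminal operation on the calling thread
            return result.collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        run();
        System.out.println(seenThreads); // the calling thread's name twice: [main, main]
    }
}
```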

@pivovarit
Owner

Yeah, I know. I even wrote an article about this behaviour of CompletableFuture 🤦‍♂. I tried to outsmart the tool by providing a simplified "parallel" collector for parallelism == 1, but forgot about such edge cases...

@pivovarit
Owner

The initial fix is here: #462

I will see if I can come up with something better than a revert, though.

@pivovarit
Owner

The ultimate fix might be a bit cumbersome, but it's still much lighter than reverting to the full-blown parallel collector infrastructure:

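    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) {
        // collect(toList()) forces mapper to run eagerly inside supplyAsync, i.e. on the executor;
        // finisher then receives a stream over the already-mapped results
        return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper).collect(toList()).stream()), executor));
    }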
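For contrast, here is a plain-JDK sketch applying the same idea as the fix above (eagerAsync is a hypothetical name, not part of the library). Collecting into a list inside the supplier forces every mapper call to happen on the executor before the future completes; the caller only receives a stream over finished results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerAsyncFix {

    // Thread names observed by the mapper (thread-safe: written from executor threads)
    static final List<String> seenThreads = new CopyOnWriteArrayList<>();

    static String process(String record) {
        seenThreads.add(Thread.currentThread().getName());
        return record;
    }

    // Hypothetical helper: materializing with collect(toList()) makes the mapping
    // run inside supplyAsync, i.e. on the executor, before the future completes
    static <T, R> CompletableFuture<Stream<R>> eagerAsync(List<T> list, Function<T, R> mapper, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> list.stream().map(mapper).collect(Collectors.toList()).stream(), executor);
    }

    static List<String> run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Stream<String> result = eagerAsync(List.of("A", "B"), EagerAsyncFix::process, executor).join();
            // The mapper has already run; this only reads precomputed values
            return result.collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        run();
        System.out.println(seenThreads); // executor thread names (pool-N-thread-M by default)
    }
}
```

The extra intermediate list costs one allocation per collection, which is the trade-off for guaranteeing where the mapper runs.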

@pivovarit
Owner

pivovarit commented Feb 27, 2020

Released in https://github.com/pivovarit/parallel-collectors/releases/tag/2.3.2 (give it some time to propagate to Maven Central, usually around 12-24h)

Thanks for the investigation and for making the tool better 👏

@tporadowski

I have just stumbled upon this issue (running the same code on my Windows machine vs. a small AWS instance running Linux) and could not initially figure out what was going on, as nothing was executed on my ExecutorService. I managed to get it working by passing an additional toList() collector to ParallelCollectors.parallel(), but that was not needed on my machine. Then I found this issue, checked /proc/cpuinfo on the remote machine, and found out it had only 2 CPUs :). Anyway, 2.3.2 fixes the issue, thanks!

@pivovarit
Owner

Keep in mind that there's one more overload where you can provide the parallelism of your choice.

Now that I think about it, it was a mistake to try to be smart and couple the default parallelism to Runtime#availableProcessors(). I will consider dropping that method overload in 3.0.0.

Anyway, I'm glad those issues pop up... it means someone is using the lib :) thanks!

@cykl
Author

cykl commented Mar 9, 2020

> Now that I think about it, it was a mistake to try to be smart and couple the default parallelism to Runtime#availableProcessors(). I will consider dropping that method overload in 3.0.0.

I would vote for removal. Most use cases seem to be related to parallelizing blocking I/O, and if you are trying to parallelize CPU-bound tasks, it's easy to set the right parallelism yourself.
