8148250 Stream.limit parallel ordered performance

Tagir F. Valeev amaembo at gmail.com
Tue Jan 26 15:51:43 UTC 2016


Thank you for the review! Here's the issue:
Will post complete webrev later.

PS> Note that it is still the case that in almost all scenarios this
PS> is likely to be a bad form of parallel stream.

Well, it's not always possible to estimate the size of the stream in
advance. Consider a user-specified filter upstream that searches over
a big collection (we want to return the first 10 search results, and
order is important):

    .filter(element -> userQuery.test(element))

If the user query produces 10-15 results, using a parallel stream is
very reasonable, but if it produces millions of results it should not
regress very much (at least it should not become much slower than the
sequential version, which is what we see currently).
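
To make the scenario concrete, here is a minimal sketch of such a search. The class name FirstMatchesSketch, the method firstMatches and the sample queries are hypothetical and chosen only for illustration; the stream calls are the standard java.util.stream API.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FirstMatchesSketch {
    // Returns the first `limit` elements of `source`, in encounter order,
    // that satisfy the user-supplied query. The number of matches is not
    // known in advance, so the stream after filter() is not SIZED.
    static <T> List<T> firstMatches(List<T> source, Predicate<? super T> userQuery, int limit) {
        return source.parallelStream()
                .filter(userQuery)
                .limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());

        // Selective query: only 10 matches in the whole collection,
        // so a parallel search is a reasonable choice.
        System.out.println(firstMatches(data, x -> x % 100_000 == 7, 10));

        // Non-selective query: half a million matches, of which only the
        // first 10 are wanted. This is the case that should not regress.
        System.out.println(firstMatches(data, x -> x % 2 == 0, 10));
    }
}
```

Both calls return the same answer a sequential stream would, because limit() respects encounter order on an ordered source; only the amount of work done upstream differs.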

PS> I think the comment you refer to still applies but now for larger n, so we should refine it.

Should we replace "regardless of the value of n" with "when
n*parallelismLevel is sufficiently large"?

With best regards,
Tagir Valeev.

PS> Paul.

>> On 24 Jan 2016, at 12:36, Tagir F. Valeev <amaembo at gmail.com> wrote:
>> Hello!
>> I'm investigating Stream.limit() performance for parallel ordered
>> streams when no SUBSIZED optimization is performed.
>> Consider the following code, for example:
>> AtomicInteger counter = new AtomicInteger();
>> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
>>        .peek(x -> counter.incrementAndGet()).limit(10).toArray();
>> System.out.println(Arrays.toString(result));
>> System.out.println(counter.get());
>> What would counter.get() print? It changes from launch to
>> launch, but is usually within 375000..625000. This is just insane. On my
>> 4-core system the parallel stream creates 16 individual tasks. I expect
>> that every individual task should consume no more than 10 elements, so
>> in total no more than 160 elements should be consumed in this case.
>> Here's a patch which addresses this issue:
>> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
>> In the limit case, non-root leaf tasks may switch to copyIntoWithCancel
>> to control the count of consumed elements, so that they do not consume
>> more than necessary.
>> This change seems to fix the issue described in this comment (at least
>> partially):
>> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
>> //     regardless of the value of n
>> //     Need to adjust the target size of splitting for the
>> //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
>> //     This will limit the size of the buffers created at the leaf nodes
>> //     cancellation will be more aggressive cancelling later tasks
>> //     if the target slice size has been reached from a given task,
>> //     cancellation should also clear local results if any
>> I checked with the following code:
>> for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
>>    System.out.println(n);
>>    long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
>>    long[] ref = LongStream.range(0, n).toArray();
>>    System.out.println(Arrays.equals(arr, ref));
>> }
>> It works correctly after applying my patch (while it dies with an OOME
>> without the patch, as the comment suggests).
>> The existing unit tests also pass with my patch. However, I'm
>> very new to the internals of parallel stream processing, so it's
>> possible that I'm missing something. Please review! If this looks
>> reasonable I will log an issue and write new test cases.
>> Thank you in advance,
>> With best regards,
>> Tagir Valeev.
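
The expectation stated in the quoted message (16 tasks, each consuming at most 10 elements, so at most 160 in total) can be illustrated with a small standalone sketch. This is not the JDK's SliceTask code and not the patch itself; the class BoundedLeafSketch, the methods leaf and limited, and the fixed chunking are assumptions made only to show the reasoning: no single chunk can contribute more than n elements to the result, so each chunk may stop after collecting n matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BoundedLeafSketch {
    // Counts every source element examined by any chunk.
    static final AtomicInteger consumed = new AtomicInteger();

    // Hypothetical leaf task: scans [from, to) but stops as soon as it has
    // collected `limit` matches, because a single chunk can never contribute
    // more than `limit` elements to the final result.
    static List<Integer> leaf(int from, int to, IntPredicate filter, int limit) {
        List<Integer> out = new ArrayList<>();
        for (int i = from; i < to && out.size() < limit; i++) {
            consumed.incrementAndGet();
            if (filter.test(i)) {
                out.add(i);
            }
        }
        return out;
    }

    // Splits [0, size) into `chunks` contiguous ranges, runs the leaves in
    // parallel, then merges the partial results in encounter order and keeps
    // the first `limit` elements.
    static List<Integer> limited(int size, int chunks, IntPredicate filter, int limit) {
        int chunkSize = (size + chunks - 1) / chunks;
        List<List<Integer>> parts = IntStream.range(0, chunks).parallel()
                .mapToObj(c -> leaf(Math.min(size, c * chunkSize),
                                    Math.min(size, (c + 1) * chunkSize),
                                    filter, limit))
                .collect(Collectors.toList());
        List<Integer> result = new ArrayList<>();
        for (List<Integer> part : parts) {
            for (Integer x : part) {
                if (result.size() == limit) {
                    return result;
                }
                result.add(x);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> result = limited(1_000_000, 16, x -> true, 10);
        System.out.println(result);
        // At most chunks * limit = 16 * 10 = 160 elements are examined.
        System.out.println(consumed.get());
    }
}
```

With a filter that rejects many elements a leaf may of course examine more than n source elements before it finds n matches; the bound of chunks * n applies to the elements that pass the filter.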
