Grouping stream elements by their position - how to handle tail of stream?

Boaz Nahum boaznahum at
Sat Feb 2 23:38:04 PST 2013

Hi Brian.

> The glass is half full, or half empty, depending on your disposition.
I know my English is quite poor. My question was a real one, not a criticism.
Just trying to learn.

I gave up on parallelism; that's why I called sequential() before explode()

>>   si.sequential().explode

> you have a little fixup at the end to deal with, but that's easy too,
> just annoying
I didn't find a way to do it with explode() (unless explode() is the last
operation in the chain; then it's easy). As someone else mentioned, for a
sequential stream a 'finalize' method would do the trick.

As you suggested, I will look at Collectors.


On Sat, Feb 2, 2013 at 11:55 PM, Brian Goetz <brian.goetz at> wrote:

> The glass is half full, or half empty, depending on your disposition.
> While it's trivial to write a version of explode() that remembers the last
> element seen, and either emits nothing or a pair (you have a little fixup
> at the end to deal with, but that's easy too, just annoying), once you
> start writing such stateful lambdas, you are tying yourself to sequential
> execution in a very error-prone way.  The type system can't record this
> assumption, so when someone helpfully later adds a .parallel() somewhere,
> your code will silently turn to sewage.  So, don't ever do this.
> To see why the obvious algorithm is sequential only, consider a
> decomposition of a data source with a spliterator.  In most cases, we don't
> necessarily know the even-ness or odd-ness of the sum of sizes of all prior
> splits.  Which means we don't know whether the first element of any given
> split is the left element of some pair or the right element of some pair.
> You might say: I don't care about parallelism, I only care about
> sequential.
> To which we'd respond: fine, but we're not really interested in distorting
> the model to encourage that.  The Stream API is designed around operations
> that can be equally well performed in serial or parallel.  There are no
> "serial only" operations supported, and that was an explicit design choice.
>  Passing state-holding lambdas to these methods is likely to be a spec
> violation.  We can't enforce it, any more than we can enforce the lack of
> data races, but bear in mind that if you do this, you're writing broken
> code, and asking for trouble.
> Now, that said, here's a parallel algorithm you can implement with
> collect(), though you may find the performance overhead too offensive.
> Basically, for each chunk of the input, you compute two possible
> answers:
> p0 = (t1, t2), (t3, t4), ..., maybe leftover tn
> p1 = t1, (t2, t3), (t4, t5), ..., maybe leftover tn
> then you combine these paired chunks in the mostly obvious way (observe
> that p0 has trailing element = !(p1 has trailing element)).  Then when you
> get to the top of the tree, you take the one that doesn't have an orphaned
> initial element, and toss the other.  Basically you're doing 2x work for
> each input element, but it parallelizes fairly cleanly.
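To make the p0/p1 notation concrete, here is a small sketch that renders both candidate pairings for a single chunk. `ChunkCandidates` and `pairUp` are hypothetical names, not part of any API; `offset` 0 means the chunk starts at an even global position (p0), and 1 means its first element is an orphan, the right half of a pair begun in the previous chunk (p1).

```java
import java.util.Arrays;
import java.util.List;

class ChunkCandidates {
    // Renders one candidate pairing of a chunk.
    // offset 0 -> p0: pairing starts at the first element.
    // offset 1 -> p1: the first element is an orphan (right half of a pair from the previous chunk).
    static <T> String pairUp(List<T> chunk, int offset) {
        StringBuilder sb = new StringBuilder();
        if (offset == 1 && !chunk.isEmpty()) {
            sb.append("orphan ").append(chunk.get(0));
        }
        int i = offset;
        for (; i + 1 < chunk.size(); i += 2) {
            if (sb.length() > 0) sb.append(' ');
            sb.append('(').append(chunk.get(i)).append(',').append(chunk.get(i + 1)).append(')');
        }
        if (i < chunk.size()) { // one element left without a partner inside this chunk
            if (sb.length() > 0) sb.append(' ');
            sb.append("leftover ").append(chunk.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> chunk = Arrays.asList("t1", "t2", "t3", "t4", "t5");
        System.out.println("p0 = " + pairUp(chunk, 0)); // p0 = (t1,t2) (t3,t4) leftover t5
        System.out.println("p1 = " + pairUp(chunk, 1)); // p1 = orphan t1 (t2,t3) (t4,t5)
    }
}
```

For any non-empty chunk exactly one of the two renderings ends in a leftover, which is the trailing-element observation in the paragraph above.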
> Such an algorithm is ideally suited to collect(), because a Collector is a
> representation of a catamorphism, which the above transformation is.
> Because the combination logic is associative, it parallelizes cleanly
> without needing any nasty stateful lambdas.
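A minimal sketch of this collect()-based approach, written against the final Java 8 `Collector.of` API rather than the pre-release build discussed in this thread. `Pair`, `Chunk` and `pairing()` are hypothetical names; `p0` and `p1` follow the notation above. The accumulator records both candidate pairings for its chunk, the combiner bridges the last element of the left chunk to the first element of the right one when parity calls for it, and the finisher keeps p0 and appends {tn, null} for an odd-length input, which also covers the tail the original question asks about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PairingCollector {

    // Hypothetical pair type standing in for the Pair<T,T> from the question.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) { this.first = first; this.second = second; }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Pair)) return false;
            Pair<?, ?> p = (Pair<?, ?>) o;
            return Objects.equals(first, p.first) && Objects.equals(second, p.second);
        }
        @Override public int hashCode() { return Objects.hash(first, second); }
        @Override public String toString() { return "{" + first + ", " + second + "}"; }
    }

    // Partial result for one contiguous chunk of the input.
    static final class Chunk<T> {
        long size;     // number of elements in the chunk
        T first, last; // valid only when size > 0
        // p0: pairs if the chunk starts at an even global index: (t1,t2), (t3,t4), ...
        final List<Pair<T, T>> p0 = new ArrayList<>();
        // p1: pairs if it starts at an odd global index: t1 is orphaned, then (t2,t3), ...
        final List<Pair<T, T>> p1 = new ArrayList<>();

        void accept(T t) {
            if (size == 0) {
                first = t;
            } else if (size % 2 == 1) {
                p0.add(new Pair<>(last, t)); // odd offset: t closes a pair under p0
            } else {
                p1.add(new Pair<>(last, t)); // even offset: t closes a pair under p1
            }
            last = t;
            size++;
        }

        Chunk<T> combine(Chunk<T> right) {
            if (right.size == 0) return this;
            if (size == 0) return right;
            Pair<T, T> bridge = new Pair<>(last, right.first); // pair straddling the split
            if (size % 2 == 1) {
                p0.add(bridge);        // p0: even start + odd size, right starts odd
                p0.addAll(right.p1);
                p1.addAll(right.p0);   // p1: odd start + odd size, right starts even
            } else {
                p0.addAll(right.p0);   // p0: even start + even size, right starts even
                p1.add(bridge);        // p1: odd start + even size, right starts odd
                p1.addAll(right.p1);
            }
            size += right.size;
            last = right.last;
            return this;
        }

        List<Pair<T, T>> finish() {
            // At the root the input starts at index 0, so keep p0 and drop p1.
            if (size % 2 == 1) p0.add(new Pair<T, T>(last, null)); // tail: {tn, null}
            return p0;
        }
    }

    static <T> Collector<T, ?, List<Pair<T, T>>> pairing() {
        return Collector.<T, Chunk<T>, List<Pair<T, T>>>of(
                Chunk::new, Chunk::accept, Chunk::combine, Chunk::finish);
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4, 5).collect(pairing()));
        // prints [{1, 2}, {3, 4}, {5, null}]
        List<Pair<Integer, Integer>> par = IntStream.range(0, 10_001).boxed()
                .parallel().filter(i -> i % 3 != 0).collect(pairing());
        List<Pair<Integer, Integer>> seq = IntStream.range(0, 10_001).boxed()
                .filter(i -> i % 3 != 0).collect(pairing());
        System.out.println(par.equals(seq)); // prints true
    }
}
```

In this variant each element closes a pair in only one of the two lists during accumulation, but the combiner still copies pair lists as chunks merge, so some of the overhead mentioned above remains. The upstream filter() in the usage example is there because it makes per-chunk sizes unknown in advance, which is exactly the case where a position-based pairing cannot be decided locally.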
> When we expose the Op APIs and you can write your own decomposition-aware
> operations, you'll have another option: write a StatefulOp.  This won't be
> an option for 8, but once we do, it's easy enough, and it will let you avoid
> the extra overhead by creating a custom operation.
> So, to sum up:
>  - If you're looking for a one-liner, you'll be disappointed;
>  - If you convince yourself "I would never need parallelism here" and
> write the obvious unsafe stateful lambda, please write your manager a
> letter first explaining how your incompetence is putting the reliability of
> his product in jeopardy;
>  - If you're willing to write the above collector, you'll likely be happy
> enough with the result.
> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>> Hi.
>> I'm looking in the Stream interface for a way to convert a stream of
>>     {  t1, t2, .. Tn }
>> to
>>     { {t1,t2}, ... {Tn-1, Tn} }
>> or
>>     { {t1,t2}, ... {Tn, null}}
>> Let's assume {t1, t2} are aggregated by Pair<T,T>
>> So I tried to use explode:
>>         Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>         Stream<Pair<Integer, Integer>> spi = si.sequential().explode(
>>                 new BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>             // left element of the pair being built; null between pairs
>>             Integer firstValue;
>>
>>             @Override
>>             public void accept(Stream.Downstream<Pair<Integer, Integer>> pairDownstream,
>>                                Integer v) {
>>                 if (firstValue == null) {
>>                     firstValue = v;  // first of two: remember it
>>                 } else {
>>                     pairDownstream.send(new Pair<>(firstValue, v));  // second: emit the pair
>>                     firstValue = null;
>>                 }
>>             }
>>         });
>> But I didn't find a way to add the tail of the input stream {..., 5} to the new
>> stream {..., {5, null}}.
>> Of course, this is only a simplified example of a more general problem I have.
>> Thanks
>> Boaz

More information about the lambda-dev mailing list