Grouping stream elements by their position - how to handle the tail of a stream?

Lattie latsama at
Sat Feb 2 16:49:53 PST 2013

>Now, that said, here's a parallel algorithm you can implement with
>collect(), though you may find the performance overhead too offensive.
>Basically, for each chunk of the input, you compute two possible
>p0 = (t1,t2), (t3,t4), ..., maybe leftover tn

This is what I want: the ability to compute p0. I was not able to
figure out how to use collect() to do this, though, so I went down
the road of explode() with a stateful lambda. Can someone point me
to a tutorial or other information on how to properly use collect()?
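For readers with the same question, here is a minimal sketch of the two ways to call collect() in the released Java 8+ API (the pre-release builds discussed in this thread differed). The three-argument form takes a supplier, an accumulator and a combiner; Collector.of() packages the same three functions plus an optional finisher. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectBasics {
    // Three-argument collect(): the supplier makes an empty container, the
    // accumulator adds one element to it, and the combiner merges two
    // containers (needed when the stream runs in parallel).
    static List<Integer> toList(Stream<Integer> s) {
        return s.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    // The same three functions packaged as a reusable Collector, plus a
    // finisher that turns the mutable container into the final result.
    static Collector<Integer, List<Integer>, List<Integer>> unmodifiableListCollector() {
        return Collector.of(
                ArrayList::new,                          // supplier: new empty container
                List::add,                               // accumulator: add one element
                (a, b) -> { a.addAll(b); return a; },    // combiner: left chunk, then right chunk
                Collections::unmodifiableList);          // finisher: wrap the result
    }
}
```

Because the combiner always receives the left and right parts in encounter order, an ordered stream keeps its order even in parallel; that property is what a position-based grouping like p0 relies on.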


On Sat, Feb 2, 2013 at 1:55 PM, Brian Goetz <brian.goetz at> wrote:
> The glass is half full, or half empty, depending on your disposition.
> While it's trivial to write a version of explode() that remembers the
> last element seen, and either emits nothing or a pair (you have a little
> fixup at the end to deal with, but that's easy too, just annoying), once
> you start writing such stateful lambdas, you are tying yourself to
> sequential execution in a very error-prone way.  The type system can't
> record this assumption, so when someone helpfully later adds a
> .parallel() somewhere, your code will silently turn to sewage.  So,
> don't ever do this.
> To see why the obvious algorithm is sequential only, consider a
> decomposition of a data source with a spliterator.  In most cases, we
> don't necessarily know the evenness or oddness of the sum of sizes of
> all prior splits.  Which means we don't know whether the first element
> of any given split is the left element of some pair or the right element
> of some pair.
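A tiny illustration of that parity problem, assuming we could see the sizes of all splits (which code processing a single split generally cannot): whether a split's first element opens or closes a pair is decided entirely by the parity of the combined size of every earlier split. SplitParity and firstElementRoles are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitParity {
    /**
     * For each split, given only by its size and listed in encounter order,
     * reports whether its first element is the "left" or "right" element
     * of a pair. The answer depends on the sizes of all earlier splits.
     */
    static List<String> firstElementRoles(int... splitSizes) {
        List<String> roles = new ArrayList<>();
        long offset = 0;  // number of elements in all earlier splits
        for (int size : splitSizes) {
            roles.add(offset % 2 == 0 ? "left" : "right");
            offset += size;
        }
        return roles;
    }
}
```

For split sizes 3, 2 and 4 the splits start at offsets 0, 3 and 5, so only the first split starts on a left element; a split cannot work this out from its own contents.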
> You might say: I don't care about parallelism, I only care about
> sequential.
> To which we'd respond: fine, but we're not really interested in
> distorting the model to encourage that.  The Stream API is designed
> around operations that can be equally well performed in serial or
> parallel.  There are no "serial only" operations supported, and that was
> an explicit design choice.  Passing state-holding lambdas to these
> methods is likely to be a spec violation.  We can't enforce it, any more
> than we can enforce the lack of data races, but bear in mind that if you
> do this, you're writing broken code, and asking for trouble.
> Now, that said, here's a parallel algorithm you can implement with
> collect(), though you may find the performance overhead too offensive.
> Basically, for each chunk of the input, you compute two possible
> answers:
> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
> p1 = t1, (t2, t3), (t4, t5), ... maybe leftover tn
> then you combine these paired chunks in the mostly obvious way (observe
> that p0 has trailing element = !(p1 has trailing element)).  Then when
> you get to the top of the tree, you take the one that doesn't have an
> orphaned initial element, and toss the other.  Basically you're doing 2x
> work for each input element, but it parallelizes fairly cleanly.
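To make the two candidate answers concrete, here is a small sketch that computes p0 and p1 for a single chunk held in a List, so the invariant that exactly one of them has a trailing element (for a non-empty chunk) can be checked. The Pair and Parse records are hypothetical helper types (Java 16+); null stands for "no element", so chunk elements are assumed non-null.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoParses {
    record Pair<T>(T left, T right) {}

    /** One way of pairing a chunk: orphaned first element (or null), complete pairs, leftover (or null). */
    record Parse<T>(T lead, List<Pair<T>> pairs, T trail) {}

    /** p0: the chunk starts on a left element: (t1,t2), (t3,t4), ..., maybe leftover tn. */
    static <T> Parse<T> p0(List<T> chunk) {
        return parseFrom(chunk, 0, null);
    }

    /** p1: t1 is orphaned, then (t2,t3), (t4,t5), ..., maybe leftover tn. */
    static <T> Parse<T> p1(List<T> chunk) {
        if (chunk.isEmpty()) return new Parse<>(null, List.of(), null);
        return parseFrom(chunk, 1, chunk.get(0));
    }

    // Pairs consecutive elements starting at index `start`; an odd one out becomes the trail.
    private static <T> Parse<T> parseFrom(List<T> chunk, int start, T lead) {
        List<Pair<T>> pairs = new ArrayList<>();
        int i = start;
        for (; i + 1 < chunk.size(); i += 2) {
            pairs.add(new Pair<>(chunk.get(i), chunk.get(i + 1)));
        }
        T trail = i < chunk.size() ? chunk.get(i) : null;
        return new Parse<>(lead, pairs, trail);
    }
}
```

For the chunk [1, 2, 3], p0 is (1,2) with leftover 3, while p1 orphans 1 and pairs (2,3) with no leftover.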
> Such an algorithm is ideally suited to collect(), because a Collector is
> a representation of a catamorphism, which the above transformation is.
> Because the combination logic is associative, it parallelizes cleanly
> without needing any nasty stateful lambdas.
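A minimal sketch of such a collector, written against the Collector.of() factory in the released Java 8+ API and using a Java 16 record for the hypothetical Pair type. It assumes stream elements are non-null, because null is used to mean "no orphaned or leftover element". Each chunk keeps both candidate answers, p0 and p1. The combiner picks which of the right chunk's answers to append, depending on whether the left answer has a pending element. The finisher keeps p0 and turns a leftover tn into {tn, null}. PairingCollector, Chunk and Parse are illustrative names, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

public class PairingCollector {
    /** Hypothetical pair type; right is null for an unpaired last element. */
    record Pair<T>(T left, T right) {}

    /** One pairing of a chunk. Assumes non-null elements: null means "absent". */
    static final class Parse<T> {
        T lead;                                   // orphaned first element (only set in p1)
        final List<Pair<T>> pairs = new ArrayList<>();
        T trail;                                  // unpaired last element, waiting for a partner
    }

    /** Mutable summary of one chunk: both candidate answers, computed side by side. */
    static final class Chunk<T> {
        long size;
        final Parse<T> p0 = new Parse<>();        // chunk starts on a left element
        final Parse<T> p1 = new Parse<>();        // chunk's first element is orphaned
    }

    /** Accumulator: append one element to both parses (the "2x work" per element). */
    static <T> void add(Chunk<T> c, T e) {
        if (c.size++ == 0) {                      // first element: p0 holds it, p1 orphans it
            c.p0.trail = e;
            c.p1.lead = e;
            return;
        }
        for (Parse<T> x : List.of(c.p0, c.p1)) {
            if (x.trail != null) {                // complete the pending pair
                x.pairs.add(new Pair<>(x.trail, e));
                x.trail = null;
            } else {
                x.trail = e;
            }
        }
    }

    /** Appends the following chunk r to parse x, choosing r's parse by x's pending element. */
    private static <T> void append(Parse<T> x, Chunk<T> r) {
        if (x.trail != null) {                    // r's first element closes x's pair: use r.p1
            x.pairs.add(new Pair<>(x.trail, r.p1.lead));
            x.pairs.addAll(r.p1.pairs);
            x.trail = r.p1.trail;
        } else {                                  // r starts on a left element: use r.p0
            x.pairs.addAll(r.p0.pairs);
            x.trail = r.p0.trail;
        }
    }

    /** Combiner: associative merge of a chunk with the chunk that follows it. */
    static <T> Chunk<T> combine(Chunk<T> l, Chunk<T> r) {
        if (l.size == 0) return r;
        if (r.size == 0) return l;
        append(l.p0, r);
        append(l.p1, r);
        l.size += r.size;
        return l;
    }

    /** Finisher: keep p0 (no orphaned first element); a leftover tn becomes {tn, null}. */
    static <T> List<Pair<T>> finish(Chunk<T> c) {
        List<Pair<T>> out = new ArrayList<>(c.p0.pairs);
        if (c.p0.trail != null) out.add(new Pair<>(c.p0.trail, null));
        return out;
    }

    static <T> Collector<T, ?, List<Pair<T>>> pairing() {
        return Collector.<T, Chunk<T>, List<Pair<T>>>of(
                Chunk::new, PairingCollector::add, PairingCollector::combine, PairingCollector::finish);
    }
}
```

For example, IntStream.rangeClosed(1, 5).boxed().parallel().collect(PairingCollector.pairing()) should yield the pairs (1,2), (3,4) and (5,null), the same as a sequential run, because the combiner never needs to know where a chunk sits in the stream.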
> When we expose the Op APIs and you can write your own
> decomposition-aware operations, you'll have another option: write a
> StatefulOp.  This won't be an option for 8, but once we do, it's easy
> enough, and it will let you avoid the extra overhead by creating a
> custom operation.
> So, to sum up:
>   - If you're looking for a one-liner, you'll be disappointed;
>   - If you convince yourself "I would never need parallelism here" and
> write the obvious unsafe stateful lambda, please write your manager a
> letter first explaining how your incompetence is putting the reliability
> of his product in jeopardy;
>   - If you're willing to write the above collector, you'll likely be
> happy enough with the result.
> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>> Hi.
>> I'm looking in the Stream interface for a way to convert a stream of
>>     { t1, t2, ..., tn }
>> to
>>     { {t1, t2}, ..., {tn-1, tn} }
>> or
>>     { {t1, t2}, ..., {tn, null} }
>> Let's assume {t1, t2} are aggregated by Pair<T,T>.
>> So I tried to use explode:
>>     Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>     Stream<Pair<Integer, Integer>> spi = si.sequential().explode(
>>         new BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>             Integer firstValue;
>>
>>             @Override
>>             public void accept(Stream.Downstream<Pair<Integer, Integer>> pairDownstream,
>>                                Integer v) {
>>                 if (firstValue == null) {
>>                     firstValue = v;
>>                 } else {
>>                     pairDownstream.send(new Pair<>(firstValue, v));
>>                     firstValue = null;
>>                 }
>>             }
>>         });
>> But I didn't find a way to add the tail of the input stream {..., 5} to the
>> new stream {..., {5, null}}.
>> Of course, this is only a simplified example of a more general problem I have.
>> Thanks
>> Boaz
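If the input is already a random-access List rather than an arbitrary stream, one parallel-safe way to get exactly the output asked for above, including the {5, null} tail, is to stream over pair indices instead of over elements. This is a different technique from the collector discussed in this thread: it relies on cheap positional access (List.get), so it does not apply to streams in general. It uses the released Java API and a hypothetical Pair record (Java 16+).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IndexPairing {
    /** Hypothetical pair type; right is null for a leftover tail element. */
    record Pair<T>(T left, T right) {}

    /**
     * Pairs list elements by position: {t1..tn} -> {{t1,t2}, ..., {tn, null}} when n is odd.
     * Each output pair depends only on its own index i, so no lambda carries state
     * between elements and the pipeline can safely run in parallel.
     */
    static <T> List<Pair<T>> pairUp(List<T> xs) {
        int n = xs.size();
        return IntStream.range(0, (n + 1) / 2)       // one index per output pair
                .parallel()
                .mapToObj(i -> new Pair<T>(xs.get(2 * i),
                                           2 * i + 1 < n ? xs.get(2 * i + 1) : null))
                .collect(Collectors.toList());      // encounter order is preserved
    }
}
```

For example, IndexPairing.pairUp(List.of(1, 2, 3, 4, 5)) should return the pairs (1,2), (3,4) and (5,null).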

More information about the lambda-dev mailing list