Grouping stream elements by their position - how to handle tail of stream ?

Howard Lovatt howard.lovatt at
Mon Feb 4 15:27:24 PST 2013

Hi Brian,

This is an enquiry related to grouping or parsing an input stream, but
a little different.

Common operations in numerical programs are matrix transposition, other
reshaping, and 'sparsification'. I have seen these implemented on
parallel architectures as an indirection of indexes, something like:


Has thought been given to such a capability for the stream library?
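
For illustration only, here is a minimal sketch of what "an indirection of indexes" could look like with the stream library. It is not the snippet from the original message. It assumes the released Java 8 IntStream API and a matrix stored row-major in a flat array; the class name IndexIndirectionSketch and the method transpose are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

final class IndexIndirectionSketch {

    /**
     * Transposes a rows x cols matrix stored row-major in a flat array.
     * Each output index is mapped back to its source index, so no element
     * depends on any other and the work can be split freely.
     */
    static double[] transpose(double[] a, int rows, int cols) {
        return IntStream.range(0, rows * cols)
                .parallel()
                // Output index k = c * rows + r, so c = k / rows and r = k % rows;
                // the matching source index is r * cols + c.
                .mapToDouble(k -> a[(k % rows) * cols + (k / rows)])
                .toArray();
    }

    public static void main(String[] args) {
        // [[1, 2, 3], [4, 5, 6]] stored as a flat array
        double[] m = {1, 2, 3, 4, 5, 6};
        System.out.println(Arrays.toString(transpose(m, 2, 3)));
    }
}
```

Because the lambda only reads the source array at a computed index, it holds no mutable state and behaves the same in sequential and parallel execution.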


 -- Howard.


On 03/02/2013, at 3:04 PM, Brian Goetz <brian.goetz at> wrote:

> Look at the implementations in Collectors for a hint.  Obviously we owe
> some significant documentation, which unfortunately lags the code by too
> much.  But a Collector is basically a mutable foldl/inject; you provide
> a way to create a new mutable container, a way to add a single element
> to it, and a way to combine two mutable containers.  The framework
> decomposes the source, creates a bunch of little containers at the
> leaves, and then goes up the tree, merging.
> Dumping the elements into an ArrayList maps easily to Collector:
>  collect(ArrayList::new, ArrayList::add, ArrayList::addAll)
> This parallelizes, as we can collect each chunk into a list, and then
> merge the lists up the tree with addAll.
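
The one-liner above can be tried out as follows. This is a minimal sketch that assumes the three-argument Stream.collect(supplier, accumulator, combiner) of the released Java 8 API; the class name CollectToListSketch and the method toList are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class CollectToListSketch {

    /** Collects 1..n into a list, optionally in parallel. */
    static List<Integer> toList(int n, boolean parallel) {
        Stream<Integer> s = IntStream.rangeClosed(1, n).boxed();
        if (parallel) {
            s = s.parallel();
        }
        // supplier:    creates an empty container for each chunk
        // accumulator: adds one element to a container
        // combiner:    appends the right container to the left one
        ArrayList<Integer> out = s.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        return out;
    }

    public static void main(String[] args) {
        // For an ordered source the merged result keeps encounter order,
        // so both runs produce the same list.
        System.out.println(toList(10, false).equals(toList(10, true)));
    }
}
```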
> The "how do I get the last element" problem goes away, since your data
> structure can explicitly maintain the state of whether the chunk ends
> with a balanced pair or an extra.
> But, you need to calculate BOTH p0 and p1 for each chunk (except maybe
> the left-spine chunks), because you don't know until the end which will
> have the right parity.
> On 2/2/2013 7:49 PM, Lattie wrote:
>>> Now, that said, here's a parallel algorithm you can implement with
>>> collect(), though you may find the performance overhead too offensive.
>>> Basically, for each chunk of the input, you compute two possible
>>> answers:
>>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>> This is what I want... the ability to compute p0.  I was not able to
>> figure out how to use collect() to do this though, and so I went down
>> the road of explode() with a stateful lambda... can someone point me
>> to any tutorial or info on how to properly use collect()?
>> TIA
>> On Sat, Feb 2, 2013 at 1:55 PM, Brian Goetz <brian.goetz at> wrote:
>>> The glass is half full, or half empty, depending on your disposition.
>>> While it's trivial to write a version of explode() that remembers the
>>> last element seen, and either emits nothing or a pair (you have a little
>>> fixup at the end to deal with, but that's easy too, just annoying), once
>>> you start writing such stateful lambdas, you are tying yourself to
>>> sequential execution in a very error-prone way.  The type system can't
>>> record this assumption, so when someone helpfully later adds a
>>> .parallel() somewhere, your code will silently turn to sewage.  So,
>>> don't ever do this.
>>> To see why the obvious algorithm is sequential only, consider a
>>> decomposition of a data source with a spliterator.  In most cases, we
>>> don't necessarily know the even-ness or odd-ness of the sum of sizes of
>>> all prior splits.  Which means we don't know whether the first element
>>> of any given split is the left element of some pair or the right element
>>> of some pair.
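
To make the parity problem concrete, the following sketch splits a source once and reports the two chunk sizes. It assumes the released Java 8 Spliterator API; the class name SplitParitySketch and the method splitSizes are hypothetical, and where a source chooses to split is an implementation detail.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

final class SplitParitySketch {

    /** Returns {size of the split-off prefix, size of the remainder}. */
    static long[] splitSizes(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        // trySplit() hands back a prefix of the elements, or null if the
        // source declines to split.
        Spliterator<Integer> prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        long[] sizes = splitSizes(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        // If the prefix has odd size, the first element of the remainder is
        // the RIGHT element of a pair, which the remainder cannot know
        // by looking only at its own elements.
        System.out.println(sizes[0] + " + " + sizes[1]);
    }
}
```

With an array-backed list of seven elements, a split near the midpoint leaves one chunk with an odd size, so the second chunk starts in the middle of a pair.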
>>> You might say: I don't care about parallelism, I only care about
>>> sequential.
>>> To which we'd respond: fine, but we're not really interested in
>>> distorting the model to encourage that.  The Stream API is designed
>>> around operations that can be equally well performed in serial or
>>> parallel.  There are no "serial only" operations supported, and that was
>>> an explicit design choice.  Passing state-holding lambdas to these
>>> methods is likely to be a spec violation.  We can't enforce it, any more
>>> than we can enforce the lack of data races, but bear in mind that if you
>>> do this, you're writing broken code, and asking for trouble.
>>> Now, that said, here's a parallel algorithm you can implement with
>>> collect(), though you may find the performance overhead too offensive.
>>> Basically, for each chunk of the input, you compute two possible
>>> answers:
>>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>>> p1 = t1, (t2,t3), (t4,t5), ..., maybe leftover tn
>>> then you combine these per-chunk answers in the mostly obvious way (observe
>>> that p0 has trailing element = !(p1 has trailing element)).  Then when
>>> you get to the top of the tree, you take the one that doesn't have an
>>> orphaned initial element, and toss the other.  Basically you're doing 2x
>>> work for each input element, but it parallelizes fairly cleanly.
>>> Such an algorithm is ideally suited to collect(), because a Collector is
>>> a representation of a catamorphism, which the above transformation is.
>>> Because the combination logic is associative, it parallelizes cleanly
>>> without needing any nasty stateful lambdas.
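
The algorithm described above might be sketched as follows. This is one possible reading, not code from the thread: it assumes the released Java 8 Stream.collect(supplier, accumulator, combiner), represents each pair as a two-element list in place of Pair<T,T>, and pads an odd tail with null. The names PairingSketch, Chunk, pair and pairs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class PairingSketch {

    static <T> List<T> pair(T a, T b) { return Arrays.asList(a, b); }

    /** Mutable per-chunk state holding both candidate answers, p0 and p1. */
    static final class Chunk<T> {
        int count;                                   // elements seen so far
        T first;                                     // first element, valid when count > 0
        final List<List<T>> p0 = new ArrayList<>();  // (t1,t2), (t3,t4), ...
        final List<List<T>> p1 = new ArrayList<>();  // t1 skipped: (t2,t3), (t4,t5), ...
        T tail0, tail1;                              // unpaired trailing element of p0 / p1
        boolean hasTail0, hasTail1;

        /** Accumulator: feed one element to both candidate pairings. */
        void add(T t) {
            if (hasTail0) { p0.add(pair(tail0, t)); hasTail0 = false; }
            else { tail0 = t; hasTail0 = true; }
            if (count == 0) { first = t; }           // p1 leaves the first element orphaned
            else if (hasTail1) { p1.add(pair(tail1, t)); hasTail1 = false; }
            else { tail1 = t; hasTail1 = true; }
            count++;
        }

        /** Combiner: append the chunk that follows this one in encounter order. */
        void merge(Chunk<T> r) {
            if (r.count == 0) return;
            if (count == 0) first = r.first;
            // p0: a leftover on the left pairs with r.first and continues with r.p1;
            // otherwise the right chunk starts on a pair boundary and r.p0 applies.
            if (hasTail0) { p0.add(pair(tail0, r.first)); p0.addAll(r.p1); tail0 = r.tail1; hasTail0 = r.hasTail1; }
            else { p0.addAll(r.p0); tail0 = r.tail0; hasTail0 = r.hasTail0; }
            // p1: same idea, except that an empty left chunk passes the orphan role to r.
            if (hasTail1) { p1.add(pair(tail1, r.first)); p1.addAll(r.p1); tail1 = r.tail1; hasTail1 = r.hasTail1; }
            else if (count == 0) { p1.addAll(r.p1); tail1 = r.tail1; hasTail1 = r.hasTail1; }
            else { p1.addAll(r.p0); tail1 = r.tail0; hasTail1 = r.hasTail0; }
            count += r.count;
        }

        /** At the root, keep p0 (no orphaned initial element) and pad an odd tail. */
        List<List<T>> finish() {
            List<List<T>> out = new ArrayList<>(p0);
            if (hasTail0) out.add(pair(tail0, null));
            return out;
        }
    }

    static <T> List<List<T>> pairs(Stream<T> s) {
        return s.<Chunk<T>>collect(Chunk::new, Chunk::add, Chunk::merge).finish();
    }

    public static void main(String[] args) {
        List<Integer> in = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(pairs(in.stream()));          // [[1, 2], [3, 4], [5, null]]
        System.out.println(pairs(in.parallelStream()));  // [[1, 2], [3, 4], [5, null]]
    }
}
```

Every element is added to both p0 and p1, which is the 2x work mentioned above. In exchange, the accumulator and combiner touch only the container they are handed, so no lambda carries state across chunks.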
>>> When we expose the Op APIs and you can write your own
>>> decomposition-aware operations, you'll have another option: write a
>>> StatefulOp.  This won't be an option for 8, but once we do, it's easy
>>> enough, and it will let you avoid the extra overhead by creating a
>>> custom operation.
>>> So, to sum up:
>>>   - If you're looking for a one-liner, you'll be disappointed;
>>>   - If you convince yourself "I would never need parallelism here" and
>>> write the obvious unsafe stateful lambda, please write your manager a
>>> letter first explaining how your incompetence is putting the reliability
>>> of his product in jeopardy;
>>>   - If you're willing to write the above collector, you'll likely be
>>> happy enough with the result.
>>> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>>>> Hi.
>>>> I am looking in the Stream interface for a way to convert a stream of
>>>>     {  t1, t2, .. Tn }
>>>> to
>>>>     { {t1,t2}, ... {Tn-1, Tn} }
>>>> or
>>>>     { {t1,t2}, ... {Tn, null}}
>>>> Let's assume {t1, t2} are aggregated by Pair<T,T>
>>>> So I tried to use explode:
>>>>          Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>>>          Stream<Pair<Integer, Integer>> spi = si.sequential().explode(
>>>>              new BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>>>                  Integer firstValue;
>>>>                  @Override
>>>>                  public void accept(Stream.Downstream<Pair<Integer, Integer>> pairDownstream, Integer v) {
>>>>                      if (firstValue == null) {
>>>>                          firstValue = v;
>>>>                      } else {
>>>>                          pairDownstream.send(new Pair<>(firstValue, v));
>>>>                          firstValue = null;
>>>>                      }
>>>>                  }
>>>>              });
>>>> But I didn't find a way to add the tail of the input stream {..., 5} to the
>>>> new stream { ..., {5, null} }.
>>>> Of course, this is only a simplified example of a more general problem I have.
>>>> Thanks
>>>> Boaz

More information about the lambda-dev mailing list