# Grouping stream elements by their position - how to handle tail of stream ?

Howard Lovatt howard.lovatt at gmail.com
Tue Feb 5 14:41:08 PST 2013

```Hi,

In many engineering/science languages you have a reshape, transpose, and/or sparse function that can be applied to a matrix or vector. Taking transpose as an example (forgive the very sketchy code below using methods/classes that don't exist - hopefully you will get the idea - had to write on iPad on the run):

DoubleMatrix m = {{x1, x2}, {y1, y2}}; // unfortunately no stream literal syntax, yet!
DoubleMatrix mt = m.transpose(); // {{x1, y1}, {x1, y2}

Where:

// Matrix is an immutable Stream
class DoubleMatrix implements Stream<DoubleVector> {
private final DoubleVector m; // store the elements in a continuous block to ensure cache hits
private final int nrows;
private final int ncols;
public double getDouble(final int row, final int col) {
// should check bounds
return m.getDouble(row * ncols + col);
}
public DoubleMatrix transpose() {
final int[] oldPermute = m.permute();
final int[] newPermute = new int[nrows * ncols];
for (int r = 0; r < nrows; r++) {
for (int c = 0; c < ncols; c++) {
permute[oldPermute[r * ncols + c]] = c * nrows + r;
}
}
return new DoubleMatrix(m, ncols, nrows, newPermute);
}
public DoubleMatrix(final DoubleVector m, final int nrows, final int ncols, final int[] permute) { ... }
...
}

// Vector is an immutable Stream
DoubleVector extends AbstractDoubleStream {
private final static int[] noPermutations = ...;
private final int[] permute;
public int[] permute() {
if (permute == null) { return noPermutations; }
return permute;
}
public double getDouble(final int index) {
if (permute == null) { return super.getDouble(index); }
return super.getDouble(permute[index]);
}
public DoubleVector(final DoubleVector v, final int[] permute) { ... }
...
}

There might be other use cases for permuting streams; like parsing characters into words, which would require permute to be in Stream and not Vector. However the above code is considerably simplified because the collections are immutable. In Stream the permute operations should ideally be lazy, unlike the above which is a paraphrase of existing matrix classes that exist in various libraries.

Cheers,

-- Howard.

On 05/02/2013, at 10:47 AM, Brian Goetz <brian.goetz at oracle.com> wrote:

> Could you clarify what you are suggesting?
>
> On 2/4/2013 6:27 PM, Howard Lovatt wrote:
>> Hi Brian,
>>
>> This is an enquiry related to grouping or parsing an input stream, but
>> a little different.
>>
>> In numerical programs common operations are matrix: transpose, other
>> reshaping, or 'sparsification'. I have seen these implemented on
>> parallel architectures as an indirection of indexes, something like:
>>
>> matrix[indexTransformation[index]]
>>
>> Has thought been given to such a capability for the stream library?
>>
>> Thanks,
>>
>> -- Howard.
>>
>>
>> On 03/02/2013, at 3:04 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>
>>> Look at the implementations in Collectors for a hint.  Obviously we owe
>>> some significant documentation, which unfortunately lags the code by too
>>> much.  But a Collector is basically a mutable foldl/inject; you provide
>>> a way to create a new mutable container, a way to add a single element
>>> to it, and a way to combine two mutable containers.  The framework
>>> decomposes the source, creates a bunch of little containers at the
>>> leaves, and them goes up the tree, merging.
>>>
>>> Dumping the elements into an ArrayList maps easily to Collector:
>>>
>>>
>>> This parallelizes, as we can collect each chunk into a list, and then
>>> merge the lists up the tree with addAll.
>>>
>>> The "how do I get the last element" problem goes away, since your data
>>> structure can explicitly maintain the state of whether the chunk ends
>>> with a balanced pair or an extra.
>>>
>>> But, you need to calculate BOTH p0 and p1 for each chunk (except maybe
>>> the left-spine chunks), because you don't know until the end which will
>>> have the right parity.
>>>
>>> On 2/2/2013 7:49 PM, Lattie wrote:
>>>>> Now, that said, here's a parallel algorithm you can implement with
>>>>> collect(), though you may find the performance overhead too offensive.
>>>>> Basically, for each chunk of the input t1..tn, you compute two possible
>>>>>
>>>>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>>>>
>>>> This is what I want... the ability to compute p0.  I was not able to
>>>> figure out how to use collect() to do this though, and so I went down
>>>> the road of explode() with a stateful lambda... can someone point me
>>>> to any tutorial or info on how to properly use collect()?
>>>>
>>>> TIA
>>>>
>>>>
>>>> On Sat, Feb 2, 2013 at 1:55 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>>> The glass is half full, or half empty, depending on your disposition.
>>>>>
>>>>> While its trivial to write a version of explode() that remembers the
>>>>> last element seen, and either emits nothing or a pair (you have a little
>>>>> fixup at the end to deal with, but that's easy too, just annoying), once
>>>>> you start writing such stateful lambdas, you are tying yourself to
>>>>> sequential execution in a very error-prone way.  The type system can't
>>>>> .parallel() somewhere, your code will silently turn to sewage.  So,
>>>>> don't ever do this.
>>>>>
>>>>> Too see why the obvious algorithm is sequential only, consider a
>>>>> decomposition of a data source with a spliterator.  In most cases, we
>>>>> don't necessarily know the even-ness or odd-ness of the sum of sizes of
>>>>> all prior splits.  Which means we don't know whether the first element
>>>>> of any given split is the left element of some pair or the right element
>>>>> of some pair.
>>>>>
>>>>> You might say: I don't care about parallelism, I only care about
>>>>> sequential.
>>>>>
>>>>> To which we'd respond: fine, but we're not really interested in
>>>>> distorting the model to encourage that.  The Stream API is designed
>>>>> around operations that can be equally well performed in serial or
>>>>> parallel.  There are no "serial only" operations supported, and that was
>>>>> an explicit design choice.  Passing state-holding lambdas to these
>>>>> methods is likely to be a spec violation.  We can't enforce it, any more
>>>>> than we can enforce the lack of data races, but bear in mind that if you
>>>>> do this, you're writing broken code, and asking for trouble.
>>>>>
>>>>> Now, that said, here's a parallel algorithm you can implement with
>>>>> collect(), though you may find the performance overhead too offensive.
>>>>> Basically, for each chunk of the input t1..tn, you compute two possible
>>>>>
>>>>> p0 = (t1,t2), t3,t4), ..., maybe leftover tn
>>>>> p1 = t1, (t2, t3), (t4, t5), ... maybe leftover tn
>>>>>
>>>>> then you combine these pairs chunks in the mostly obvious way (observe
>>>>> that p0 has trailing element = !(p1 has trailing element)).  Then when
>>>>> you get to the top of the tree, you take the one that doesn't have an
>>>>> orphaned initial element, and toss the other.  Basically you're doing 2x
>>>>> work for each input element, but it parallelizes fairly cleanly.
>>>>>
>>>>> Such an algorithm is ideally suited to collect(), because a Collector is
>>>>> a representation of a catamorphism, which the above transformation is.
>>>>> Because the combination logic is associative, it parallelizes cleanly
>>>>> without needing any nasty stateful lambdas.
>>>>>
>>>>> When we expose the Op APIs and you can write your own
>>>>> decomposition-aware operations, you'll have another option: write a
>>>>> StatefulOp.  This won't be an option for 8, but once we do, its easy
>>>>> enough, and it will let you avoid the extra overhead by creating a
>>>>> custom operation.
>>>>>
>>>>> So, to sum up:
>>>>> - If you're looking for a one-liner, you'll be disappointed;
>>>>> - If you convince yourself "I would never need parallelism here" and
>>>>> write the obvious unsafe stateful lambda, please write your manager a
>>>>> letter first explaining how your incompetence is putting the reliability
>>>>> of his product in jeopardy;
>>>>> - If you're willing to write the above collector, you'll likely be
>>>>> happy enough with the result.
>>>>>
>>>>>
>>>>> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>>>>>> Hi.
>>>>>>
>>>>>> I looking in Stream interface for a way to convert a stream of
>>>>>>   {  t1, t2, .. Tn }
>>>>>> to
>>>>>>   { {t1,t2}, ... {Tn-1, Tn} }
>>>>>> or
>>>>>>   { {t1,t2}, ... {Tn, null}}
>>>>>>
>>>>>> Lets assume {t1, t2} are aggeraget by Pair<T,T>
>>>>>>
>>>>>>
>>>>>>
>>>>>> So I tried to use explode:
>>>>>>
>>>>>> *        Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>>>>>
>>>>>>        Stream<Pair<Integer, Integer>> spi = si.sequential().explode(new
>>>>>> BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>>>>>
>>>>>>            Integer firstValue;
>>>>>>
>>>>>>            @Override
>>>>>>            public void accept(Stream.Downstream<Pair<Integer, Integer>>
>>>>>> pairDownstream, Integer v) {
>>>>>>
>>>>>>                if (firstValue == null) {
>>>>>>                    firstValue = v;
>>>>>>                } else {
>>>>>>
>>>>>>                    pairDownstream.send(new Pair<>(firstValue, v));
>>>>>>                    firstValue = null;
>>>>>>
>>>>>>                }
>>>>>>            }
>>>>>>
>>>>>>        });
>>>>>> *
>>>>>>
>>>>>> But I didn't find a way to add the tail of input stream {.., 5} to the new
>>>>>> stream { ,,, {5, null}}.
>>>>>>
>>>>>> Of-course this only simplified example of more general problem I have.
>>>>>>
>>>>>> Thanks
>>>>>> Boaz
>>>
```