Generalized transformation of a Stream<X> into a Stream<Y>, with lazy evaluation and stateful transformation functions

Lattie latsama at
Thu Feb 21 20:36:42 PST 2013

Following up on a thread from earlier this month:

(See thread "Re: Grouping stream elements by their position - how to handle
tail of stream ?")

There has been some interest expressed (by Boaz Nahum, Zhong Yu, Howard
Lovatt, and myself) in expanding the operations provided by Stream to
support a generalized 'on-the-fly' transformation from a Stream<X> to a
Stream<Y>, where the transformation function consumes 1 or more X's to
produce 1 or more Y's.

Here are a couple examples of this type of transformation:

### EXAMPLES ###

1) Transform a Stream<Character> into a Stream<String> where the resulting
Strings are the words parsed from the stream of Characters.

Stream<Character> chars = makeACharacterStream("This is a test");

Stream<String> words = chars.transform(MyParsingTransformation);;

Would print:


Or, as a one liner:

makeACharacterStream("This is a

Note that this should work for both finite and infinite input streams, and
that lazy evaluation should be preserved (i.e., only enough Characters are
pulled from the input stream to produce 1 parsed word as each word is
pulled out of the output stream.)

Another example:

2) Transform a Stream<Card> (representing a shuffled deck of 52 cards) into
a Stream<List<Card>> where each List<Card> represents a dealt hand of N

Stream<Card> deck = getShuffledDeck();

Stream<List<Card>> hands = deck.transform(dealTransformation(5)).limit(3);
 // deal 3 hands

This transformation will deal 3 hands of 5 cards each.  Only 15 cards will
be consumed from the deck stream.

The first parsing example is inherently linear, but the 2nd could
potentially be parallelized (assuming one doesn't mind the additional
randomization of the (already shuffled) deck from potential race conditions
in the dealing process.  In other words, the function returned by
dealTransformation() does not care if the Cards it gets from the input
stream are in any particular order.)


This general capability would mimic the familiar 'pipe' model offered by
the unix shell.  Each Stream.transformation() operation would consume the
incoming objects, transform them in some way (possibly including
aggregation, or expansion), and produce the new output objects.

Let me just say here that this would be a truly *fantastic* ability to
have!  (Thus my motivation in putting together this note.)

The question was raised as to whether the desired functionality could be
constructed using the existing Stream.collect() and Stream.flatMap()

Unfortunately, after some experimentation and internal discussions, the
answer was found to be that no, those methods are not sufficient.  I've
attached two example source files (run against the b78 build) showing the
results of trying to get the desired behavior using Stream.collect() and
Stream.flatMap().  Run them.  They are interesting!  :)

Stream.collect() consumes it's entire input stream prior to returning, thus
it is not suitable for infinite input streams, or even for finite but large
input streams (since the user may just want, for example, the first 2
parsed words from a 1 TB stream of Characters.  collect() will parse the
*entire* 1 TB input before the resulting stream gets to the .limit(2)
method downstream.  Not good.)

Stream.flatMap() is much closer to being able to do the trick.  It does
it's job lazily, so can handle large/infinite inputs.  The problem (as
originally raised by Boaz Nahum) is that there is no mechanism within
flatMap to handle the end of the input Stream gracefully.  (This results
from the FlatMapper being 'stateful', but with no way to 'flush' the final
state to produce any hanging output objects implied by that state and
propagate them to the output stream.)

Now there is a whole separate discussion about 'stateful' lambdas and
whether they are good or bad that I won't pretend to fully understand.

However I will point out that some of the lambdas expected to be passed
into collect() are stateful, so maybe it's ok?  I dunno.

Anyway, bottom line, it appears to me that flatMap() is an instance of a
more generalized transform() concept, and that it would be very beneficial
to explicitly allow the type of usage implied by this 'transform()' idea,
and that in fact flatMap() could be implemented as just one instance of
such a transformation.  (But that the desired transform() capability can
*not* be implemented on top of flatMap() or collect()).

The design of Stream.transform() could closely mirror the model used by
Stream.flatMap(), with some added mechanism to allow the final state to be
pulled out of the transformation after the input stream has been exhausted.
 (I can expand on this in more detail if there is interest.)

Thank you for your attention, and let the games begin!

PS: jdk 8 lambda rocks.  Java will once again reign supreme.  Thanks

More information about the lambda-dev mailing list