Simplifying sequential() / parallel()

Brian Goetz brian.goetz at
Fri Mar 22 06:33:18 PDT 2013

The problem with stateful lambdas is that, unless one block of code has 
control over the entire pipeline, it is an accident waiting to happen.

Let's say you receive a stream as a parameter:

   void foo(Stream s) { ... }

and you want to do something that requires a stateful mapper:

   void foo(Stream s) { s.map(... stateful ...)... }

That's a bug already, because you don't know that the stream you were 
passed is sequential.  But I doubt that people will remember, even 
most of the time, that they need to do:

       s.sequential().map(... stateful ...)...

instead.  Won't happen.
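
To make the hazard concrete, here is a minimal sketch, assuming the java.util.stream API as it later shipped in Java 8.  The class and method names (StatefulMapperSketch, numberNaively, numberDefensively) are hypothetical, made up for illustration.  The mapper labels each element with a running counter, so its result depends on the order in which it is called:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StatefulMapperSketch {

    // Hypothetical callee: it does not know whether the stream it
    // receives is sequential or parallel, yet it uses a stateful mapper.
    static List<String> numberNaively(Stream<String> s) {
        AtomicInteger counter = new AtomicInteger();
        return s.map(x -> counter.getAndIncrement() + ":" + x)
                .collect(Collectors.toList());
    }

    // Same mapper, but the callee remembers to force sequential
    // execution before relying on call order.
    static List<String> numberDefensively(Stream<String> s) {
        AtomicInteger counter = new AtomicInteger();
        return s.sequential()
                .map(x -> counter.getAndIncrement() + ":" + x)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f");

        // With a parallel stream the labels may not match encounter
        // order; the pairing can differ from run to run.
        System.out.println(numberNaively(data.parallelStream()));

        // Forced sequential: labels follow encounter order.
        System.out.println(numberDefensively(data.parallelStream()));
    }
}
```

The first call is the one that bites: the elements come back in encounter order, but nothing ties counter value 0 to the first element once the mapper runs on several threads.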

Stateful lambdas introduce the need for non-modular reasoning about 
stream pipelines (who created this? who will consume this? in what state 
was it created?).  And, it has all the same self-deception problems as 
thread-safety.  People convince themselves "I don't need to think about 
synchronization because no one will ever use this object concurrently."

So, while I sympathize with the desire to let people say "I know that 
this entire stream pipeline has been carefully controlled so as to not 
have statefulness distort its results", I think in reality, this will 
quickly turn into "statefulness is OK" in most people's minds.  With the 
attendant inevitable foot-shooting.

On 3/21/2013 9:57 PM, Joe Bowbeer wrote:
> I'm traveling now and won't be able to respond promptly but this topic
> has been raised a couple of times already. Feel free to copy and paste
> my response from previous discussions:)
> Rephrasing, I'm OK with non-interference but I object to banning
> stateful in sequential ops.
> I think there should be a one-one correspondence between any for loop
> and a sequential forEach.
> Can you compare your restrictions with those in Scala and Groovy? Scala
> in particular, because it is more strictly defined, and I'm pretty sure
> I've combined stateful expressions with functional forms in Scala, to
> good effect. (One of the benefits of being multi-paradigmatic?)
> In addition, I'm wary of the new form of forEach. If anything, I'd like
> its name to be simpler, e.g., each, not longer.
> Joe
> On Mar 21, 2013 3:48 PM, "Brian Goetz" <brian.goetz at> wrote:
>     Doug and I have been revisiting sequential() and parallel().  I
>     think there's a nice simplification here.
>     The original motivation for sequential() was that we originally
>     had .into(collection), and many collections were not thread-safe so
>     we needed a means to bring the computation back to the current
>     thread.  The .sequential() method did that, but it also brought back
>     a constraint of encounter ordering with it, because if people did:
>         stuff.parallel().map(...).sequential().into(new ArrayList<>());
>     not respecting encounter order would violate the principle of least
>     astonishment.
>     So the original motivation for sequential() was "bring the
>     computation back to the current thread, in order."  This was doable,
>     but has a high price -- a full barrier where we buffer the contents
>     of the entire stream before doing any forEach'ing.
>     Most of the time, sequential() only appears right before the
>     terminal operation.  But, once implemented, there was no reason to
>     constrain this to appear right before the forEach/into, so we didn't.
>     Once we discovered a need for .parallel(), it made sense for it to be the
>     dual of .sequential() -- fully unconstrained.  And again, the
>     implementation wasn't *that* bad -- better than .sequential().  But
>     again, the most desirable position for .parallel() is right after
>     the source.
>     Then we killed into() and replaced it with reduction, which is a
>     much smarter way of managing ordering.  Eliminating half the
>     justification for .sequential().
>     As far as I can tell, the remaining use cases for .sequential() are
>     just modifiers to forEach to constrain it, in order, to the current
>     thread.
>     As in:
>        ints().parallel().filter(i -> isPrime(i))
>              .sequential().forEach(System.out::println)
>     Which could be replaced by
>     .forEachSequentialAndOrderedInCurrentThread(), with a suitably
>     better name.  Which could further be simplified to ditch the "in
>     current thread" part by doing some locking in the implementation,
>     which brings us to .forEachOrdered(action).  Which nicely
>     complements .collectUnordered, and the two actually stand better
>     with their duals present (reduce is by default ordered; forEach is
>     by default unordered.)
>     The "put it anywhere" behavior of .parallel was completely
>     bootstrapped on the "put it anywhere" nature of .sequential; we
>     never really set out to support transitions in the API.
>     So, pulling the rug out from under the house of cards, I think we
>     can fall back to:
>     1.  Modify semantics of .sequential and .parallel to apply globally
>     to the entire pipeline.  This works because pipelines are fully lazy
>     anyway, so we don't commit to seq-ness/par-ness until we hit the
>     terminal op.  So they are functional versions of "set the seq/par
>     bit in the source".  And that simplifies the specification of
>     seq/par down to a single property of the entire pipeline -- much
>     easier to spec.
>     2.  Add .forEachOrdered.  For sequential streams, this is just
>     .forEach.  For par streams, we use a lock to avoid concurrent
>     invocation of the lambda, and loosen up the current behavior from
>     "full barrier" to "partial barrier", so that when the next chunk is
>     available, we can start working right away.  This is easy to
>     accomplish using the existing AbstractTask machinery.
>     Before we go there, does anyone have use cases for .sequential() /
>     .parallel() that *don't* put the parallel right after the source, or
>     the sequential right before a forEach?
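
The two proposals quoted above can be sketched as follows, assuming the java.util.stream API as it later shipped in Java 8, where both behaviors are observable.  The class name ForEachOrderedSketch and its methods are hypothetical, and isPrime is a stand-in for the one in the quoted example.  The sketch shows a parallel filter feeding forEachOrdered, and isParallel() reporting a single mode for the whole pipeline, determined by the last sequential()/parallel() call:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

class ForEachOrderedSketch {

    // Stand-in for the isPrime(i) used in the quoted example.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Parallel filtering, but the terminal action sees elements one at
    // a time in encounter order.  The synchronized wrapper is a
    // belt-and-braces assumption, since the action may run on
    // different threads.
    static List<Integer> primesInOrder(int limit) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, limit)
                 .parallel()
                 .filter(ForEachOrderedSketch::isPrime)
                 .forEachOrdered(i -> seen.add(i));
        return seen;
    }

    // seq/par is one property of the whole pipeline: the last call wins.
    static boolean endsSequential() {
        return IntStream.range(0, 10).parallel().map(i -> i + 1)
                        .sequential().isParallel();
    }

    static boolean endsParallel() {
        return IntStream.range(0, 10).sequential().map(i -> i + 1)
                        .parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(primesInOrder(20));  // [2, 3, 5, 7, 11, 13, 17, 19]
        System.out.println(endsSequential());   // false
        System.out.println(endsParallel());     // true
    }
}
```

Replacing forEachOrdered with forEach in primesInOrder would still collect the same primes, but their order in the list would no longer be guaranteed.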

More information about the lambda-libs-spec-experts mailing list