Internal and External truncation conditions

Zhong Yu zhong.j.yu at
Sat Feb 9 20:25:12 PST 2013

Based on my own use cases, code that needs forEachUntil() usually
intends to process just enough elements to produce a result; for
example, a lexer scans a char stream until it yields a token. In that
sense forEachUntil() is really an aggregator over *some* of the
elements. We could have a method of the form

    interface Stream<T> {

        <R> R scan(Function<T,R> scanner);
    }

The scanner is usually stateful. Elements are fed to the scanner
until it returns a non-null value; that value is the return value of
scan(). If the end of the stream is reached before the scanner returns
non-null, scan() returns null. If a scanner needs to react to the EOF
event, the application can define an EOF sentinel of type T.
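
A minimal sequential sketch of the behaviour described above, assuming
an Iterator as the element source instead of the Stream interface
itself. ScanSketch and WordScanner are hypothetical names used only
for illustration; this is not a proposed implementation.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

final class ScanSketch {

    // Feeds elements to the scanner until it returns non-null.
    // Returns null if the input ends before the scanner yields a result.
    static <T, R> R scan(Iterator<T> elements, Function<? super T, ? extends R> scanner) {
        while (elements.hasNext()) {
            R result = scanner.apply(elements.next());
            if (result != null) {
                return result;   // stop early; remaining elements are not consumed
            }
        }
        return null;
    }

    // Hypothetical stateful scanner: buffers characters and yields the
    // buffered text as a token when it sees the first space.
    static final class WordScanner implements Function<Character, String> {
        private final StringBuilder buffer = new StringBuilder();

        @Override
        public String apply(Character c) {
            if (c == ' ') {
                return buffer.toString();
            }
            buffer.append(c);
            return null;
        }
    }

    public static void main(String[] args) {
        Iterator<Character> chars =
                Arrays.asList('f', 'o', 'o', ' ', 'b', 'a', 'r').iterator();
        String token = scan(chars, new WordScanner());
        System.out.println(token);         // prints "foo"
        System.out.println(chars.next());  // prints "b": the rest of the input is untouched
    }
}
```

Note that in this sketch a word that runs to the end of the input
without a trailing space yields null, which is the situation an EOF
sentinel is meant to handle.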

In the parallel case, the scanner must be thread-safe; if it returns
non-null for one split, it should return non-null for all splits at
around the same time; one of the non-null values is chosen arbitrarily
as the result of scan().

If a null sentinel is too distasteful, the scanner can return
Optional<R>, or it can yield its result into a Consumer<R> sink.


    Collection<Integer> primes = ints.parallel().scan( gather primes till xxx );

    Paragraph para = lines.scan( gather lines till an empty line or EOF );
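
As a rough illustration, the paragraph example could be fleshed out as
below. This is only a sketch under several assumptions: the source is
an Iterator, the scanner returns Optional instead of null, a paragraph
is represented as a List<String>, and the EOF sentinel is a dedicated
String instance compared by identity. All names here
(ParagraphScanSketch, ParagraphScanner, EOF) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class ParagraphScanSketch {

    // Assumed EOF sentinel: a distinct String instance, compared by identity
    // so that no real line can be mistaken for it.
    static final String EOF = new String("<EOF>");

    // Optional-returning variant of scan(): feeds elements to the scanner and,
    // if the input runs out first, feeds the EOF sentinel exactly once.
    static <T, R> Optional<R> scan(Iterator<T> elements, T eof,
                                   Function<? super T, Optional<R>> scanner) {
        while (elements.hasNext()) {
            Optional<R> result = scanner.apply(elements.next());
            if (result.isPresent()) {
                return result;
            }
        }
        return scanner.apply(eof);  // let the scanner react to end of stream
    }

    // Stateful scanner: gathers lines until an empty line or EOF.
    static final class ParagraphScanner
            implements Function<String, Optional<List<String>>> {
        private final List<String> lines = new ArrayList<>();

        @Override
        public Optional<List<String>> apply(String line) {
            if (line == EOF || line.isEmpty()) {
                return Optional.of(lines);
            }
            lines.add(line);
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Iterator<String> lines =
                Arrays.asList("first", "second", "", "third").iterator();
        // prints "Optional[[first, second]]"
        System.out.println(scan(lines, EOF, new ParagraphScanner()));
        // prints "Optional[[third]]" (terminated by the EOF sentinel)
        System.out.println(scan(lines, EOF, new ParagraphScanner()));
    }
}
```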

scan() is only intended for part of the stream. To turn the whole
stream into another stream, say a line stream into a paragraph stream,
flatMap(FlatMapper) should work just fine.

Zhong Yu

On Sat, Feb 9, 2013 at 1:26 PM, Sam Pullara <sam at> wrote:
> Now that we are further along, I wanted to bring this up again. I
> don't think that forEachUntil is sufficient for handling internal and
> external conditions that should truncate stream processing. I've also
> looked at CloseableStream and that doesn't appear to help since it
> isn't possible to wrap a Stream (say an infinite stream) with a
> CloseableStream and get the necessary semantics of cancellation. Also,
> other APIs that don't consider that you might give them a
> CloseableStream will likely still give you back a Stream thus losing
> the semantics.
> Is everyone else happy with forEachUntil and CloseableStream?
> Sam
> ---------- Forwarded message ----------
> From: Sam Pullara <sam at>
> Date: Mon, Dec 31, 2012 at 8:34 AM
> Subject: Re: Cancelation -- use cases
> To: Brian Goetz <brian.goetz at>
> Cc: "lambda-libs-spec-experts at"
> <lambda-libs-spec-experts at>
> I think we are conflating two things with this solution and it doesn't
> work for them in my mind. Here is what I would like the solution to
> cover:
> - External conditions (cancellation, cleanup)
> - Internal conditions (gating based on count, elements and results)
> The first one may be the only one that works in the parallel case. It
> should likely be implemented with .close() on stream that would stop
> the stream as soon as possible. This would be useful for things like
> timeouts. Kind of like calling close on an inputstream in the middle
> of reading it. The other one I think is necessary and hard to
> implement correctly with the parallel case. For instance I would like
> to say:
> stream.gate(e -> e < 10).forEach(e -> …)
> OR
> // i is the number of the current element
> stream.gate( (e, i) -> e < 10 || i > 10).forEach(e -> …)
> That should give me every element in the stream until an element isn't
> < 10 and then stop processing elements. Further, there should be some
> way for the stream source to be notified that we are done consuming it
> in case it is of unknown length or consumes resources. That would be
> more like (assuming we add a Runnable call to Timer):
> Stream stream = ….
> new Timer().schedule(() -> stream.close(), 5000);
> stream.forEach(e -> ….);
> OR
> stream.forEach(e -> { try { … } catch (Exception ex) { stream.close(); } });
> Sadly, the first gate() case doesn't work well when parallelized. I'm
> willing to just specify what the behavior is for that case to get it
> into the API. For example, I would probably say something like "the
> gate will need to return false once per split to stop processing". In
> either of these cases I think one of the motivations needs to be that
> the stream may be using resources and we need to tell the source that
> we are done consuming it. For example, if the stream is sourced from a
> file, database or even a large amount of memory there should be a
> notification mechanism for doneness that will allow those resources to
> be returned before the stream is exhausted. To that end I think that
> Stream should implement AutoCloseable but overridden with no checked
> exception.
> interface Stream<T> extends AutoCloseable {
>   /**
>    * Closes this stream and releases any system resources associated
>    * with it. If the stream is already closed then invoking this
>    * method has no effect. Close is automatically called when the
>    * stream is exhausted. After this is called, no further elements
>    * will be processed by the stream but currently processing elements
>    * will complete normally. Calling other methods on a closed stream will
>    * produce IllegalStateExceptions.
>    */
>   void close();
>   /**
>    * When the continueProcessing function returns false, no further
>    * elements will be processed after the gate. In the parallel stream
>    * case no further elements will be processed in the current split.
>    */
>   Stream<T> gate(Function<T, Boolean> until);
>   /**
>    * As gate with the addition of the current element number.
>    */
>   Stream<T> gate(BiFunction<T, Integer, Boolean> until);
> }
> This API avoids a lot of the side effects that forEachUntil would
> require to implement these use cases.
> Sam
> On Dec 30, 2012, at 7:53 PM, Brian Goetz <brian.goetz at> wrote:
> Here's a lower-complexity version of cancel, that still satisfies (in
> series or in parallel) use cases like the following:
>   - Find the best possible move after thinking for 5 seconds
>   - Find the first solution that is better than X
>   - Gather solutions until we have 100 of them
> without bringing in the complexity or time/space overhead of dealing
> with encounter order.
> Since the forEach() operation works exclusively on the basis of
> temporal/arrival order rather than spatial/encounter order (elements
> are passed to the lambda in whatever order they are available, in
> whatever thread they are available), we could make a canceling variant
> of forEach:
>  .forEachUntil(Block<T> sink, BooleanSupplier until)
> Here, there is no confusion about what happens in the ordered case, no
> need to buffer elements, etc.  Elements flow into the block until the
> termination condition transpires, at which point there are no more
> splits and existing splits dispense no more elements.
> I implemented this (it was trivial) and wrote a simple test program to
> calculate primes sequentially and in parallel, counting how many could
> be calculated in a fixed amount of time, starting from an infinite
> generator and filtering out composites:
>            Streams.iterate(from, i -> i + 1)  // sequential
>                    .filter(i -> isPrime(i))
>                    .forEachUntil(i -> {
>                        chm.put(i, true);
>                    }, () -> System.currentTimeMillis() >= start+num);
> vs
>            Streams.iterate(from, i -> i+1)    // parallel
>                    .parallel()
>                    .filter(i -> isPrime(i))
>                    .forEachUntil(i -> {
>                        chm.put(i, true);
>                    }, () -> System.currentTimeMillis() >= start+num);
> On a 4-core Q6600 system, in a fixed amount of time, the parallel
> version gathered ~3x as many primes.
> In terms of being able to perform useful computations on infinite
> streams, this seems a pretty attractive price-performer; lower spec
> and implementation complexity, and covers many of the use cases which
> would otherwise be impractical to attack with the stream approach.
> On 12/28/2012 11:20 AM, Brian Goetz wrote:
> I've been working through some alternatives for cancellation support in
> infinite streams.  Looking to gather some use case background to help
> evaluate the alternatives.
> In the serial case, the "gate" approach works fine -- once some
> criterion is met, stop sending elements downstream.  The pipeline
> flushes the elements it has, and completes early.
> In the parallel unordered case, the gate approach similarly works fine
> -- after the cancelation criterion is met, no new splits are created, and
> existing splits dispense no more elements.  The computation similarly
> quiesces after elements currently being processed are completed,
> possibly along with any up-tree merging to combine results.
> It is the parallel ordered case that is tricky.  Supposing we partition
> a stream into
>   (a1,a2,a3), (a4,a5,a6)
> And suppose further we happen to be processing a5 when the bell goes
> off.  Do we want to wait for all a_i, i<5, to finish before letting the
> computation quiesce?
> My gut says: for the things we intend to cancel, most of them will be
> order-insensitive anyway.  Things like:
>  - Find the best possible move after thinking for 5 seconds
>  - Find the first solution that is better than X
>  - Gather solutions until we have 100 of them
> I believe the key use case for cancelation here will be when we are
> chewing on potentially infinite streams of events (probably backed by
> IO) where we want to chew until we're asked to shut down, and want to
> get as much parallelism as we can cheaply.  Which suggests to me the
> intersection between order-sensitive stream pipelines and cancelable
> stream pipelines is going to be pretty small indeed.
> Anyone want to add to this model of use cases for cancelation?
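
For reference, the sequential behaviour of the forEachUntil(sink,
until) variant quoted above can be approximated with a small helper.
This is a sketch under assumptions, not the implementation discussed
in the thread: it works on an Iterator, handles only the sequential
case, and uses java.util.function.Consumer in place of the Block type
from the quoted signature. ForEachUntilSketch, isPrime and firstPrimes
are hypothetical names. The example covers the "gather solutions until
we have N of them" use case with primes as the solutions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class ForEachUntilSketch {

    // Sequential-only sketch: passes elements to the sink until the
    // termination condition becomes true or the input is exhausted.
    static <T> void forEachUntil(Iterator<T> elements,
                                 Consumer<? super T> sink,
                                 BooleanSupplier until) {
        while (!until.getAsBoolean() && elements.hasNext()) {
            sink.accept(elements.next());
        }
    }

    // Simple trial-division primality check, for the demo only.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Gathers primes from an infinite generator until `count` have been found.
    static List<Integer> firstPrimes(int count) {
        List<Integer> found = new ArrayList<>();
        Iterator<Integer> primes = Stream.iterate(2, i -> i + 1)
                .filter(ForEachUntilSketch::isPrime)
                .iterator();
        forEachUntil(primes, found::add, () -> found.size() >= count);
        return found;
    }

    public static void main(String[] args) {
        System.out.println(firstPrimes(5));  // prints [2, 3, 5, 7, 11]
    }
}
```

The termination condition is checked before each element is pulled, so
the helper returns as soon as the condition holds, even though the
generator itself is infinite.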

More information about the lambda-libs-spec-observers mailing list