Reactive Streams utility API

James Roper james at
Thu Mar 15 01:24:41 UTC 2018

Hi all,

An update on this. We've now brought the API to feature parity with the
JDK8 Streams API, for those operators that make sense in Reactive Streams.
We've provided example implementations of the API backed by both Akka
Streams and rxjava, showing that it can be widely implemented. The TCK
still needs some work, but covers the major features, and comprehensively
covers all publishers/subscribers with the Reactive Streams TCK (so
comprehensive that we actually found two Reactive Streams TCK violations in
Akka Streams with this, and a couple in rxjava too).
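
To illustrate the kind of operator the API covers, here is a minimal sketch
of a hand-written map stage over java.util.concurrent.Flow. It is not taken
from the proposal's repository: the class name MapStage and the map helper
are hypothetical, and the sketch skips much of what the Reactive Streams TCK
would check (for example, null checks on signals and protection against
duplicate onSubscribe calls).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of the proposed API.
final class MapStage {

    // Wraps a downstream subscriber so that each element is mapped first.
    static <T, R> Flow.Subscriber<T> map(Function<? super T, ? extends R> mapper,
                                         Flow.Subscriber<? super R> downstream) {
        Objects.requireNonNull(mapper);
        Objects.requireNonNull(downstream);
        return new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private boolean done;

            @Override public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                // map is one-to-one, so downstream demand passes straight through.
                downstream.onSubscribe(subscription);
            }

            @Override public void onNext(T item) {
                if (done) return;
                R mapped;
                try {
                    mapped = Objects.requireNonNull(mapper.apply(item), "mapper returned null");
                } catch (Throwable t) {
                    // A failing mapper cancels upstream and fails downstream.
                    done = true;
                    upstream.cancel();
                    downstream.onError(t);
                    return;
                }
                downstream.onNext(mapped);
            }

            @Override public void onError(Throwable t) {
                if (!done) { done = true; downstream.onError(t); }
            }

            @Override public void onComplete() {
                if (!done) { done = true; downstream.onComplete(); }
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        // Collects every element and completes the future on termination.
        Flow.Subscriber<String> collector = new Flow.Subscriber<String>() {
            private final List<String> items = new ArrayList<>();
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { items.add(item); }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onComplete() { result.complete(items); }
        };
        // Closing the publisher (end of the try block) signals onComplete.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(MapStage.<Integer, String>map(i -> "item-" + i, collector));
            publisher.submit(1);
            publisher.submit(2);
        }
        System.out.println(result.join());
    }
}
```

Because map is one-to-one, demand can be passed through unchanged by handing
the upstream subscription to the downstream subscriber. Operators such as
filter or flatMap need their own demand bookkeeping.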

There are two major areas of work left to get something that would be ready
to be a candidate for a final API. The first is to produce a zero
dependency reference implementation of the API. This is what I plan on
starting on next.

The second is to decide what additional operators and generators the API
should provide. So far, the scope has been mostly limited to a subset of
the JDK8 Streams scope; only a few additional API pieces have been created,
such as a few variations on flatMap (one that supports CompletionStage, and
one that supports Iterable). There are a number of other features to
consider in order to provide basic necessary functionality for this API.
Here are some examples off the top of my head (some of these can already be
implemented in terms of other stages already provided):

* A cancelled subscriber (useful for cleaning up hot publishers)
* An ignoring subscriber (useful when you do the actual work in a previous
stage of the graph, such as mapping to completion stages)
* Error handling subscribers and/or processors
* Termination listening subscribers and/or processors
* A processor that wraps a subscriber and publisher
* The ability to merge streams - so far only concat is provided, and all
flatMaps are essentially concatenations; merge variants may be useful
(though they introduce a lot of complexity, such as specifying breadth)
* The ability to split streams into sub streams - a use case for this is in
parsing a stream that contains potentially large sub streams, like parsing
a multipart/form-data body
* Batching of elements in a stream, based on a predicate, influenced by
backpressure, or based on a scheduled tick
* Scheduled features such as emitting ticks, rate limiting, etc
* The ability to control buffering and asynchronous boundaries within a
* Naming of stages for debug/error reporting/monitoring purposes
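
As a rough illustration of the first two items in the list above, the sketch
below shows a cancelled subscriber and an ignoring subscriber written
directly against java.util.concurrent.Flow. The names UtilitySubscribers,
cancelled, Ignoring and completion are hypothetical and are not the
proposal's API; the code also leaves out checks that the Reactive Streams
TCK would require.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical helpers, for illustration only; not part of the proposed API.
final class UtilitySubscribers {

    // A subscriber that cancels as soon as it is subscribed, which lets a
    // hot publisher release whatever it holds for this subscription.
    static <T> Flow.Subscriber<T> cancelled() {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.cancel(); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    // A subscriber that requests everything and discards each element,
    // exposing only the termination of the stream through a future.
    static final class Ignoring<T> implements Flow.Subscriber<T> {
        private final CompletableFuture<Void> done = new CompletableFuture<>();

        CompletableFuture<Void> completion() { return done; }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { /* intentionally ignored */ }
        @Override public void onError(Throwable t) { done.completeExceptionally(t); }
        @Override public void onComplete() { done.complete(null); }
    }

    public static void main(String[] args) {
        Ignoring<String> ignoring = new Ignoring<>();
        // Closing the publisher (end of the try block) signals onComplete.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(ignoring);
            publisher.submit("a");
            publisher.submit("b");
        }
        ignoring.completion().join();
        System.out.println("stream completed");
    }
}
```

The ignoring subscriber is the natural terminal stage when the real work
happens earlier in the graph: the caller only needs to know when, and
whether, the stream finished.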

Not all of the above may be absolutely necessary, but they should be
considered, and there may be other features that would be useful as well.

Please visit the repo and any feedback would be much appreciated:



On 8 March 2018 at 03:59, Brian Goetz <brian.goetz at> wrote:

> To answer the questions at the bottom: the next step is to start working
> on this and get folks excited about contributing.  There's plenty of time
> for process later, but filing a JEP or creating a project shouldn't be a
> barrier to innovating.
> On 2/28/2018 10:33 PM, James Roper wrote:
>> Hi all,
>> We've put together a simple proposal for this. Please read the README for
>> an introduction to this proposal.
>> Regards,
>> James
>> On 22 February 2018 at 11:47, James Roper <james at> wrote:
>>> Hi all,
>>> This is an email to give people a heads up that we'd like to look at
>>> creating an API, in the same vein as the JDK8 Streams API, for building
>>> reactive streams (a la JDK9 juc.Flow). Our goals for this are:
>>> * To fill a gap in the JDK where if a developer wants to do even the
>>> simplest of things with a JDK9 juc.Flow, such as map or filter, they need
>>> to bring in a third party library that implements that.
>>> * To produce an API that can build Publishers, Subscribers, Processors,
>>> and complete graphs, for the purposes of consuming APIs that use reactive
>>> streams (for example, JDK9 Http Client).
>>> * To produce an API that aligns closely with the JDK8 Streams API, using it
>>> for inspiration for naming, scope, general API shape, and other aspects.
>>> The purpose of this goal is to ensure familiarity of Java developers with
>>> the new API, and to limit the number of concepts Java developers need to
>>> understand to do the different types of streaming offered by the JDK.
>>> * To produce an API that can be implemented by multiple providers
>>> (including an RI in the JDK itself), using the ServiceLoader mechanism to
>>> provide and load a default implementation (while allowing custom
>>> implementations to be manually provided). There are a lot of concerns that
>>> each different streams implementation provides and implements, beyond
>>> streaming, for example monitoring/tracing, concurrency modelling, buffering
>>> strategies, performance aspects of the streams handling including fusing,
>>> and context (eg thread local) propagation. This will allow libraries to use
>>> and provide contracts based on this API without depending on a particular
>>> implementation, and allows developers to select the implementation that
>>> meets their needs.
>>> Non goals:
>>> * To produce a kitchen sink of utilities for working with reactive
>>> streams. There already exist a number of reactive streams implementations
>>> that seek to meet this goal (eg, Akka Streams, Reactor, RxJava), and once
>>> you go past the basics (map, filter, collect), and start dealing with
>>> things like fan in/out, cycles, restarting, etc, the different approaches
>>> to solving this start to vary greatly. The JDK should provide enough to be
>>> useful for typical every day streaming use cases, with developers being
>>> able to select a third party library for anything more advanced.
>>> We will update this list when we have something ready for public review.
>>> This probably won't be far off. Our hope is that we can propose this as a
>>> JEP.
>>> Regards,
>>> James
>>> --
>>> *James Roper*
>>> *Senior Octonaut*
>>> Lightbend – Build reactive apps!
>>> Twitter: @jroper

*James Roper*
*Senior Octonaut*

Lightbend – Build reactive apps!
Twitter: @jroper

More information about the core-libs-dev mailing list