JDBC Next questions regarding j.u.c.Flow integration

Dávid Karnok akarnokd at gmail.com
Fri Oct 6 20:17:11 UTC 2017

Interesting. It helps quite a lot if there is a rich fluent library backing
the Flow API; unlike CompletableFuture or Stream, such a library is not
available inside the JDK (by the way, all sorts of flow manipulation are
possible, including getting one item from a flow).
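To illustrate the remark that one item can be taken out of a flow: below is a minimal sketch of such a helper over plain j.u.c.Flow, assuming no fluent library is available. FlowFirst and first() are hypothetical names, not JDK or draft-API methods. The helper requests exactly one item and cancels the upstream once that item arrives. If the flow completes empty, it fails with NoSuchElementException.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.*;

// Hypothetical helper, not a JDK API: completes with the first item of a
// Flow.Publisher and cancels the rest of the flow.
final class FlowFirst {
    static <T> CompletableFuture<T> first(Flow.Publisher<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);  // demand exactly one item
            }

            @Override public void onNext(T item) {
                subscription.cancel();  // nothing beyond the first item is needed
                result.complete(item);
            }

            @Override public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override public void onComplete() {
                // No-op if an item already completed the future; otherwise
                // the source was empty.
                result.completeExceptionally(new NoSuchElementException("empty publisher"));
            }
        });
        return result;
    }
}
```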

I haven't looked through all the proposed API but we have a couple of
declarative patterns in the reactive world that could be applied, although
they may look odd from an imperative perspective and one needs those fluent
libraries too.

1) Backpressuring operations

You can define a method on Connection/OperationGroup that takes a Function.
This function receives an object that can hand out Operation builders and
returns a Flow.Publisher<Operation>. The method itself could return a
Flow.Publisher<ActiveOperation>, and, depending on the original operation, a
Flow.Publisher<Row> can be retrieved from each ActiveOperation. I think
Operations and ActiveOperations may enter and leave the flows in order, but
one could add a tag object to both Operation and ActiveOperation so that
producers and consumers can match them up across the flows.

Flow.Publisher<ActiveOperation> handle(
        Function<OperationSupplier, Flow.Publisher<Operation>> operationSupplier);
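Below is a minimal sketch of the backpressure idea behind handle(), seen from the connection's side. The connection requests one Operation at a time and asks for the next only when the previous one has finished, so the user's publisher cannot run ahead. As an assumption for illustration, operations are modelled as Supplier<CompletableFuture<R>>. SerialOperationSubscriber is a hypothetical name, and none of this is the draft API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;

// Hypothetical sketch: an "operation" is modelled as a Supplier that starts
// some asynchronous work and returns its CompletableFuture. At most one
// operation runs at a time; the next one is requested only after the
// previous one has completed.
final class SerialOperationSubscriber<R>
        implements Flow.Subscriber<Supplier<CompletableFuture<R>>> {
    private final CompletableFuture<List<R>> done = new CompletableFuture<>();
    private final List<R> results = new ArrayList<>();  // guarded by this
    private volatile Flow.Subscription subscription;
    private boolean inFlight;      // guarded by this: an operation is running
    private boolean upstreamDone;  // guarded by this: onComplete was received

    /** Completes with all results in submission order, or with the first failure. */
    CompletableFuture<List<R>> done() { return done; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);  // backpressure: one outstanding operation
    }

    @Override public void onNext(Supplier<CompletableFuture<R>> operation) {
        CompletableFuture<R> future;
        synchronized (this) { inFlight = true; }
        try {
            future = operation.get();
        } catch (RuntimeException e) {  // never let onNext throw
            subscription.cancel();
            done.completeExceptionally(e);
            return;
        }
        future.whenComplete((value, error) -> {
            synchronized (this) {
                inFlight = false;
                if (error != null) {
                    subscription.cancel();  // stop pulling operations
                    done.completeExceptionally(error);
                    return;
                }
                results.add(value);
                if (upstreamDone) {  // that was the last operation
                    done.complete(new ArrayList<>(results));
                    return;
                }
            }
            subscription.request(1);  // ready for the next operation
        });
    }

    @Override public void onError(Throwable t) {
        done.completeExceptionally(t);
    }

    @Override public void onComplete() {
        synchronized (this) {
            upstreamDone = true;
            if (!inFlight) done.complete(new ArrayList<>(results));
        }
    }
}
```

Allowing k concurrent operations would require an initial request(k) and a counter in place of the inFlight flag. The single-flight version keeps the ordering of results trivial.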

What should happen if the user calls handle() multiple times, or mixes it
with any non-Flow method? If the non-Flow methods are implemented on top of
handle() as well, one can merge/concatenate all Flow.Publisher<Operation>
into one main Flow.Publisher. Or there is always the option to throw an

2) Backpressuring rows

That's straightforward: consume the aforementioned Flow.Publisher<Row> and
limit it to at most one Subscriber. I guess the trouble is that Row is
stateful: either onNext gets a fresh Row object every time, or it gets the
same Row object, which the end consumer has to process carefully (i.e., a
fan-out would have to copy the data out before dispatching).

Flow.Publisher<Row> getSharedRows();
// pseudo-code: map() comes from a fluent Flow library, not from j.u.c.Flow
Flow.Publisher<Row> getRows();  // = getSharedRows().map(r -> r.copy())
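The getRows() = getSharedRows().map(...) line assumes a fluent library. Below is a minimal sketch of such a map operator over plain j.u.c.Flow, together with a hypothetical mutable Row whose copy() detaches it from the driver's reused instance. FlowMap, Row.value and Row.copy() are assumptions for illustration. Demand and cancellation pass straight through. If the mapper fails, the operator cancels upstream and signals onError instead of throwing.

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical mutable row, standing in for a Row object that a driver
// reuses between onNext calls instead of allocating one per row.
final class Row {
    int value;

    Row copy() {
        Row r = new Row();
        r.value = value;
        return r;
    }
}

// Minimal map operator over j.u.c.Flow (a sketch, not a library API).
final class FlowMap {
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source,
                                        Function<? super T, ? extends R> mapper) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private boolean failed;

            @Override public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                downstream.onSubscribe(s);  // request/cancel pass straight through
            }

            @Override public void onNext(T item) {
                if (failed) return;
                R mapped;
                try {
                    mapped = mapper.apply(item);
                } catch (RuntimeException e) {
                    // onNext must not throw: cancel upstream and signal downstream.
                    failed = true;
                    upstream.cancel();
                    downstream.onError(e);
                    return;
                }
                downstream.onNext(mapped);
            }

            @Override public void onError(Throwable t) {
                if (!failed) downstream.onError(t);
            }

            @Override public void onComplete() {
                if (!failed) downstream.onComplete();
            }
        });
    }
}
```

Subscribing to FlowMap.map(sharedRows, Row::copy) yields distinct Row objects, even when the source reuses one instance for every onNext.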

2017-10-06 20:36 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:

> The use of Flow never seemed organic. It always ended up looking like an
> exercise to use Flow somehow rather than a natural consequence of the
> problem we are trying to solve. The one place where Flow sort of fits is
> RowOperation, witness PublisherOperation. The rest of the API revolves
> around closures and PublisherOperation is very different. Further,
> Operations have results and there is no clear way to get a single result
> from a reactive stream. Well there is, with an aggregator. But that’s what
> RowOperation does so the benefit from PublisherOperation is unclear.
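The aggregator route to a single result can be sketched as follows. A hypothetical FlowCollect.collect() helper folds a whole Flow.Publisher into one CompletableFuture using a java.util.stream.Collector, which is one way a CompletableFuture-style result and a Flow source could meet. The helper's name and shape are assumptions, not part of the draft API.

```java
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.stream.Collector;

// Hypothetical helper: folds every item of a Flow.Publisher into a single
// result with a java.util.stream.Collector and exposes it as a CompletableFuture.
final class FlowCollect {
    static <T, A, R> CompletableFuture<R> collect(Flow.Publisher<T> source,
                                                  Collector<? super T, A, R> collector) {
        CompletableFuture<R> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            private BiConsumer<A, ? super T> accumulator;
            private A container;
            private boolean failed;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                accumulator = collector.accumulator();
                container = collector.supplier().get();
                s.request(Long.MAX_VALUE);  // an aggregator consumes the whole flow
            }

            @Override public void onNext(T item) {
                if (failed) return;
                try {
                    accumulator.accept(container, item);
                } catch (RuntimeException e) {
                    // Never throw from onNext: cancel upstream and fail the result.
                    failed = true;
                    subscription.cancel();
                    result.completeExceptionally(e);
                }
            }

            @Override public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override public void onComplete() {
                if (failed) return;
                try {
                    result.complete(collector.finisher().apply(container));
                } catch (RuntimeException e) {
                    result.completeExceptionally(e);
                }
            }
        });
        return result;
    }
}
```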
> Using Flow to handle Operation creation/submission just doesn’t work. It
> would be a mistake to push or pull Operations through a reactive stream.
> Operations are created by the Connection/OperationGroup. Once created there
> is no value to passing them back; in fact, doing so would be a source of
> errors. So the user code publishing a stream of Operations that the
> Connection subscribes to just doesn’t work. All that is needed is cancel
> and request, hence Connection.onSubscribe. This feels clumsy as it inverts
> the flow of control that Operation creation otherwise uses without fully
> committing to the reactive stream model.
> There may be a completely different API that is built around reactive
> streams from the ground up. That’s not this API. When we started work on
> this API, Flow did not exist so there was no support for reactive streams
> in Java SE. In consultation with the Class Library team we ended up using
> computational monads, i.e. CompletableFutures. This was not my first choice
> but it’s where we ended up.
> Douglas
> On Oct 6, 2017, at 10:49 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
> Thanks for the answers.
> > 4) We have made several attempts to use Flow more extensively but none
> > have worked out. While it is easy enough to give a hand waving description
> > of a way to use Flow, the devil is in the details. We would be happy to
> > better integrate with Flow but at this point we are out of ideas.
> I happen to know a lot about programming with Reactive-Streams/Flow, so I'm
> interested in what those problems were.
> 2017-10-06 17:55 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
>> There have been a lot of comments wrt reactive streams and Flow. I have a
>> few general comments.
>> 1) The PublisherOperation in the public version of the API should not
>> have been included. It was created as part of an ultimately failing
>> approach to using Flow. I thought I had created it in a separate branch but
>> apparently not. It has been removed and will not be present in the next
>> upload.
>> 2) The goal of the API is to be asynchronous. It is not a goal to use
>> reactive streams. That is not to say it won’t use them where appropriate
>> but it is not a goal.
>> 3) The Java Class Library Team strongly encouraged us to use
>> CompletableFuture. While they did not discourage use of Flow they did not
>> emphasize it. One of the design goals of the API is to integrate well with
>> Java SE and CompletableFuture seemed to do that better than Flow.
>> 4) We have made several attempts to use Flow more extensively but none
>> have worked out. While it is easy enough to give a hand waving description
>> of a way to use Flow, the devil is in the details. We would be happy to
>> better integrate with Flow but at this point we are out of ideas. If the
>> community thinks Flow is important the best way to help that happen is by
>> producing a detailed proposal in the form of a change set to the existing
>> code. The proposal would have to follow the goals and design principles in
>> the 2016 and 2017 JavaOne presentations and integrate well with the
>> existing API and the rest of Java SE and EE.
>> 5) That said, one of the design principles for the async API is that one
>> way to do something is enough. There are a thousand ways to solve almost
>> any programming problem. An API can always be extended to provide tools to
>> solve a problem in yet another way. For v1 at least the EG intends to keep
>> the async API as small as possible. Adding a second way to do something
>> that is already possible will require a very convincing use case.
>> 6) The current version of the API needs back pressure in two places, to
>> limit the rate at which Operations are created/submitted and to limit the
>> rate at which rows are processed. It (mis)uses Flow to meet those needs.
>> This certainly could stand improvement.
>> Douglas
>> > On Oct 6, 2017, at 7:28 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>> >
>> > Hi. I have a couple of questions/suggestions about how the proposal uses
>> > j.u.c.Flow.
>> >
>> > As I understand it, the proposed underlying non-blocking IO uses
>> > CompletableFutures, which, unlike Flow, are one-shot async sources. Is this
>> > due to async NIO being based around CompletableFutures?
>> >
>> > I haven't delved too deeply into NIO, but I'd think a Flow-based NIO could
>> > be possible (and perhaps have lower overhead). I understand such a thing is
>> > out of scope here, but from an API perspective, would it be possible to
>> > have as few ties to CompletableFutures as possible? For example, could most
>> > of the API be Flow-based, with the underlying implementation using
>> > CompletableFutures as sources of notifications?
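One way to read this suggestion (a Flow-based API surface driven by CompletableFutures inside) is a small adapter that exposes a one-shot CompletionStage as a Flow.Publisher. Below is a minimal sketch under the hypothetical name FutureFlow.fromFuture(). It emits the value only after the subscriber has requested at least one item. A null value becomes an empty flow, because Flow forbids null items. CompletionException is unwrapped before onError is called. Errors also wait for the first request, which is stricter than the spec requires.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical adapter: exposes a one-shot CompletionStage as a Flow.Publisher,
// so an API surface can be Flow-based while the implementation is driven by
// CompletableFutures.
final class FutureFlow {
    static <T> Flow.Publisher<T> fromFuture(CompletionStage<T> stage) {
        return subscriber -> {
            CompletableFuture<Void> demand = new CompletableFuture<>();  // completes on first request
            AtomicBoolean terminated = new AtomicBoolean();             // cancelled or signalled
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) {
                    if (n > 0) {
                        demand.complete(null);
                    } else if (terminated.compareAndSet(false, true)) {
                        // Reactive Streams rule 3.9: non-positive requests are errors.
                        subscriber.onError(new IllegalArgumentException("request(" + n + ")"));
                    }
                }

                @Override public void cancel() {
                    terminated.set(true);
                }
            });
            // Fires once the value (or failure) is available AND an item was requested.
            stage.thenCombine(demand, (value, ignored) -> value).whenComplete((value, error) -> {
                if (!terminated.compareAndSet(false, true)) return;  // cancelled or already failed
                if (error != null) {
                    subscriber.onError(error instanceof CompletionException && error.getCause() != null
                            ? error.getCause() : error);
                } else {
                    if (value != null) subscriber.onNext(value);  // null result: empty flow
                    subscriber.onComplete();
                }
            });
        };
    }
}
```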
>> >
>> > The OperationGroup.operationPublisher(Flow.Publisher<Operation> publisher)
>> > Javadoc has this statement:
>> >
>> >  * Any {@link Operation} passed to {@link Flow.Subscriber#onNext} must be created by
>> >  * this {@link OperationGroup}. If it is not {@link Flow.Subscriber#onNext} throws
>> >  * {@link IllegalArgumentException}
>> >
>> > The Reactive-Streams spec forbids throwing from onNext. In general, the only
>> > option for an "invalid" onNext is to cancel the Subscription. I don't know
>> > how such an error could be propagated back to the user other than by adding
>> > a failure state/signal to the returned OperationGroup.
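Following the suggestion to cancel rather than throw, below is a minimal sketch of a subscriber that validates each item. When it receives an invalid item, it cancels its Subscription and reports the failure through a separate status future, which stands in for a "failure state/signal" on the OperationGroup. ValidatingSubscriber, the Predicate standing in for "created by this OperationGroup", and status() are all hypothetical.

```java
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch: an invalid item is reported through a status future
// instead of an exception thrown from onNext (forbidden by Reactive Streams rule 2.13).
final class ValidatingSubscriber<T> implements Flow.Subscriber<T> {
    private final Predicate<? super T> isValid;  // stands in for "created by this OperationGroup"
    private final Consumer<? super T> sink;      // handles the valid items
    private final CompletableFuture<Void> status = new CompletableFuture<>();
    private Flow.Subscription subscription;
    private boolean stopped;

    ValidatingSubscriber(Predicate<? super T> isValid, Consumer<? super T> sink) {
        this.isValid = isValid;
        this.sink = sink;
    }

    /** Completes normally on onComplete, exceptionally on the first invalid item or error. */
    CompletableFuture<Void> status() { return status; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        if (stopped) return;  // items that raced with cancel() are ignored
        if (!isValid.test(item)) {
            stop(new IllegalArgumentException("rejected item: " + item));
            return;
        }
        try {
            sink.accept(item);
        } catch (RuntimeException e) {
            stop(e);
            return;
        }
        subscription.request(1);
    }

    @Override public void onError(Throwable t) {
        stopped = true;
        status.completeExceptionally(t);
    }

    @Override public void onComplete() {
        if (!stopped) status.complete(null);
    }

    private void stop(Throwable reason) {
        stopped = true;
        subscription.cancel();  // reject further input by cancelling, not by throwing
        status.completeExceptionally(reason);
    }
}
```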
>> >
>> > I think Row.isLast shouldn't return a Future, because one can only block or
>> > spin on a non-completed future. On the conceptual level, why would the row's
>> > consumer want to know this information asynchronously? In the Flow API,
>> > onComplete will eventually be called, which indicates there is no more data
>> > coming. On the practical level, isLast poses a concurrency problem: if the
>> > consumer calls isLast for multiple items, it will have multiple outstanding
>> > CompletableFutures, one of which may signal true while the last row is still
>> > being processed. Now the consumer's implementor has to work out the
>> > concurrency implications of that.
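The observation that onComplete already carries the "no more rows" information can be made concrete. If each row is held back until the next onNext or onComplete arrives, the consumer can be told synchronously whether that row is the last one, with no Future involved. Below is a minimal sketch with the hypothetical name LastAwareSubscriber. It assumes the consumer does not throw, and for brevity it requests unbounded demand.

```java
import java.util.concurrent.*;
import java.util.function.BiConsumer;

// Hypothetical sketch: learn whether a row is the last one without a Future,
// by holding each row back until the next onNext or onComplete arrives.
// The consumer is assumed not to throw.
final class LastAwareSubscriber<T> implements Flow.Subscriber<T> {
    private final BiConsumer<? super T, Boolean> consumer;  // receives (row, isLast)
    private final CompletableFuture<Void> done = new CompletableFuture<>();
    private T pending;  // received but not yet delivered

    LastAwareSubscriber(BiConsumer<? super T, Boolean> consumer) {
        this.consumer = consumer;
    }

    CompletableFuture<Void> done() { return done; }

    @Override public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE);  // unbounded demand keeps the sketch short
    }

    @Override public void onNext(T item) {
        if (pending != null) consumer.accept(pending, false);  // a successor exists: not last
        pending = item;
    }

    @Override public void onError(Throwable t) {
        // Whether the pending row was the last one is unknown; it is dropped here.
        pending = null;
        done.completeExceptionally(t);
    }

    @Override public void onComplete() {
        if (pending != null) consumer.accept(pending, true);  // no more data: this was the last row
        pending = null;
        done.complete(null);
    }
}
```

The cost is one row of latency. With bounded demand, the subscriber would have to request one row more than the consumer is ready for.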
>> >
>> > --
>> > Best regards,
>> > David Karnok
> --
> Best regards,
> David Karnok

Best regards,
David Karnok

More information about the jdbc-spec-discuss mailing list