Example of a Flow-based JDBC API

Douglas Surber douglas.surber at oracle.com
Fri Oct 13 18:28:18 UTC 2017

There is a use case for back pressure in the two places I outlined and at least two others you mentioned, large bind values and LOBs. Flow provides a standard way to expose the hooks needed to support those use cases. But as you say there is no other support within (or planned for) Java SE 10/19.x/whatever. That is a valid argument against including those features. The first version of this API will be limited. It will not support every possible use case. Not supporting the cases where back pressure is required is not out of the question.
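As a concrete illustration of the hook Flow provides (a minimal sketch, not part of the proposed API): a Subscriber that requests one chunk of a large value at a time, so the publisher never produces more data than the consumer has asked for.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;

// Illustrative only: a Subscriber that applies back pressure by requesting
// a single byte[] chunk at a time, as one might when streaming a LOB.
class OneAtATimeSubscriber implements Flow.Subscriber<byte[]> {
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    volatile long received;

    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);               // ask for exactly one chunk up front
    }
    public void onNext(byte[] chunk) {
        received += chunk.length;   // consume the chunk...
        subscription.request(1);    // ...then ask for the next one
    }
    public void onError(Throwable t) { done.countDown(); }
    public void onComplete() { done.countDown(); }
}
```

That is the entire back-pressure contract: the publisher may not emit more onNext calls than the subscriber has requested.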

As with JDBC, Java SE will not include an implementation of the async database access API. What would IBM say if the Oracle Database async database access implementation were included in standard Java? So that’s not going to happen.

I don’t know what you mean by “service provider interface nature”. 

What would help me is if you explored whether it would be possible to implement the kind of API you are interested in on top of the proposed API. I would ask that you keep in mind that requiring back pressure support from the database access API when the implementations of that API do not need it is not a good thing.

I did turn up some of the discussion of CompletionStage. There were at least two objections.

  1) CompletionStage is not a Future so a user would have to call toCompletableFuture to get the result (or extract it via some closure).

  2) A correct implementation of CompletionStage is really hard, so every vendor would almost certainly use CompletableFuture anyway; given (1), why not just use CompletableFuture?

Looking back on that I don’t find it as convincing as I did at the time. Replacing CompletableFuture everywhere with CompletionStage would be reasonable. It would be nice to change Operation.submit to return a CompletionStage, except that Submission supports cancel. Specifying that Submission extends CompletionStage plus cancel is problematic: it would require implementors to subclass CompletableFuture to handle cancel correctly, and that is a recipe for bugs, which is exactly why Submission exists in the first place.
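To make the tension concrete, here is a minimal sketch (names are illustrative, not the actual proposed API) of a Submission that wraps a CompletionStage instead of extending it, so a vendor does not have to subclass CompletableFuture to get cancel right:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical shape: Submission exposes a CompletionStage rather than being one.
interface Submission<T> {
    boolean cancel();                        // attempt to cancel the submitted work
    CompletionStage<T> getCompletionStage(); // completes when the operation does
}

// A trivial vendor-side implementation, for illustration only.
final class SimpleSubmission<T> implements Submission<T> {
    private final CompletableFuture<T> result = new CompletableFuture<>();

    public boolean cancel() {
        // A real driver would also tell the database to abandon the work;
        // CompletableFuture.cancel is a no-op if the result already arrived.
        return result.cancel(true);
    }
    public CompletionStage<T> getCompletionStage() {
        return result; // callers can still toCompletableFuture() if they need a Future
    }
    void complete(T value) { result.complete(value); } // driver completes the result
}
```

Because cancellation lives on Submission, the CompletionStage the user sees never needs a vendor-specific cancel override.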


> On Oct 13, 2017, at 10:29 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
> In that case, using any Flow API in the async JDBC API appears to be a big no-no as there is nothing but the SubmissionPublisher in Java SE that implements parts of it and SubmissionPublisher is not a Processor (backpressure doesn't compose through SP). I don't think there are any plans to include Flow transformation and coordination logic in any future Java SE.
> Note though that implementing such Flow-based logic is a non-trivial task and Reactive Streams users, who would be the main consumers of a reactive JDBC, are quite accustomed to picking a vendor for their reactive transformation needs. If you don't plan to provide an implementation behind the async JDBC API, just the interfaces and enums, a service provider interface nature should mean no additional disadvantage to driver vendors or end-users.
> 2017-10-13 19:04 GMT+02:00 Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>:
> If you want to demonstrate that your ideas are appropriate for this project you need to show example code that uses only the stuff that will be in the Java SE release that includes this project, Java 10/19.x/whatever. The API this project specifies must be usable without any external library other than a vendor implementation. Sure additional external libraries can be helpful e.g. Hibernate, but the API must be usable without any such libraries e.g. JDBC. We will not get approval of an API that is useless without support from a non-standard library.
> There is unlikely to be any internal user of any database access API in Java SE. The API will be used exclusively by code outside of Java SE. But the API cannot require a non-standard library (in addition to a vendor implementation) to be of use. It must be usable within Java SE.
> Douglas
>> On Oct 13, 2017, at 9:19 AM, Dávid Karnok <akarnokd at gmail.com <mailto:akarnokd at gmail.com>> wrote:
>> I don't understand your response.
>> Isn't the aim of your project to have an API that will be utilized by other libraries and programs? Or is it intended to be used solely internally by Java SE? Does that mean that driver implementations should also be in Java SE otherwise demonstrating, let's say, a MySQL driver usage is by definition invalid?
>> Do you expect me to demonstrate the usage of my proposed api by hand-crafting flow operations on it?
>> 2017-10-13 17:37 GMT+02:00 Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>:
>> My understanding of the purpose of your example code is to demonstrate how your ideas would be used. Since the goal is integration with Java SE there is no value to an example that depends on an external library; an external library is not Java SE. I am not aware of a project to add a library such as the one you reference to Java SE. This project will not undertake that effort. If you want to demonstrate the suitability of your ideas for this project your example code can only use what will be in the Java SE release that will include your ideas.
>> Douglas
>>> On Oct 13, 2017, at 8:30 AM, Dávid Karnok <akarnokd at gmail.com <mailto:akarnokd at gmail.com>> wrote:
>>> The JdbcExample demonstrates how one can use an external fluent library to talk to the proposed API. The interfaces I proposed don't use any non-Java SE dependencies. 
>> In addition, the package the example interfaces reside in is by no means an indication of where such interfaces would live in Java SE.
>>> 2017-10-13 17:03 GMT+02:00 Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>:
>>> First observation, your sample code imports and uses extensively a library that does not exist in Java SE. This is contrary to one of the non-negotiable goals of this project, to integrate with Java SE.
>>> Douglas
>>> > On Oct 13, 2017, at 5:05 AM, Dávid Karnok <akarnokd at gmail.com <mailto:akarnokd at gmail.com>> wrote:
>>> >
>>> > Hi again,
>>> >
>>> > I've tried to come up with an API for a reactive JDBC driver and ended up
>>> > with this set of interfaces:
>>> >
>>> > https://github.com/akarnokd/akarnokd-misc-java9/tree/master/src/main/java/hu/akarnokd/java9/jdbc
>>> >
>>> > The entry point is JdbcConnectionSource which contains the connection
>>> > information. Its connect() method returns a Flow.Publisher<JdbcConnection>
>>> > where the JdbcConnection becomes available once the async connection
>>> > attempt succeeds.
>>> >
>>> > The JdbcConnection allows creating the connection specific JdbcOperations
>>> > and there is only one way to present them to the connection: execute()
>>> > which takes a Flow.Publisher of JdbcOperations and returns a Flow.Publisher
>>> > of JdbcStatements representing the execution of those operations. One
>>> > Flow.Publisher<JdbcOperation> is considered a group. The various
>>> > execution modes (sequential, parallel, in transaction) are to be expressed
>>> > by specific JdbcOperation instances included in the operations
>>> > Flow.Publisher. Too many calls to execute() may be rejected by the driver.
>>> > Any other non-grouped or direct operation submission can be implemented on
>>> > top of execute(). I'm not certain the complication of operation (group)
>>> > graphs should be part of the API - such a thing could be built on top of just
>>> > execute by controlling the consumption of the Flow.Publisher<JdbcOperation>
>>> > and intercepting events through the resulting
>>> > Flow.Publisher<JdbcStatement>s.
>>> >
>>> > Building a JdbcOperation should be straightforward, but there are two
>>> > additional properties:
>>> >
>>> > - a JdbcOperation could be marked deferred so its execution doesn't begin
>>> > unless there is a consumer for its result.
>>> > - some properties may be set from a Flow of bytes, either in the form of
>>> > byte arrays or in the form of ByteBuffers that are NIO friendly.
>>> >
>>> > An open question is how to create multi-row operations, such as batch
>>> > inserts represented by a single JdbcOperation. If individual JdbcOperations
>>> > are cheap to create, the driver could batch up subsequent JdbcOperations
>>> > with the same structure on its own. Alternatively, a
>>> > JdbcOperation.parameters(Flow.Publisher<T> items, BiConsumer<T,
>>> > JdbcOperations.Builder>>) method could consume a sequence of Ts and have
>>> > each of them set parameters on a per-item provided JdbcOperation.Builder.
>>> >
>>> > For each valid JdbcOperation, a JdbcStatement is created. The only way to
>>> > consume any result from an operation is through a user-provided row-to-T
>>> > transformer: Flow.Publisher<T> results(Function<JdbcRow, T>).
>>> >
>>> > The function is called when each row of data is ready to be consumed
>>> > non-blockingly. POJO or ORM-based mapping could be implemented on top of
>>> > this.
>>> >
>>> > Again, there could be BLOB-like columns which may not be reasonable to
>>> > consume via get(); these can be consumed via getBytes() as byte arrays,
>>> > getBuffer() as a sequence of ByteBuffers and one overload where the user
>>> > can specify a supplier for custom ByteBuffers. These are the dual of the
>>> > JdbcOperation.parameterBuffer() methods which should also help playing nice
>>> > with NIO.
>>> >
>>> > Here is an example program that would consume such API via the help of the
>>> > Reactive4JavaFlow library that supports the Flow API directly:
>>> >
>>> > import hu.akarnokd.reactive4javaflow.*;
>>> >
>>> > public class JdbcExample {
>>> >
>>> >    static JdbcConnectionSource connectionSource() {
>>> >        return null;
>>> >    }
>>> >
>>> >    public static void main(String[] args) {
>>> >        Esetleg.fromPublisher(connectionSource().connect())
>>> >                .observeOn(SchedulerServices.computation())
>>> >                .flatMapPublisher(conn -> {
>>> >                    return Folyam.fromPublisher(conn.execute(
>>> >                            Esetleg.fromCallable(() -> {
>>> >                                return conn.query()
>>> >                                        .query("SELECT :v FROM DUAL")
>>> >                                        .parameter("v", JdbcDataType.INT, 1)
>>> >                                        .build();
>>> >                            })
>>> >                    ))
>>> >                    .flatMap(stmt -> {
>>> >                        return Folyam.fromPublisher(
>>> >                                    stmt.results(row -> row.get("1", Integer.TYPE))
>>> >                                ).observeOn(SchedulerServices.single());
>>> >                    })
>>> >                    .concatWith(Folyam.fromPublisher(conn.close()).map(v -> 0));
>>> >                })
>>> >                .blockingSubscribe(System.out::println, Throwable::printStackTrace);
>>> >    }
>>> > }
>>> >
>>> >
>>> > An open question is where execute() should consume data and where results()
>>> > should perform the row mapping (probably not on the NIO dispatch thread).
>>> > There are a couple of possibilities, e.g., the driver uses a thread pool
>>> > for them or asks the user to provide one. The latter may give an
>>> > opportunity to avoid one thread hop and utilize one of the user's
>>> > existing thread pools.
>>> >
>>> > --
>>> > Best regards,
>>> > David Karnok
>>> -- 
>>> Best regards,
>>> David Karnok
>> -- 
>> Best regards,
>> David Karnok
> -- 
> Best regards,
> David Karnok

More information about the jdbc-spec-discuss mailing list