Example of a Flow-based JDBC API

Douglas Surber douglas.surber at oracle.com
Fri Oct 13 15:03:00 UTC 2017

First observation, your sample code imports and uses extensively a library that does not exist in Java SE. This is contrary to one of the non-negotiable goals of this project, to integrate with Java SE.


> On Oct 13, 2017, at 5:05 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
> Hi again,
> I've tried to come up with an API for a reactive JDBC driver ended up with
> these of interfaces:
> https://github.com/akarnokd/akarnokd-misc-java9/tree/master/src/main/java/hu/akarnokd/java9/jdbc
> The entry point is JdbcConnectionSource which contains the connection
> information. Its connect() method returns a Flow.Publisher<JdbcConnection>
> where the JdbcConnection becomes available once the async connection was
> successful.
> The JdbcConnection allows creating the connection specific JdbcOperations
> and there is only one way to present them to the connection: execute()
> which takes a Flow.Publisher of JdbcOperations and returns a Flow.Publisher
> of JdbcStatements representing the execution of those operations. One
> Flow.Publisher<JdbcOperation> is considered as a group. The various
> execution modes (sequential, parallel, in transaction) are to be expressed
> by specific JdbcOperation instances included in the operations
> Flow.Publisher. Too many calls to execute() may be rejected by the driver.
> Any other non-grouped or direct operation submission can be implemented on
> top of execute(). I'm not certain the complication of operation (group)
> graphs should be part of the API - such thing could be built on top of just
> execute by controlling the consumption of the Flow.Publisher<JdbcOperation>
> and intercepting events through the resulting
> Flow.Publisher<JdbcStatement>s.
> Building a JdbcOperation should be straightforward, but there are two
> additional properties:
> - a JdbcOperation could be marked deferred so its execution doesn't begin
> unless there is a consumer for its result.
> - some properties may be set from a Flow of bytes, either in the form of
> byte arrays of in the form of ByteBuffers that are NIO friendly.
> An open question is how to create multi-row operations, such as batch
> inserts represented by a single JdbcOperation. If individual JdbcOperations
> are cheap to create, the driver could batch up subsequent JdbcOperations
> with the same structure on its own. Alternatively, a
> JdbcOperation.parameters(Flow.Publisher<T> items, BiConsumer<T,
> JdbcOperations.Builder>>) method could consume a sequence of Ts and have
> each of them set parameters on a per-item provided JdbcOperation.Builder.
> For each valid JdbcOperation, a JdbcStatement is created. The only way to
> consume any result from an operation is through an user-provided row to T
> transformer: Flow.Publisher<T> results(Function<JdbcRow, T>).
> The function is called when each row of data is ready to be consumed
> non-blockingly. POJO or ORM-based mapping could be implemented on top of
> this.
> Again, there could be BLOB-like columns which may or may not be reasonable
> to consume via get() can be consumed by getBytes() as byte arrays,
> getBuffer() as a sequence of ByteBuffers and one overload where the user
> can specify a supplier for custom ByteBuffers. These are the dual of the
> JdbcOperation.parameterBuffer() methods which should also help playing nice
> with NIO.
> Here is an example program that would consume such API via the help of the
> Reactive4JavaFlow library that supports the Flow API directly:
> import hu.akarnokd.reactive4javaflow.*;
> public class JdbcExample {
>    static JdbcConnectionSource connectionSource() {
>        return null;
>    }
>    public static void main(String[] args) {
>        Esetleg.fromPublisher(connectionSource().connect())
>                .observeOn(SchedulerServices.computation())
>                .flatMapPublisher(conn -> {
>                    return Folyam.fromPublisher(conn.execute(
>                            Esetleg.fromCallable(() -> {
>                                return conn.query()
>                                        .query("SELECT :v FROM DUAL")
>                                        .parameter("v", JdbcDataType.INT, 1)
>                                        .build();
>                            })
>                    ))
>                    .flatMap(stmt -> {
>                        return Folyam.fromPublisher(
>                                    stmt.results(row -> row.get("1",
> Integer.TYPE))
>                                ).observeOn(SchedulerServices.single());
>                    })
>                    .concatWith(Folyam.fromPublisher(conn.close()).map(v -> 0));
>                })
>                .blockingSubscribe(System.out::println,
> Throwable::printStackTrace);
>    }
> }
> An open question is where execute() should consume data and where results()
> should perform the row mapping (probably not on the NIO dispatch thread).
> There are a couple of possibilities, i.e., the driver uses a thread pool
> for them or asks the user to provide one. The latter one may give an
> opportunity to avoid one thread hop and utilize one of the user's existing
> thread pool.
> -- 
> Best regards,
> David Karnok

More information about the jdbc-spec-discuss mailing list