From 0e91857d2133a890621e34787facfab43be56245 Mon Sep 17 00:00:00 2001 From: Daniel Fiala Date: Tue, 10 Mar 2026 12:19:44 +0100 Subject: [PATCH] Add support for streaming and cursor query execution in SQL templates Motivation: - Introduce a feature to handle large result sets efficiently with streaming and cursor-based query execution. Changes: - Added `SqlTemplateStream` and `forCursor` methods to `SqlTemplate`. - Added `SqlTemplateStreamImpl`, `CursorSqlTemplateImpl`, and `MappingRowStream` - Added documentation to `index.adoc` with examples --- .../src/main/asciidoc/index.adoc | 29 +++ .../main/java/examples/TemplateExamples.java | 51 +++++ .../vertx/sqlclient/templates/RowMapper.java | 20 ++ .../sqlclient/templates/SqlTemplate.java | 43 ++-- .../templates/SqlTemplateStream.java | 115 ++++++++++ .../sqlclient/templates/TupleMapper.java | 21 ++ .../templates/impl/CursorSqlTemplateImpl.java | 67 ++++++ .../templates/impl/MappingRowStream.java | 69 ++++++ .../templates/impl/SqlTemplateImpl.java | 10 +- .../templates/impl/SqlTemplateStreamImpl.java | 63 ++++++ .../sqlclient/templates/MySQLStreamTest.java | 65 ++++++ .../sqlclient/templates/PgStreamTest.java | 74 +++++++ .../sqlclient/templates/StreamTestBase.java | 203 ++++++++++++++++++ 13 files changed, 807 insertions(+), 23 deletions(-) create mode 100644 vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplateStream.java create mode 100644 vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/CursorSqlTemplateImpl.java create mode 100644 vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/MappingRowStream.java create mode 100644 vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateStreamImpl.java create mode 100644 vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/MySQLStreamTest.java create mode 100644 vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/PgStreamTest.java create mode 100644 vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/StreamTestBase.java diff --git a/vertx-sql-client-templates/src/main/asciidoc/index.adoc b/vertx-sql-client-templates/src/main/asciidoc/index.adoc index 9a73ef292e..bef67207cc 100644 --- a/vertx-sql-client-templates/src/main/asciidoc/index.adoc +++ b/vertx-sql-client-templates/src/main/asciidoc/index.adoc @@ -65,6 +65,35 @@ When your template must be executed inside a transaction, you might create a tem ---- ==== +== Streaming + +When dealing with large result sets, you can use {@link io.vertx.sqlclient.templates.SqlTemplateStream} to read rows progressively using a cursor with a configurable fetch size, instead of loading all rows in memory at once. + +NOTE: Streaming requires a {@link io.vertx.sqlclient.SqlConnection}. Some databases (e.g. PostgreSQL) also require an active transaction for cursors. + +[source,$lang] +---- +{@link examples.TemplateExamples#streamExample} +---- + +You can use `mapTo` to map each row emitted by the stream to a custom type: + +[source,$lang] +---- +{@link examples.TemplateExamples#streamWithMapToExample} +---- + +== Cursor + +If you need finer control over row fetching, you can use a cursor-based template with {@link io.vertx.sqlclient.templates.SqlTemplate#forCursor}. This gives you a {@link io.vertx.sqlclient.Cursor} that allows you to read rows in batches. + +NOTE: Cursors require a {@link io.vertx.sqlclient.SqlConnection}. Some databases (e.g. PostgreSQL) also require an active transaction for cursors. + +[source,$lang] +---- +{@link examples.TemplateExamples#cursorExample} +---- + == Template syntax The template syntax uses `#{XXX}` syntax where `XXX` is a valid https://docs.oracle.com/javase/specs/jls/se8/html/jls-3.html#jls-3.8[java identifier] string diff --git a/vertx-sql-client-templates/src/main/java/examples/TemplateExamples.java b/vertx-sql-client-templates/src/main/java/examples/TemplateExamples.java index b0b365af3c..d4002cc51e 100644 --- a/vertx-sql-client-templates/src/main/java/examples/TemplateExamples.java +++ b/vertx-sql-client-templates/src/main/java/examples/TemplateExamples.java @@ -9,6 +9,7 @@ import io.vertx.sqlclient.*; import io.vertx.sqlclient.templates.RowMapper; import io.vertx.sqlclient.templates.SqlTemplate; +import io.vertx.sqlclient.templates.SqlTemplateStream; import io.vertx.sqlclient.templates.TupleMapper; import io.vertx.sqlclient.templates.annotations.Column; import io.vertx.sqlclient.templates.annotations.ParametersMapped; @@ -424,6 +425,56 @@ public Tuple map(Function mapping, int size, UserDataObject par } } + public void streamExample(SqlConnection connection) { + SqlTemplateStream + .forStream(connection, "SELECT * FROM users WHERE age > #{age}", 50) + .execute(Collections.singletonMap("age", 18)) + .onSuccess(stream -> { + stream.handler(row -> { + System.out.println(row.getString("first_name") + " " + row.getString("last_name")); + }); + stream.endHandler(v -> { + System.out.println("End of stream"); + }); + stream.exceptionHandler(err -> { + System.out.println("Error: " + err.getMessage()); + }); + }); + } + + public void streamWithMapToExample(SqlConnection connection) { + SqlTemplateStream + .forStream(connection, "SELECT * FROM users WHERE age > #{age}", 50) + .mapTo(ROW_USER_MAPPER) + .execute(Collections.singletonMap("age", 18)) + .onSuccess(stream -> { + stream.handler(user -> { + System.out.println(user.firstName + " " + user.lastName); + }); + stream.endHandler(v -> { + System.out.println("End of stream"); + }); + }); + } + + public void cursorExample(SqlConnection connection) { + SqlTemplate + .forCursor(connection, "SELECT * FROM users WHERE age > #{age}") + .execute(Collections.singletonMap("age", 18)) + .onSuccess(cursor -> { + cursor.read(100).onSuccess(rows -> { + rows.forEach(row -> { + System.out.println(row.getString("first_name") + " " + row.getString("last_name")); + }); + if (cursor.hasMore()) { + // Read more + } else { + cursor.close(); + } + }); + }); + } + public void templateInTransaction(Pool pool) { SqlTemplate, RowSet> template = SqlTemplate .forQuery(pool, "SELECT * FROM users WHERE id=#{id}") diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/RowMapper.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/RowMapper.java index 9aba70f1ba..6808d5e1fe 100644 --- a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/RowMapper.java +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/RowMapper.java @@ -11,6 +11,7 @@ package io.vertx.sqlclient.templates; import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.Row; /** @@ -20,6 +21,25 @@ @FunctionalInterface public interface RowMapper { + /** + * Create a mapper that converts a {@link Row} to an instance of the given {@code type}. + * + *

This feature relies on {@link io.vertx.core.json.JsonObject#mapTo} feature. This likely requires + * to use Jackson databind in the project. + * + * @param type the target class + * @return the mapper + */ + static RowMapper mapper(Class type) { + return row -> { + JsonObject json = new JsonObject(); + for (int i = 0; i < row.size(); i++) { + json.getMap().put(row.getColumnName(i), row.getValue(i)); + } + return json.mapTo(type); + }; + } + /** * Build a {@code T} representation of the given {@code row} * diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplate.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplate.java index b80d02c976..5ca3cef9b5 100644 --- a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplate.java +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplate.java @@ -14,15 +14,11 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; -import io.vertx.core.json.JsonObject; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.SqlClient; -import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.*; import io.vertx.sqlclient.impl.SqlClientInternal; +import io.vertx.sqlclient.templates.impl.CursorSqlTemplateImpl; import io.vertx.sqlclient.templates.impl.SqlTemplateImpl; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -69,6 +65,32 @@ static SqlTemplate, SqlResult> forUpdate(SqlClient cli return new SqlTemplateImpl<>(clientInternal, sqlTemplate, query -> query.collecting(SqlTemplateImpl.NULL_COLLECTOR), sqlTemplate::mapTuple); } + /** + * Create an SQL template for streaming query results consuming map parameters and returning {@link Row}. + * + *

Delegates to {@link SqlTemplateStream#forStream(SqlConnection, String, int)}. + * + * @param client the wrapped SQL connection + * @param template the template query string + * @param fetchSize the cursor fetch size + * @return the template + */ + static SqlTemplateStream, Row> forStream(SqlConnection client, String template, int fetchSize) { + return SqlTemplateStream.forStream(client, template, fetchSize); + } + + /** + * Create an SQL template for cursor-based query execution consuming map parameters and returning a {@link Cursor}. + * + * @param client the wrapped SQL connection + * @param template the template query string + * @return the template + */ + static SqlTemplate, Cursor> forCursor(SqlConnection client, String template) { + SqlClientInternal clientInternal = (SqlClientInternal) client; + io.vertx.sqlclient.templates.impl.SqlTemplate sqlTemplate = io.vertx.sqlclient.templates.impl.SqlTemplate.create(clientInternal, template); + return new CursorSqlTemplateImpl<>(client, sqlTemplate, sqlTemplate::mapTuple); + } /** * @return the computed SQL for this template @@ -99,14 +121,7 @@ static SqlTemplate, SqlResult> forUpdate(SqlClient cli * @return a new template */ default SqlTemplate mapFrom(Class type) { - return mapFrom(TupleMapper.mapper(params -> { - JsonObject jsonObject = JsonObject.mapFrom(params); - Map map = new LinkedHashMap<>(jsonObject.size()); - for (String fieldName : jsonObject.fieldNames()) { - map.put(fieldName, jsonObject.getValue(fieldName)); - } - return map; - })); + return mapFrom(TupleMapper.mapper(type)); } /** diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplateStream.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplateStream.java new file mode 100644 index 0000000000..437892e685 --- /dev/null +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplateStream.java @@ -0,0 +1,115 @@ +package io.vertx.sqlclient.templates; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowStream; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.impl.SqlClientInternal; +import io.vertx.sqlclient.templates.impl.SqlTemplateStreamImpl; + +import java.util.Map; + +/** + * An SQL template for streaming query results. + * + *

Stream templates execute queries using named instead of positional parameters and return results + * as a {@link RowStream} that reads rows progressively using a cursor with a configurable fetch size. + * + * @param the input parameters type + * @param the row output type + */ +@VertxGen +public interface SqlTemplateStream { + + /** + * Create an SQL template for streaming query results consuming map parameters and returning {@link Row}. + * + *

The returned stream template uses a cursor with the given {@code fetchSize} to read rows progressively. + * + * @param client the wrapped SQL connection + * @param template the template query string + * @param fetchSize the cursor fetch size + * @return the template + */ + static SqlTemplateStream, Row> forStream(SqlConnection client, String template, int fetchSize) { + SqlClientInternal clientInternal = (SqlClientInternal) client; + io.vertx.sqlclient.templates.impl.SqlTemplate sqlTemplate = io.vertx.sqlclient.templates.impl.SqlTemplate.create(clientInternal, template); + return new SqlTemplateStreamImpl<>(client, sqlTemplate, sqlTemplate::mapTuple, null, fetchSize); + } + + /** + * @return the computed SQL for this template + */ + String sql(); + + /** + * Set a parameters user defined mapping function. + * + *

At query execution, the {@code mapper} is called to map the parameters object + * to a {@link io.vertx.sqlclient.Tuple} that configures the prepared query. + * + * @param mapper the mapping function + * @return a new template + */ + SqlTemplateStream mapFrom(TupleMapper mapper); + + /** + * Set a parameters user defined class mapping. + * + *

At query execution, the parameters object is mapped to a {@code Map} that + * configures the prepared query. + * + *

This feature relies on {@link io.vertx.core.json.JsonObject#mapFrom} feature. This likely requires + * to use Jackson databind in the project. + * + * @param type the mapping type + * @return a new template + */ + default SqlTemplateStream mapFrom(Class type) { + return mapFrom(TupleMapper.mapper(type)); + } + + /** + * Set a row user defined mapping function. + * + *

When rows are emitted by the stream, the {@code mapper} function is called to map each {@link Row} + * to the target type. + * + * @param mapper the mapping function + * @return a new template + */ + SqlTemplateStream mapTo(RowMapper mapper); + + /** + * Set a row user defined mapping function. + * + *

When rows are emitted by the stream, resulting rows are mapped to {@code type} instances. + * + *

This feature relies on {@link io.vertx.core.json.JsonObject#mapFrom} feature. This likely requires + * to use Jackson databind in the project. + * + * @param type the mapping type + * @return a new template + */ + default SqlTemplateStream mapTo(Class type) { + return mapTo(RowMapper.mapper(type)); + } + + /** + * Returns a new template, using the specified {@code connection}. + * + * @param connection the connection that will execute requests + * @return a new template + */ + SqlTemplateStream withConnection(SqlConnection connection); + + /** + * Execute the query with the {@code parameters} + * + * @param params the query parameters + * @return a future notified with the result + */ + Future> execute(I params); + +} diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/TupleMapper.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/TupleMapper.java index 1091ec6ef8..e72b8fc241 100644 --- a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/TupleMapper.java +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/TupleMapper.java @@ -15,6 +15,7 @@ import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.templates.impl.JsonTuple; +import java.util.LinkedHashMap; import java.util.Map; import java.util.function.Function; @@ -44,6 +45,26 @@ static TupleMapper mapper(Function> fn) { }; } + /** + * Create a mapper that converts a parameters object of the given {@code type} to a map of named parameters. + * + *

This feature relies on {@link io.vertx.core.json.JsonObject#mapFrom} feature. This likely requires + * to use Jackson databind in the project. + * + * @param type the parameters class + * @return the mapper + */ + static TupleMapper mapper(Class type) { + return mapper(params -> { + JsonObject jsonObject = JsonObject.mapFrom(params); + Map map = new LinkedHashMap<>(jsonObject.size()); + for (String fieldName : jsonObject.fieldNames()) { + map.put(fieldName, jsonObject.getValue(fieldName)); + } + return map; + }); + } + /** * Map a {@link JsonObject} to a {@link Tuple}. */ diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/CursorSqlTemplateImpl.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/CursorSqlTemplateImpl.java new file mode 100644 index 0000000000..7a8a41c06d --- /dev/null +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/CursorSqlTemplateImpl.java @@ -0,0 +1,67 @@ +package io.vertx.sqlclient.templates.impl; + +import io.vertx.core.Future; +import io.vertx.sqlclient.*; +import io.vertx.sqlclient.templates.RowMapper; +import io.vertx.sqlclient.templates.TupleMapper; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collector; + +public class CursorSqlTemplateImpl implements io.vertx.sqlclient.templates.SqlTemplate { + + private final SqlConnection connection; + private final SqlTemplate sqlTemplate; + private final Function tupleMapper; + + public CursorSqlTemplateImpl(SqlConnection connection, SqlTemplate sqlTemplate, Function tupleMapper) { + this.connection = connection; + this.sqlTemplate = sqlTemplate; + this.tupleMapper = tupleMapper; + } + + @Override + public String sql() { + return sqlTemplate.getSql(); + } + + @Override + public io.vertx.sqlclient.templates.SqlTemplate mapFrom(TupleMapper mapper) { + return new CursorSqlTemplateImpl<>(connection, sqlTemplate, params -> mapper.map(sqlTemplate, sqlTemplate.numberOfParams(), params)); + } + + @Override + public io.vertx.sqlclient.templates.SqlTemplate> mapTo(RowMapper mapper) { + throw new UnsupportedOperationException("mapTo is not supported on cursor templates, use forStream instead"); + } + + @Override + public io.vertx.sqlclient.templates.SqlTemplate> mapTo(Class type) { + throw new UnsupportedOperationException("mapTo is not supported on cursor templates, use forStream instead"); + } + + @Override + public io.vertx.sqlclient.templates.SqlTemplate> collecting(Collector collector) { + throw new UnsupportedOperationException("collecting is not supported on cursor templates"); + } + + @Override + public io.vertx.sqlclient.templates.SqlTemplate withClient(SqlClient client) { + if (!(client instanceof SqlConnection)) { + throw new IllegalArgumentException("Cursor templates require a SqlConnection"); + } + return new CursorSqlTemplateImpl<>((SqlConnection) client, sqlTemplate, tupleMapper); + } + + @Override + public Future execute(I params) { + Tuple tuple = tupleMapper.apply(params); + return connection.prepare(sqlTemplate.getSql()).map(ps -> ps.cursor(tuple)); + } + + @Override + public Future executeBatch(List batch) { + throw new UnsupportedOperationException("executeBatch is not supported on cursor templates"); + } +} diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/MappingRowStream.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/MappingRowStream.java new file mode 100644 index 0000000000..3ca0faf1b0 --- /dev/null +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/MappingRowStream.java @@ -0,0 +1,69 @@ +package io.vertx.sqlclient.templates.impl; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowStream; +import io.vertx.sqlclient.desc.RowDescriptor; +import io.vertx.sqlclient.templates.RowMapper; + +public class MappingRowStream implements RowStream { + + private final RowStream delegate; + private final RowMapper mapper; + + public MappingRowStream(RowStream delegate, RowMapper mapper) { + this.delegate = delegate; + this.mapper = mapper; + } + + @Override + public RowDescriptor rowDescriptor() { + return delegate.rowDescriptor(); + } + + @Override + public RowStream exceptionHandler(Handler handler) { + delegate.exceptionHandler(handler); + return this; + } + + @Override + public RowStream handler(Handler handler) { + if (handler != null) { + delegate.handler(row -> handler.handle(mapper.map(row))); + } else { + delegate.handler(null); + } + return this; + } + + @Override + public RowStream pause() { + delegate.pause(); + return this; + } + + @Override + public RowStream resume() { + delegate.resume(); + return this; + } + + @Override + public RowStream endHandler(Handler handler) { + delegate.endHandler(handler); + return this; + } + + @Override + public RowStream fetch(long l) { + delegate.fetch(l); + return this; + } + + @Override + public Future close() { + return delegate.close(); + } +} diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateImpl.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateImpl.java index 1a560f829e..8720255b60 100644 --- a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateImpl.java +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateImpl.java @@ -1,7 +1,6 @@ package io.vertx.sqlclient.templates.impl; import io.vertx.core.Future; -import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.*; import io.vertx.sqlclient.impl.SqlClientInternal; import io.vertx.sqlclient.templates.RowMapper; @@ -15,7 +14,6 @@ public class SqlTemplateImpl implements io.vertx.sqlclient.templates.SqlTemplate { - // public static final Collector NULL_COLLECTOR = Collector.of(() -> null, (v, row) -> {}, (a, b) -> null); protected final SqlClientInternal client; @@ -57,13 +55,7 @@ public io.vertx.sqlclient.templates.SqlTemplate withClient(SqlClient clien @Override public io.vertx.sqlclient.templates.SqlTemplate> mapTo(Class type) { - return mapTo(row -> { - JsonObject json = new JsonObject(); - for (int i = 0;i < row.size();i++) { - json.getMap().put(row.getColumnName(i), row.getValue(i)); - } - return json.mapTo(type); - }); + return mapTo(RowMapper.mapper(type)); } @Override diff --git a/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateStreamImpl.java b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateStreamImpl.java new file mode 100644 index 0000000000..79f7fb1300 --- /dev/null +++ b/vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateStreamImpl.java @@ -0,0 +1,63 @@ +package io.vertx.sqlclient.templates.impl; + +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowStream; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.templates.RowMapper; +import io.vertx.sqlclient.templates.SqlTemplateStream; +import io.vertx.sqlclient.templates.TupleMapper; + +import java.util.function.Function; + +public class SqlTemplateStreamImpl implements SqlTemplateStream { + + private final SqlConnection connection; + private final SqlTemplate sqlTemplate; + private final Function tupleMapper; + private final RowMapper rowMapper; + private final int fetchSize; + + public SqlTemplateStreamImpl(SqlConnection connection, SqlTemplate sqlTemplate, Function tupleMapper, RowMapper rowMapper, int fetchSize) { + this.connection = connection; + this.sqlTemplate = sqlTemplate; + this.tupleMapper = tupleMapper; + this.rowMapper = rowMapper; + this.fetchSize = fetchSize; + } + + @Override + public String sql() { + return sqlTemplate.getSql(); + } + + @Override + public SqlTemplateStream mapFrom(TupleMapper mapper) { + return new SqlTemplateStreamImpl<>(connection, sqlTemplate, params -> mapper.map(sqlTemplate, sqlTemplate.numberOfParams(), params), rowMapper, fetchSize); + } + + @Override + public SqlTemplateStream mapTo(RowMapper mapper) { + return new SqlTemplateStreamImpl<>(connection, sqlTemplate, tupleMapper, mapper, fetchSize); + } + + @Override + public SqlTemplateStream withConnection(SqlConnection connection) { + return new SqlTemplateStreamImpl<>(connection, sqlTemplate, tupleMapper, rowMapper, fetchSize); + } + + @SuppressWarnings("unchecked") + @Override + public Future> execute(I params) { + Tuple tuple = tupleMapper.apply(params); + return connection.prepare(sqlTemplate.getSql()).map(ps -> { + RowStream stream = ps.createStream(fetchSize, tuple); + if (rowMapper != null) { + return new MappingRowStream<>(stream, rowMapper); + } else { + return (RowStream) stream; + } + }); + } +} diff --git a/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/MySQLStreamTest.java b/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/MySQLStreamTest.java new file mode 100644 index 0000000000..9fdbc6b5af --- /dev/null +++ b/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/MySQLStreamTest.java @@ -0,0 +1,65 @@ +package io.vertx.tests.sqlclient.templates; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mysqlclient.MySQLConnectOptions; +import io.vertx.mysqlclient.MySQLConnection; +import io.vertx.sqlclient.SqlConnection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; + +public class MySQLStreamTest extends StreamTestBase { + + private static GenericContainer server; + + @BeforeClass + public static void startDatabase() { + server = new GenericContainer<>("mysql:8.0") + .withEnv("MYSQL_USER", "mysql") + .withEnv("MYSQL_PASSWORD", "password") + .withEnv("MYSQL_ROOT_PASSWORD", "password") + .withEnv("MYSQL_DATABASE", "testschema") + .withExposedPorts(3306) + .withReuse(true); + server.start(); + } + + @AfterClass + public static void stopDatabase() { + try { + server.stop(); + } finally { + server = null; + } + } + + private static MySQLConnectOptions connectOptions() { + return new MySQLConnectOptions() + .setPort(server.getMappedPort(3306)) + .setHost(server.getHost()) + .setDatabase("testschema") + .setUser("mysql") + .setPassword("password"); + } + + @Override + protected Future connect(Vertx vertx) { + return MySQLConnection.connect(vertx, connectOptions()).map(conn -> conn); + } + + @Override + protected String createTableSql() { + return "CREATE TABLE test_stream (id INT)"; + } + + @Override + protected String selectSingleRowSql() { + return "SELECT CAST(#{id} AS SIGNED) AS id"; + } + + @Override + protected String selectTwoColumnsSql() { + return "SELECT CAST(#{id} AS SIGNED) AS id, CAST(#{randomnumber} AS SIGNED) AS randomnumber"; + } +} diff --git a/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/PgStreamTest.java b/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/PgStreamTest.java new file mode 100644 index 0000000000..af7fde1af0 --- /dev/null +++ b/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/PgStreamTest.java @@ -0,0 +1,74 @@ +package io.vertx.tests.sqlclient.templates; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgConnection; +import io.vertx.sqlclient.SqlConnection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import static io.vertx.pgclient.PgConnectOptions.DEFAULT_PORT; + +public class PgStreamTest extends StreamTestBase { + + private static GenericContainer server; + + @BeforeClass + public static void startDatabase() { + server = new GenericContainer<>("postgres:10.10") + .withEnv("POSTGRES_DB", "postgres") + .withEnv("POSTGRES_USER", "postgres") + .withEnv("POSTGRES_PASSWORD", "postgres") + .withExposedPorts(DEFAULT_PORT) + .waitingFor(new LogMessageWaitStrategy() + .withRegEx(".*database system is ready to accept connections.*\\s") + .withTimes(2) + .withStartupTimeout(Duration.of(60, ChronoUnit.SECONDS))) + .withCommand("postgres", "-c", "fsync=off"); + server.start(); + } + + @AfterClass + public static void stopDatabase() { + try { + server.stop(); + } finally { + server = null; + } + } + + private static PgConnectOptions connectOptions() { + return new PgConnectOptions() + .setPort(server.getMappedPort(DEFAULT_PORT)) + .setHost(server.getHost()) + .setDatabase("postgres") + .setUser("postgres") + .setPassword("postgres"); + } + + @Override + protected Future connect(Vertx vertx) { + return PgConnection.connect(vertx, connectOptions()).map(conn -> conn); + } + + @Override + protected String createTableSql() { + return "CREATE TABLE test_stream (id INT4)"; + } + + @Override + protected String selectSingleRowSql() { + return "SELECT #{id} :: INT4 \"id\""; + } + + @Override + protected String selectTwoColumnsSql() { + return "SELECT #{id} :: INT4 \"id\", #{randomnumber} :: INT4 \"randomnumber\""; + } +} diff --git a/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/StreamTestBase.java b/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/StreamTestBase.java new file mode 100644 index 0000000000..8b2f38099d --- /dev/null +++ b/vertx-sql-client-templates/src/test/java/io/vertx/tests/sqlclient/templates/StreamTestBase.java @@ -0,0 +1,203 @@ +package io.vertx.tests.sqlclient.templates; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.templates.SqlTemplate; +import io.vertx.sqlclient.templates.SqlTemplateStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@RunWith(VertxUnitRunner.class) +public abstract class StreamTestBase { + + protected Vertx vertx; + protected SqlConnection connection; + + protected abstract Future connect(Vertx vertx); + + protected abstract String createTableSql(); + + protected abstract String selectSingleRowSql(); + + protected abstract String selectTwoColumnsSql(); + + @Before + public void setup(TestContext ctx) { + vertx = Vertx.vertx(); + Async async = ctx.async(); + connect(vertx).onComplete(ctx.asyncAssertSuccess(conn -> { + connection = conn; + async.complete(); + })); + async.await(10000); + } + + @After + public void teardown(TestContext ctx) { + if (connection != null) { + connection.close(); + } + vertx.close().onComplete(ctx.asyncAssertSuccess()); + } + + @Test + public void testStream(TestContext ctx) { + Async async = ctx.async(); + connection.begin().onComplete(ctx.asyncAssertSuccess(tx -> { + SqlTemplateStream + .forStream(connection, selectSingleRowSql(), 1) + .execute(Collections.singletonMap("id", 1)) + .onComplete(ctx.asyncAssertSuccess(stream -> { + List rows = new ArrayList<>(); + stream.handler(rows::add); + stream.endHandler(v -> { + ctx.assertEquals(1, rows.size()); + ctx.assertEquals(1, rows.get(0).getInteger("id")); + tx.rollback().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())); + }); + stream.exceptionHandler(ctx::fail); + })); + })); + async.await(10000); + } + + @Test + public void testStreamWithMapTo(TestContext ctx) { + Async async = ctx.async(); + connection.begin().onComplete(ctx.asyncAssertSuccess(tx -> { + SqlTemplateStream + .forStream(connection, selectTwoColumnsSql(), 1) + .mapTo(World.class) + .execute(Map.of("id", 1, "randomnumber", 10)) + .onComplete(ctx.asyncAssertSuccess(stream -> { + List worlds = new ArrayList<>(); + stream.handler(worlds::add); + stream.endHandler(v -> { + ctx.assertEquals(1, worlds.size()); + ctx.assertEquals(1, worlds.get(0).id); + ctx.assertEquals(10, worlds.get(0).randomnumber); + tx.rollback().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())); + }); + stream.exceptionHandler(ctx::fail); + })); + })); + async.await(10000); + } + + @Test + public void testStreamWithRowMapper(TestContext ctx) { + Async async = ctx.async(); + connection.begin().onComplete(ctx.asyncAssertSuccess(tx -> { + SqlTemplateStream + .forStream(connection, selectTwoColumnsSql(), 1) + .mapTo(row -> new World(row.getInteger("id"), row.getInteger("randomnumber"))) + .execute(Map.of("id", 1, "randomnumber", 10)) + .onComplete(ctx.asyncAssertSuccess(stream -> { + List worlds = new ArrayList<>(); + stream.handler(worlds::add); + stream.endHandler(v -> { + ctx.assertEquals(1, worlds.size()); + ctx.assertEquals(1, worlds.get(0).id); + ctx.assertEquals(10, worlds.get(0).randomnumber); + tx.rollback().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())); + }); + stream.exceptionHandler(ctx::fail); + })); + })); + async.await(10000); + } + + @Test + public void testStreamMultipleRows(TestContext ctx) { + Async async = ctx.async(); + connection.begin().onComplete(ctx.asyncAssertSuccess(tx -> { + connection.query("DROP TABLE IF EXISTS test_stream").execute().onComplete(ctx.asyncAssertSuccess(d -> { + connection.query(createTableSql()).execute().onComplete(ctx.asyncAssertSuccess(c -> { + connection.query("INSERT INTO test_stream VALUES (1),(2),(3),(4),(5)").execute().onComplete(ctx.asyncAssertSuccess(i -> { + SqlTemplateStream + .forStream(connection, "SELECT * FROM test_stream WHERE id > #{minId}", 2) + .mapTo(row -> row.getInteger("id")) + .execute(Collections.singletonMap("minId", 0)) + .onComplete(ctx.asyncAssertSuccess(stream -> { + List ids = new ArrayList<>(); + stream.handler(ids::add); + stream.endHandler(v -> { + ctx.assertEquals(5, ids.size()); + tx.rollback().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())); + }); + stream.exceptionHandler(ctx::fail); + })); + })); + })); + })); + })); + async.await(10000); + } + + @Test + public void testCursor(TestContext ctx) { + Async async = ctx.async(); + connection.begin().onComplete(ctx.asyncAssertSuccess(tx -> { + SqlTemplate + .forCursor(connection, selectSingleRowSql()) + .execute(Collections.singletonMap("id", 1)) + .onComplete(ctx.asyncAssertSuccess(cursor -> { + cursor.read(10).onComplete(ctx.asyncAssertSuccess(rows -> { + ctx.assertEquals(1, rows.size()); + ctx.assertEquals(1, rows.iterator().next().getInteger("id")); + ctx.assertFalse(cursor.hasMore()); + cursor.close().onComplete(ctx.asyncAssertSuccess(v -> + tx.rollback().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())) + )); + })); + })); + })); + async.await(10000); + } + + @Test + public void testCursorMultipleRows(TestContext ctx) { + Async async = ctx.async(); + connection.begin().onComplete(ctx.asyncAssertSuccess(tx -> { + connection.query("DROP TABLE IF EXISTS test_cursor").execute().onComplete(ctx.asyncAssertSuccess(d -> { + connection.query(createTableSql().replace("test_stream", "test_cursor")).execute().onComplete(ctx.asyncAssertSuccess(c -> { + connection.query("INSERT INTO test_cursor VALUES (1),(2),(3),(4),(5)").execute().onComplete(ctx.asyncAssertSuccess(i -> { + SqlTemplate + .forCursor(connection, "SELECT * FROM test_cursor WHERE id > #{minId}") + .execute(Collections.singletonMap("minId", 0)) + .onComplete(ctx.asyncAssertSuccess(cursor -> { + cursor.read(2).onComplete(ctx.asyncAssertSuccess(rows1 -> { + ctx.assertEquals(2, rows1.size()); + ctx.assertTrue(cursor.hasMore()); + cursor.read(2).onComplete(ctx.asyncAssertSuccess(rows2 -> { + ctx.assertEquals(2, rows2.size()); + ctx.assertTrue(cursor.hasMore()); + cursor.read(2).onComplete(ctx.asyncAssertSuccess(rows3 -> { + ctx.assertEquals(1, rows3.size()); + ctx.assertFalse(cursor.hasMore()); + cursor.close().onComplete(ctx.asyncAssertSuccess(v -> + tx.rollback().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())) + )); + })); + })); + })); + })); + })); + })); + })); + })); + async.await(10000); + } +}