Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.vertx.tests.pgclient;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.impl.PgSocketConnection;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.internal.SqlConnectionInternal;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class PreparedStatementReprepareTest extends PgTestBase {

private Vertx vertx;

@Before
public void setup() throws Exception {
super.setup();
vertx = Vertx.vertx();
}

@After
public void tearDown(TestContext ctx) {
vertx.close().onComplete(ctx.asyncAssertSuccess());
}

@Test
public void testReprepareDoesNotMakeInflightNegativeWithEnabledCache(TestContext ctx) {
testReprepareDoesNotMakeInflightNegative(ctx, true);
}

@Test
public void testReprepareDoesNotMakeInflightNegativeWithDisabledCache(TestContext ctx) {
testReprepareDoesNotMakeInflightNegative(ctx, false);
}

private void testReprepareDoesNotMakeInflightNegative(TestContext ctx, boolean cachePreparedStatements) {
Async async = ctx.async();

PgConnectOptions options = new PgConnectOptions(this.options)
.setCachePreparedStatements(cachePreparedStatements);

PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> {
PgSocketConnection socket = (PgSocketConnection) ((SqlConnectionInternal) conn).unwrap();

conn
.preparedQuery("SELECT CONCAT('HELLO ', $1)")
.execute(Tuple.of("WORLD"))
.map(rows -> {
RowSet<Row> result = rows;

ctx.assertEquals(1, result.size());
ctx.assertEquals("HELLO WORLD", result.iterator().next().getString(0));

ctx.assertEquals(
0,
socket.inflight(),
"Inflight count should be zero after reprepare query completion " +
"(cachePreparedStatements=" + cachePreparedStatements + ")"
);

return rows;
})
.eventually(conn::close)
.onComplete(ctx.asyncAssertSuccess(v -> async.complete()));
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public int pipeliningLimit() {
return pipeliningLimit;
}

// Visible for testing
public int inflight() {
Comment thread
tsegismont marked this conversation as resolved.
return inflight;
}

@Override
public TracingPolicy tracingPolicy() {
return connectOptions().getTracingPolicy();
Expand Down Expand Up @@ -336,6 +341,8 @@ private void fireCommandMessage(ChannelHandlerContext chctx, CommandMessage<?, ?
} else {
if (queryCmd.autoCommit() && isIndeterminatePreparedStatementError(cause) && !sendParameterTypes) {
ChannelHandlerContext ctx = socket.channelHandlerContext();
// We need to increment inflight because a new prepare command will be submitted
inflight++;
// We cannot cache this prepared statement because it might be executed with another type
fireCommandMessage(ctx, prepareCommand(queryCmd, handler, false, true));
ctx.flush();
Expand Down