feat: pipeline#3591

Open
cesco69 wants to merge 31 commits into brianc:master from cesco69:pipeline-2

Conversation


@cesco69 cesco69 commented Feb 4, 2026

This PR implements pipeline mode, allowing multiple queries to be sent to the PostgreSQL server without waiting for the results of previous queries. This significantly reduces the time spent waiting on network round trips, especially for batch operations or high-latency connections.

Usage

Enabling Pipeline Mode

Pipeline mode is opt-in and can be enabled when creating a client or pool:

// With Client
const client = new Client({ pipelineMode: true })
await client.connect()

// With Pool - all clients will have pipeline mode enabled
const pool = new Pool({ pipelineMode: true })

When pipeline mode is enabled, you can submit multiple queries and they will be sent to the server immediately:

// All three queries are sent immediately to the server
const [result1, result2, result3] = await Promise.all([
  client.query('SELECT 1 as num'),
  client.query('SELECT 2 as num'),
  client.query('SELECT 3 as num'),
])

Backpressure

When submitting a large number of queries in pipeline mode, you may want to limit how many queries are pending at once to prevent memory issues. The client supports configurable backpressure:

const client = new Client({
  pipelineMode: true,
  pipelineMaxQueries: 100, // default is 1000
})

When the number of pending queries reaches pipelineMaxQueries, new queries are queued internally and will be sent once space becomes available. The client emits events to help you monitor backpressure:

client.on('pipelineFull', () => {
  console.log('Pipeline is full, new queries will wait')
})

client.on('pipelineDrain', () => {
  console.log('Pipeline has drained, accepting new queries')
})

You can also check the current pipeline depth:

console.log(client.pendingQueryCount) // number of queries waiting for results

The pipelineDrain event is emitted when the pending query count drops below 75% of pipelineMaxQueries (the "low water mark").
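
For example, an application pushing a very large batch could pause when the pipeline is at capacity and resume on drain. This is only a sketch built on the events and pendingQueryCount property described above; the submitBatch helper and the users table are made up:

const { once } = require('node:events')

// Hypothetical helper: submit a large batch without letting the internal
// queue grow unbounded (assumes pipelineMaxQueries: 100 as configured above)
async function submitBatch(client, ids) {
  const pending = []
  for (const id of ids) {
    if (client.pendingQueryCount >= 100) {
      // at capacity: wait for the low water mark before submitting more
      await once(client, 'pipelineDrain')
    }
    pending.push(client.query('SELECT * FROM users WHERE id = $1', [id]))
  }
  return Promise.all(pending)
}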

Error Isolation

If one query fails in pipeline mode, other queries continue to execute:

const results = await Promise.allSettled([
  client.query('SELECT 1 as num'),
  client.query('SELECT * FROM nonexistent_table'), // This will fail
  client.query('SELECT 3 as num'),
])

console.log(results[0].status) // 'fulfilled'
console.log(results[1].status) // 'rejected'
console.log(results[2].status) // 'fulfilled' - still succeeds!

Prepared Statements in Pipeline Mode

Prepared statements work in pipeline mode, including concurrent queries with the same statement name:

const results = await Promise.all([
  client.query({ name: 'get-user', text: 'SELECT $1::int as id', values: [1] }),
  client.query({ name: 'get-user', text: 'SELECT $1::int as id', values: [2] }),
  client.query({ name: 'get-user', text: 'SELECT $1::int as id', values: [3] }),
])

Restrictions and Limitations

Note: Multi-statement queries (e.g., SELECT 1; SELECT 2) are rejected by PostgreSQL when using the extended query protocol, which pipeline mode uses. This is a PostgreSQL limitation.
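
For example (a sketch; the exact error message may differ):

try {
  await client.query('SELECT 1; SELECT 2') // rejected: extended protocol accepts one statement per query
} catch (err) {
  console.log(err.message)
}

// Send them as separate pipelined queries instead
const [a, b] = await Promise.all([
  client.query('SELECT 1 as a'),
  client.query('SELECT 2 as b'),
])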

Transaction Behavior

Errors inside a transaction will abort subsequent queries until ROLLBACK. This is standard PostgreSQL behavior. If you send COMMIT on an aborted transaction, PostgreSQL automatically converts it to ROLLBACK:

// In a transaction, if one query fails, subsequent queries also fail
await client.query('BEGIN')
await client.query('INSERT INTO t VALUES (1)')
await client.query('INSERT INTO t VALUES (1)') // fails: duplicate key
await client.query('INSERT INTO t VALUES (2)') // also fails: transaction aborted
await client.query('COMMIT') // returns ROLLBACK command
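
The usual guard still applies and is unchanged by pipeline mode: wrap the transaction in try/catch and roll back on error. A minimal sketch:

await client.query('BEGIN')
try {
  await client.query('INSERT INTO t VALUES (1)')
  await client.query('INSERT INTO t VALUES (2)')
  await client.query('COMMIT')
} catch (err) {
  // any failed statement aborts the transaction, so roll back and rethrow
  await client.query('ROLLBACK')
  throw err
}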

When to Use Pipeline Mode

Pipeline mode is most beneficial when:

  • You have many independent queries to execute
  • Network latency is high (cloud databases, cross-region connections)
  • You're doing batch operations
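
For example, a batch insert can submit all rows at once instead of awaiting each round trip (a sketch; the events table and its columns are made up):

const rows = [
  { id: 1, payload: 'a' },
  { id: 2, payload: 'b' },
]
await Promise.all(
  rows.map((row) =>
    client.query('INSERT INTO events (id, payload) VALUES ($1, $2)', [row.id, row.payload])
  )
)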

Related Issues

Successfully merging this pull request may close these issues:

  • implement pipelining mode allowed by libpq 14
  • Extended Query: Support Batch Execution

Benchmark

I have also updated the benchmark b189ed5

Query Type    Standard QPS    Pipeline QPS    Improvement
param         374             440             +18%
named         380             440             +16%
sequence      1149            1680            +46%
insert        5623            11520           +105%

Raw Output

node bench.js
============================================================
STANDARD MODE BENCHMARK
============================================================
warmup done

param queries: 1868
qps: 374

named queries: 1899
qps: 380

sequence queries: 5745
qps: 1149

insert queries: 28115
qps: 5623


============================================================
PIPELINE MODE BENCHMARK
============================================================
warmup done

param queries: 2200
qps: 440

named queries: 2200
qps: 440

sequence queries: 8400
qps: 1680

insert queries: 57600
qps: 11520


============================================================
COMPARISON: Sequential vs Pipeline (same workload)
============================================================

Running 1000 sequential queries...
Sequential: 181ms (5538 qps)
Running 1000 pipeline queries...
Pipeline:   59ms (16842 qps)

@cesco69 cesco69 marked this pull request as ready for review February 4, 2026 13:48

stephenh commented Feb 4, 2026

Ah wow, this is amazing!

@cesco69 just curious, in your example:

// All queries are sent immediately without waiting
const [r1, r2, r3] = await Promise.all([
  client.query('SELECT 1'),
  client.query('SELECT 2'),
  client.query('SELECT 3'),
])

Afaiu if SELECT 2 breaks (invalid SQL query, an UPDATE that fails a constraint), then SELECT 3 is aborted too.

...except that you have a test showing this is not the case (nonexistent_table_xyz). 🤔

I really thought the pg docs said that SELECT 3 as num would blow up, but I guess reading again they say "If any statement encounters an error, the server aborts the current transaction and does not execute any subsequent command in the queue until the next synchronization point".

So I guess if there is no open transaction, everything is fine. 👍

Speaking of transactions, I see you have a test for it, but does the BEGIN cause the current connection to be reserved to the current caller/request? It doesn't seem like it?

Like if I have two in-flight requests, I wouldn't want user1 issuing a BEGIN to then accept pipelined statements from user2.

Or I guess that's just on me to use a Pool, and only BEGIN on a reserved-to-that-user connection ... that makes sense/seems obvious in retrospect.

This is really great! 🎉


cesco69 commented Feb 5, 2026


Thanks! 🙏

You're right about error isolation: each query gets its own Sync message, so errors don't cascade outside of transactions. The PostgreSQL docs refer to aborting "the current transaction", but in autocommit mode (no explicit BEGIN) each statement is its own transaction.

And yes, for transactions you'd use pool.connect() to get a dedicated client, same as without pipeline mode. Pipeline mode doesn't change connection ownership semantics, it just optimizes the wire protocol for that connection.
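
Something like this (a minimal sketch of that pattern, using the standard pool API):

const pool = new Pool({ pipelineMode: true })

const client = await pool.connect() // dedicated client for the transaction
try {
  await client.query('BEGIN')
  await client.query('INSERT INTO t VALUES (1)')
  await client.query('COMMIT')
} catch (err) {
  await client.query('ROLLBACK')
  throw err
} finally {
  client.release()
}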

I have added more tests!

@cesco69 cesco69 requested a review from charmander February 5, 2026 09:37