From 4d1db0aacf81af724e523e8e5e495a8e5a5a5963 Mon Sep 17 00:00:00 2001 From: Igor Deo Alves Date: Wed, 6 May 2026 20:43:38 -0300 Subject: [PATCH 1/3] Mark schema template replacements as Stable to enable Replacer cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit schemaTemplateParam (and the schema replacement in ColumnExists) never set Stable: true, so the sqlctemplate.Replacer cache introduced in #802 is bypassed on every query. This causes regexp.ReplaceAllStringFunc to run on every SQL query, allocating a new string each time. Under production throughput this creates enough allocation pressure to cause visible memory growth — we observed +29MB in 16 minutes on a worker running river.Start() with continuous job polling. The schema value comes from the client config and is constant for the lifetime of a River client, so marking it Stable is safe. With this change, the regex runs once per unique SQL string and then serves from cache. Fixes all three drivers: riverpgxv5, riverdatabasesql, riversqlite. Fixes #1241 --- .../river_database_sql_driver.go | 4 +- .../river_database_sql_driver_test.go | 55 ++++++++++++++++++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 4 +- .../riverpgxv5/river_pgx_v5_driver_test.go | 57 +++++++++++++++++++ .../riversqlite/river_sqlite_driver.go | 2 +- .../riversqlite/river_sqlite_driver_test.go | 55 ++++++++++++++++++ 6 files changed, 172 insertions(+), 5 deletions(-) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index d8a2d445..16a3ae35 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -126,7 +126,7 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE schema = "'" + params.Schema + "'" } ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) exists, err := dbsqlc.New().ColumnExists(ctx, e.dbtx, &dbsqlc.ColumnExistsParams{ @@ -1247,6 +1247,6 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context { } return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver_test.go b/riverdriver/riverdatabasesql/river_database_sql_driver_test.go index 3097dbbf..0d23bb43 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver_test.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver_test.go @@ -115,4 +115,59 @@ func TestSchemaTemplateParam(t *testing.T) { require.NoError(t, err) require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL) }) + + t.Run("SchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1) + + // Second call with same SQL + same schema produces identical result. + // Because schema is marked Stable, the Replacer caches the output + // after the first call and short-circuits regex on subsequent calls. + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) + + t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1) + + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index e0201a0e..c1fce791 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -134,7 +134,7 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE schema = "'" + params.Schema + "'" } ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) exists, err := dbsqlc.New().ColumnExists(ctx, e.dbtx, &dbsqlc.ColumnExistsParams{ @@ -1310,6 +1310,6 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context { } return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go index f89538f4..ae366d5a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go @@ -238,6 +238,63 @@ func TestSchemaTemplateParam(t *testing.T) { require.NoError(t, err) require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL) }) + + t.Run("SchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + // First call + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1) + + // Second call with same SQL + same schema produces identical result. + // Because schema is marked Stable, the Replacer caches the output + // after the first call and short-circuits regex on subsequent calls. + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) + + t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1) + + // Repeat — same result from cache + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) } type nilConnDBTX struct{} diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 132d0e46..b7d729ae 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1680,7 +1680,7 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context { } return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) } diff --git a/riverdriver/riversqlite/river_sqlite_driver_test.go b/riverdriver/riversqlite/river_sqlite_driver_test.go index fcba8f36..1d445029 100644 --- a/riverdriver/riversqlite/river_sqlite_driver_test.go +++ b/riverdriver/riversqlite/river_sqlite_driver_test.go @@ -94,4 +94,59 @@ func TestSchemaTemplateParam(t *testing.T) { require.NoError(t, err) require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL) }) + + t.Run("SchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1) + + // Second call with same SQL + same schema produces identical result. + // Because schema is marked Stable, the Replacer caches the output + // after the first call and short-circuits regex on subsequent calls. + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) + + t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1) + + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) } From c533ec3a1dc53a6a9240d8668609cc85151bec71 Mon Sep 17 00:00:00 2001 From: Igor Deo Alves Date: Thu, 7 May 2026 23:38:55 -0300 Subject: [PATCH 2/3] Add entry to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fe80d63..6ebf847a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Mark schema replacements as `Stable` in sqlc templates, preventing query SQL from having to be reallocated over and over again.. [PR #1242](https://github.com/riverqueue/river/pull/1242). - Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236). - Fix bug in `sqltemplate` cached path in order in which named args are passed to a query (previously, the order was unstable). [PR #1243](https://github.com/riverqueue/river/pull/1243). From 41e508444df9ef7729888523904fe091c2b9d809 Mon Sep 17 00:00:00 2001 From: Igor Deo Alves Date: Thu, 7 May 2026 23:45:49 -0300 Subject: [PATCH 3/3] revert Stable: true in ColumExists in river_pgx_v5_driver --- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index c1fce791..efbddf1f 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -134,7 +134,7 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE schema = "'" + params.Schema + "'" } ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema, Stable: true}, + "schema": {Value: schema}, }, nil) exists, err := dbsqlc.New().ColumnExists(ctx, e.dbtx, &dbsqlc.ColumnExistsParams{