#757 Add support for dynamically loaded JDBC drivers and ensure connections are reused in JDBC Native#760
Conversation
WalkthroughThis PR implements dynamic JDBC driver loading for the JDBC Native source by refactoring core connection-selection infrastructure to support optional driver JAR paths, establishing AutoCloseable resource contracts, and updating all JDBC readers and utilities to route operations through a selector-based pattern that enables driver instantiation and caching at read time. ChangesJDBC Driver Support and Resource Lifecycle
Test Updates and Version Alignment
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala`:
- Line 24: The TableReader trait was changed to extend AutoCloseable but doesn't
provide a default close implementation, breaking downstream implementations; add
a default no-op override def close(): Unit = () in the TableReader trait so
existing external implementations remain binary/source compatible and
resource-owning readers can still override close() as needed, referencing the
TableReader trait and its close() method for where to add this default.
In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Line 57: The AnalysisException pattern in BookkeeperDeltaTable (line with case
ex: AnalysisException) is too broad and can match "table/database does not
exist" errors; narrow the match to only column/field resolution failures (e.g.,
check for "cannot resolve" or a message pattern like "column .* does not exist")
or, better, proactively verify the table exists before calling migrateModel()
using Spark's catalog (spark.catalog.tableExists) to avoid attempting
SaveMode.Append on a missing table; update the match in the exception handler
and/or add a table existence check in init()/migrateModel() so migrateModel()
only runs when the target table is present.
In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 57-60: The current catch in BookkeeperDeltaTable matches
AnalysisException messages containing "does not exist", which is too broad and
can catch missing-table/database errors and lead migrateModel() to attempt
SaveMode.Append on a non-existent table; update the handling in the code that
calls migrateModel()/spark.table(recordsFullTableName) to either (a) narrow the
pattern to column-resolution errors (e.g. match "cannot resolve" or messages
that mention "column ... does not exist") or (b) check
spark.catalog.tableExists(recordsFullTableName) before calling migrateModel(),
and only call migrateModel() when the table exists but the schema indicates
missing columns (use the BookkeeperDeltaTable init/migrateModel() flow
accordingly so you don’t mask genuine missing-table/database errors).
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`:
- Line 63: The catch in BookkeeperDeltaPath currently matches AnalysisException
messages containing the generic phrase "does not exist", which can accidentally
swallow unrelated errors (e.g., missing table or path) before migrateModel()
runs; update the match in the exception handler in BookkeeperDeltaPath (the
AnalysisException branch used around migrateModel/init logic) to only target
schema/column resolution failures (e.g., check ex.getMessage contains "cannot
resolve" or a case-insensitive regex like ".*column.*does not exist.*") or add
an explicit path existence check (call the path existence check used in init()
before performing migrateModel()) so that non-schema errors are not
misclassified and are propagated instead of being masked.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala`:
- Around line 71-83: The loadDriver method in JdbcUrlSelector currently creates
a URLClassLoader and returns only the Driver, leaking the loader; modify
loadDriver to return/encapsulate both the Driver and its URLClassLoader (e.g., a
small Closeable wrapper class like DriverWithClassLoader), update callers to
store that wrapper, and ensure JdbcUrlSelectorImpl.close() calls loader.close()
(via wrapper.close()) in addition to closing the JDBC Connection so the jar
classloader is properly released.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala`:
- Around line 120-125: The cached JDBC connection is replaced without closing
the old one; before assigning connection = newConnection in the branch that
checks connection validity, attempt to close the existing connection in a
best-effort block: if connection != null then try connection.close() (guarding
with !connection.isClosed if desired) and ignore/log any SQLException, then
proceed to assign the new handle from getNewConnection(...) and register it with
ThreadClosableRegistry.registerCloseable(connection); perform the close inside a
try/catch so failures don't prevent creating/assigning the new connection.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`:
- Around line 177-184: getResultSet(jdbcConfig: JdbcConfig, url: String, query:
String, jdbcDriverJarPath: Option[String]) and its overload
getResultSet(connection, jdbcConfig, query) currently return only a ResultSet
which leaks the owning Statement and Connection; change them to return a
closeable resource object (e.g., JdbcResultSetWrapper or similar) that holds the
Connection, Statement and ResultSet and implements AutoCloseable/Closable so its
close() closes rs, stmt, and conn in the correct order; update both getResultSet
overloads to construct and return this wrapper (the first to open the Connection
and pass it into the second), and update all callers to use the new wrapper with
try-with-resources / try-finally so the whole (Connection, Statement, ResultSet)
stack is always closed.
- Around line 121-130: The current for-comprehension scopes the selector-owned
Connection into the Using/cleanup logic (via "connection <- conn"), causing the
cached connection to be closed; change withResultSet so it does NOT put the
selector-owned Connection into the Using scope: obtain the connection reference
directly from jdbcUrlSelector.getConnection (e.g. assign the returned conn to a
plain val) and only wrap/using the Statement and ResultSet (created via
connection.createStatement and executeQuery) so that Statement/ResultSet are
closed but the selector retains ownership and lifetime of the Connection; keep
references to executeQuery and action unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6e38d49c-e309-4210-82c7-f85aae4726a9
📒 Files selected for processing (32)
pramen/api/pom.xmlpramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scalapramen/core/pom.xmlpramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scalapramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scalapramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scalapramen/core/src/main/scala/za/co/absa/pramen/core/source/JdbcSource.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scalapramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scalapramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scalapramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaTableSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/JdbcUrlSelectorImplSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcSparkUtilsSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/QueryExecutorJdbcSuite.scalapramen/extras/pom.xmlpramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scalapramen/pom.xml
| import java.time.LocalDate | ||
|
|
||
| trait TableReader { | ||
| trait TableReader extends AutoCloseable { |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
repo_root="."
target="pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala"
echo "== File: $target =="
sed -n '1,120p' "$target" | nl -ba
echo
echo "== Find implementations/uses of TableReader in repo =="
rg -n --hidden --no-ignore-vcs "extends\s+TableReader\b|with\s+TableReader\b|TableReader\s*(" pramen -S || true
echo
echo "== Find any existing close() methods on TableReader implementers =="
rg -n --hidden --no-ignore-vcs "\bclass\b|\btrait\b|object\s" pramen/api/src/main/scala/za/co/absa/pramen/api -S || trueRepository: AbsaOSS/pramen
Length of output: 197
🏁 Script executed:
#!/bin/bash
set -euo pipefail
target="pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala"
echo "== Confirm path exists =="
ls -la "$target" || true
echo
echo "== Show TableReader.scala (first 120 lines) =="
if [ -f "$target" ]; then
awk '{printf "%6d\t%s\n", NR, $0}' "$target" | sed -n '1,120p'
else
echo "File not found: $target"
fi
echo
echo "== Locate any other TableReader.scala files (in case path differs) =="
fd -t f -a TableReader.scala . || true
echo
echo "== Find repo usages/implementations of TableReader =="
rg -n --hidden --no-ignore-vcs "\bextends\s+TableReader\b|\bwith\s+TableReader\b|\bTableReader\s*\[" pramen -S || true
echo
echo "== Find any close() implementations in API module (TableReader related) =="
rg -n --hidden --no-ignore-vcs "\bdef\s+close\s*\(" pramen/api/src/main/scala -S || true
echo
echo "== Find close() signatures on TableReader itself =="
rg -n --hidden --no-ignore-vcs "trait\s+TableReader\b|AutoCloseable|override\s+def\s+close\s*\(" "$target" pramen/api/src/main/scala -S || trueRepository: AbsaOSS/pramen
Length of output: 3837
🏁 Script executed:
#!/bin/bash
set -euo pipefail
files=(
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scala
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
)
echo "== close() occurrences in TableReader implementations =="
for f in "${files[@]}"; do
echo
echo "--- $f ---"
if [ -f "$f" ]; then
rg -n "\bdef\s+close\s*\(" "$f" || true
else
echo "MISSING: $f"
fi
done
echo
echo "== Show TableReaderSpark / JdbcBase definitions around class/trait header + close if present =="
for f in \
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala \
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
do
echo
echo "--- $f (context) ---"
sed -n '1,120p' "$f" | awk '{printf "%4d\t%s\n", NR, $0}' | sed -n '1,120p'
doneRepository: AbsaOSS/pramen
Length of output: 12719
Adding AutoCloseable to TableReader forces an abstract close() contract downstream (binary/source break)
za.co.absa.pramen.api.TableReader now extends AutoCloseable (line 24) but doesn’t define a default close(), so external implementations that don’t already implement def close(): Unit will fail to compile (and can hit AbstractMethodError at runtime when close() is invoked). Provide a default no-op override def close(): Unit = () in the trait and let resource-owning readers override it.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala` at line
24, The TableReader trait was changed to extend AutoCloseable but doesn't
provide a default close implementation, breaking downstream implementations; add
a default no-op override def close(): Unit = () in the TableReader trait so
existing external implementations remain binary/source compatible and
resource-owning readers can still override close() as needed, referencing the
TableReader trait and its close() method for where to add this default.
| spark.table(recordsFullTableName).as[DataChunk] | ||
| } catch { | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") => | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") => |
There was a problem hiding this comment.
Overly broad error pattern matching.
The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:
- "Table does not exist" (missing table entirely)
- "Database does not exist" (catalog issue)
While init() creates the records table on construction, external deletion or catalog issues could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent table, masking the original error with a confusing secondary error.
Consider a more specific pattern that targets column/field resolution failures:
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") ||
ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>Alternatively, verify the table exists before attempting migration using spark.catalog.tableExists().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`
at line 57, The AnalysisException pattern in BookkeeperDeltaTable (line with
case ex: AnalysisException) is too broad and can match "table/database does not
exist" errors; narrow the match to only column/field resolution failures (e.g.,
check for "cannot resolve" or a message pattern like "column .* does not exist")
or, better, proactively verify the table exists before calling migrateModel()
using Spark's catalog (spark.catalog.tableExists) to avoid attempting
SaveMode.Append on a missing table; update the match in the exception handler
and/or add a table existence check in init()/migrateModel() so migrateModel()
only runs when the target table is present.
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") => | ||
| // Spark 2 and 3 | ||
| migrateModel() | ||
| spark.table(recordsFullTableName).as[DataChunk] |
There was a problem hiding this comment.
Overly broad error pattern matching.
The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:
- "Table does not exist" (missing table entirely)
- "Database does not exist" (catalog issue)
While init() creates the records table on construction, external deletion or catalog issues could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent table, masking the original error with a confusing secondary error.
Consider a more specific pattern that targets column/field resolution failures:
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") ||
ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>Alternatively, verify the table exists before attempting migration using spark.catalog.tableExists().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`
around lines 57 - 60, The current catch in BookkeeperDeltaTable matches
AnalysisException messages containing "does not exist", which is too broad and
can catch missing-table/database errors and lead migrateModel() to attempt
SaveMode.Append on a non-existent table; update the handling in the code that
calls migrateModel()/spark.table(recordsFullTableName) to either (a) narrow the
pattern to column-resolution errors (e.g. match "cannot resolve" or messages
that mention "column ... does not exist") or (b) check
spark.catalog.tableExists(recordsFullTableName) before calling migrateModel(),
and only call migrateModel() when the table exists but the schema indicates
missing columns (use the BookkeeperDeltaTable init/migrateModel() flow
accordingly so you don’t mask genuine missing-table/database errors).
| load() | ||
| } catch { | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") => | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") => |
There was a problem hiding this comment.
Overly broad error pattern matching.
The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:
- "Table does not exist" (missing table entirely)
- "Path does not exist" (filesystem issue)
While init() creates the records path on construction, external deletion or race conditions could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent path, masking the original error with a confusing secondary error.
Consider a more specific pattern that targets column/field resolution failures:
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") ||
ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>Alternatively, verify the path exists before attempting migration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`
at line 63, The catch in BookkeeperDeltaPath currently matches AnalysisException
messages containing the generic phrase "does not exist", which can accidentally
swallow unrelated errors (e.g., missing table or path) before migrateModel()
runs; update the match in the exception handler in BookkeeperDeltaPath (the
AnalysisException branch used around migrateModel/init logic) to only target
schema/column resolution failures (e.g., check ex.getMessage contains "cannot
resolve" or a case-insensitive regex like ".*column.*does not exist.*") or add
an explicit path existence check (call the path existence check used in init()
before performing migrateModel()) so that non-schema errors are not
misclassified and are propagated instead of being masked.
| def loadDriver(driverJarPath: String, driverClassName: String): Driver = { | ||
| val jarFile = new File(driverJarPath) | ||
| val jarURL = jarFile.toURI.toURL | ||
|
|
||
| val loader = new URLClassLoader( | ||
| Array(jarURL), | ||
| this.getClass.getClassLoader | ||
| ) | ||
|
|
||
| // Load driver class | ||
| val driverClass = loader.loadClass(driverClassName) | ||
| val driver = driverClass.getDeclaredConstructor().newInstance().asInstanceOf[Driver] | ||
| driver |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and inspect the relevant Scala sources
rg -n "class JdbcUrlSelector|trait JdbcUrlSelector|object JdbcUrlSelector|JdbcUrlSelectorImpl|loadDriver\\(" -S pramen/core/src/main/scala/za/co/absa/pramen/core/reader
fd -e scala "JdbcUrlSelector.scala" pramen/core/src/main/scala/za/co/absa/pramen/core/reader -a || true
# Print the whole file (should be small enough); if too large, narrow to key sections.
FILE="pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala"
if [ -f "$FILE" ]; then
echo "---- $FILE (first 220 lines) ----"
sed -n '1,220p' "$FILE"
else
echo "File not found: $FILE" >&2
fi
# Search for URLClassLoader and close handling across the repo
rg -n "URLClassLoader" pramen/core/src/main/scala -S
rg -n "close\\(\\)|override def close\\(|AutoCloseable" pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala -S
# See if any other helper wraps/retains the loader somewhere else
rg -n "getDeclaredConstructor\\(\\)\\.newInstance\\(\\)|asInstanceOf\\[Driver\\]" pramen/core/src/main/scala -SRepository: AbsaOSS/pramen
Length of output: 4636
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala"
echo "---- $FILE (first 240 lines) ----"
sed -n '1,240p' "$FILE"
echo
echo "---- References to loadDriver / getLoadedDriver / close() in reader package ----"
rg -n "loadDriver\\(|getLoadedDriver|override def close\\(|def close\\(|URLClassLoader" pramen/core/src/main/scala/za/co/absa/pramen/core/reader -SRepository: AbsaOSS/pramen
Length of output: 8721
Close the URLClassLoader created in JdbcUrlSelector.loadDriver
JdbcUrlSelector.loadDriver creates a new URLClassLoader(...) (lines 71-83) but returns only the Driver, discarding the loader reference. JdbcUrlSelectorImpl.close() closes only the JDBC Connection, so it has no way to close() the classloader/jar. Return/hold a closeable wrapper (driver + loader) and call loader.close() from JdbcUrlSelectorImpl.close().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala`
around lines 71 - 83, The loadDriver method in JdbcUrlSelector currently creates
a URLClassLoader and returns only the Driver, leaking the loader; modify
loadDriver to return/encapsulate both the Driver and its URLClassLoader (e.g., a
small Closeable wrapper class like DriverWithClassLoader), update callers to
store that wrapper, and ensure JdbcUrlSelectorImpl.close() calls loader.close()
(via wrapper.close()) in addition to closing the JDBC Connection so the jar
classloader is properly released.
| if (connection == null || connection.isClosed || !connection.isValid(jdbcConfig.connectionTimeoutSeconds.getOrElse(DEFAULT_CONNECTION_TIMEOUT_SECONDS))) { | ||
| val retries = jdbcConfig.retries.getOrElse(getNumberOfUrls) | ||
| val (newConnection, url) = getNewConnection(retries) | ||
| connection = newConnection | ||
| ThreadClosableRegistry.registerCloseable(connection) | ||
| (connection, url) |
There was a problem hiding this comment.
Close the stale cached connection before replacing it.
When the cached connection fails the validity check, this branch overwrites connection with a new handle without closing the old one first. Reconnect loops will therefore accumulate abandoned DB sessions. Close the previous connection in a best-effort block before assignment.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala`
around lines 120 - 125, The cached JDBC connection is replaced without closing
the old one; before assigning connection = newConnection in the branch that
checks connection validity, attempt to close the existing connection in a
best-effort block: if connection != null then try connection.close() (guarding
with !connection.isClosed if desired) and ignore/log any SQLException, then
proceed to assign the new handle from getNewConnection(...) and register it with
ThreadClosableRegistry.registerCloseable(connection); perform the close inside a
try/catch so failures don't prevent creating/assigning the new connection.
| val retries = jdbcUrlSelector.jdbcConfig.retries.getOrElse(jdbcUrlSelector.getNumberOfUrls) | ||
|
|
||
| try { | ||
| val resultSet = executeQuery(statement, query, retries) | ||
| try { | ||
| action(resultSet) | ||
| } finally { | ||
| resultSet.close() | ||
| } | ||
| } finally { | ||
| statement.close() | ||
| } | ||
| } finally { | ||
| connection.close() | ||
| val (conn, _) = jdbcUrlSelector.getConnection | ||
|
|
||
| for { | ||
| connection <- conn | ||
| statement <- connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) | ||
| resultSet <- executeQuery(statement, query, retries) | ||
| } { | ||
| action(resultSet) |
There was a problem hiding this comment.
Don't close the selector-owned connection in withResultSet().
jdbcUrlSelector.getConnection is the reusable cached connection, but connection <- conn puts it into the UsingUtils scope, so this helper closes it after every call. That defeats the new reuse contract and forces the next operation back onto the reconnect path. Let the selector own the Connection lifetime here and scope only the Statement/ResultSet.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`
around lines 121 - 130, The current for-comprehension scopes the selector-owned
Connection into the Using/cleanup logic (via "connection <- conn"), causing the
cached connection to be closed; change withResultSet so it does NOT put the
selector-owned Connection into the Using scope: obtain the connection reference
directly from jdbcUrlSelector.getConnection (e.g. assign the returned conn to a
plain val) and only wrap/using the Statement and ResultSet (created via
connection.createStatement and executeQuery) so that Statement/ResultSet are
closed but the selector retains ownership and lifetime of the Connection; keep
references to executeQuery and action unchanged.
| private[core] def getResultSet(jdbcConfig: JdbcConfig, | ||
| url: String, | ||
| query: String, | ||
| jdbcDriverJarPath: Option[String]): ResultSet = { | ||
| val driverOpt = jdbcDriverJarPath.map(path => JdbcUrlSelector.loadDriver(path, jdbcConfig.driver)) | ||
| val connection = getJdbcConnection(jdbcConfig, url, driverOpt) | ||
|
|
||
| getResultSet(connection, jdbcConfig, query) |
There was a problem hiding this comment.
Keep Connection and Statement ownership with the returned result set.
The overload at Lines 177-184 opens a fresh JDBC Connection, and the core helper at Lines 187-213 creates a Statement, but both return only a ResultSet. The native iterator/count paths close the ResultSet only, so executor reads leak the connection and all callers leak the statement/open cursor. This needs a closeable wrapper or iterator that owns the full (Connection, Statement, ResultSet) stack.
Also applies to: 187-213
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`
around lines 177 - 184, getResultSet(jdbcConfig: JdbcConfig, url: String, query:
String, jdbcDriverJarPath: Option[String]) and its overload
getResultSet(connection, jdbcConfig, query) currently return only a ResultSet
which leaks the owning Statement and Connection; change them to return a
closeable resource object (e.g., JdbcResultSetWrapper or similar) that holds the
Connection, Statement and ResultSet and implements AutoCloseable/Closable so its
close() closes rs, stmt, and conn in the correct order; update both getResultSet
overloads to construct and return this wrapper (the first to open the Connection
and pass it into the second), and update all callers to use the new wrapper with
try-with-resources / try-finally so the whole (Connection, Statement, ResultSet)
stack is always closed.
Unit Test Coverage
Files
|
Closes #757
Summary by CodeRabbit
Release Notes
New Features
Improvements