Skip to content

#757 Add support for dynamically loaded JDBC drivers and ensure connections are reused in JDBC Native#760

Open
yruslan wants to merge 4 commits into
mainfrom
feature/757-add-support-for-dynamically-loaded-jdbc-drivers
Open

#757 Add support for dynamically loaded JDBC drivers and ensure connections are reused in JDBC Native#760
yruslan wants to merge 4 commits into
mainfrom
feature/757-add-support-for-dynamically-loaded-jdbc-drivers

Conversation

@yruslan
Copy link
Copy Markdown
Collaborator

@yruslan yruslan commented May 26, 2026

Closes #757

Summary by CodeRabbit

Release Notes

  • New Features

    • TableReader and related components now support resource management via AutoCloseable, enabling use with try-with-resources statements.
    • Added support for custom JDBC driver JAR paths, allowing configuration of external database drivers.
  • Improvements

    • Enhanced Delta schema error recovery to recognize additional failure conditions.
    • Refined JDBC connection selection strategy for improved reliability.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 26, 2026

Walkthrough

This PR implements dynamic JDBC driver loading for the JDBC Native source by refactoring core connection-selection infrastructure to support optional driver JAR paths, establishing AutoCloseable resource contracts, and updating all JDBC readers and utilities to route operations through a selector-based pattern that enables driver instantiation and caching at read time.

Changes

JDBC Driver Support and Resource Lifecycle

Layer / File(s) Summary
AutoCloseable trait contracts and resource type updates
pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
TableReader and ResultSetToRowIterator are extended to implement AutoCloseable, establishing explicit lifecycle cleanup contracts.
JdbcUrlSelector redesign with driver support and caching
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala
JdbcUrlSelector trait now extends AutoCloseable, splits retry-parameterized working methods into parameterless getWorkingUrl, cached getConnection, and getNewConnection(retriesLeft). Adds driver-path and driver-loading accessors. Implementation manages connection caching with validity checks and register close handling via ThreadClosableRegistry.
JdbcNativeUtils and JdbcSparkUtils driver-aware refactoring
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala
Public JDBC methods refactored to accept JdbcUrlSelector instead of explicit JdbcConfig+url pairs. getJdbcConnection gains optional Driver parameter; connection logic prefers provided driver (via driver.connect) before falling back to DriverManager. Result-set overloads added for driver JAR loading. withResultSet removes explicit retries parameter.
JDBC reader implementations adopt selector and lifecycle
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
All JDBC readers implement explicit close() methods and route JDBC operations through JdbcUrlSelector. TableReaderJdbcNative introduces DRIVER_JAR_PATH config key and updates factory to read and pass driver jar path to selector. SQL metadata and record counting now use selector-based utilities.
JdbcSource resource management and driver JAR configuration
pramen/core/src/main/scala/za/co/absa/pramen/core/source/JdbcSource.scala
Implements proper reader lifecycle via try/finally blocks in count, data, and incremental operations. Derives optional driver JAR path from config via ConfigUtils and passes to JdbcUrlSelector during reader creation.
Enhanced Delta schema migration error detection
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala, pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala, pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
AnalysisException handling expanded to catch both "cannot resolve" and "does not exist" message patterns in Delta table getBkDf methods, triggering model migration and retry for additional schema validation failures.
Connection selection strategy updates
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
Updates to use getNewConnection(retries) instead of getWorkingConnection(retries), shifting to fresh-connection selection within retry loops across all connection-initialization paths.

Test Updates and Version Alignment

Layer / File(s) Summary
Test suite updates for JdbcUrlSelector API
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/JdbcUrlSelectorImplSuite.scala
Updates test imports and helper methods to use JdbcUrlSelector factory instead of direct implementation instantiation. Removes old retries parameters from withResultSet calls. Test constructors now cast selector result to implementation type.
JDBC utility test updates
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala
Updates JdbcNativeUtilsSuite to construct JdbcUrlSelector and pass selector to record-count and dataframe methods. getConnection test now passes optional driver (None). All data/error tests rewired to selector-based overloads.
JDBC metadata and query executor test updates
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcSparkUtilsSuite.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/QueryExecutorJdbcSuite.scala
JdbcSparkUtilsSuite constructs JdbcUrlSelector and passes to withJdbcMetadata. QueryExecutorJdbcSuite mocks updated to use getNewConnection(retries) API and stub getNewConnection(anyInt()) instead of getWorkingConnection(...).
Driver JAR configuration and test coverage
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scala
Adds reader_jar configuration section with driver.jar.path and JDBC settings. New test case verifies TableReaderJdbcNative correctly parses driver JAR path and derives expected JDBC/formatting/limit configuration.
Test mock reader close() implementations
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scala, pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scala
Implements close() method overrides in test mock readers to satisfy AutoCloseable contract.
Test fixture cleanup and warehouse deletion
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaTableSuite.scala
Refactors cleanUpWarehouse() to use recursive directory deletion helper that handles null listFiles() results and properly deletes nested directories before the parent.
Maven version bumps
pramen/pom.xml, pramen/api/pom.xml, pramen/core/pom.xml, pramen/extras/pom.xml
Updates parent and project versions from 1.13.19-SNAPSHOT to 1.14.0-SNAPSHOT across all build files.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • AbsaOSS/pramen#753: Both PRs modify JDBC driver-loading behavior in JdbcNativeUtils and selector logic; the main PR changes JdbcNativeUtils to prefer an optional preloaded Driver via JdbcUrlSelector, while the retrieved PR adjusts getJdbcConnection/driver discovery.
  • AbsaOSS/pramen#641: Both PRs change the JDBC connection/metadata path in core, especially JdbcNativeUtils and JdbcSparkUtils.withJdbcMetadata, with the main PR's selector/driver refactor overlapping the retrieved PR's autoCommit handling changes.
  • AbsaOSS/pramen#628: Both PRs modify the JDBC-native data path; the main PR refactors JdbcNativeUtils to use JdbcUrlSelector/driver-aware connections, while the retrieved PR extends getJdbcNativeDataFrame and changes ResultSetToRowIterator in the same pipeline.

Suggested reviewers

  • jozefbakus

🐰 A selector hops through JAR paths bright,
With drivers loaded just-in-time,
Close handlers cached and clean,
The freshest connections ever seen!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main changes: adding support for dynamically loaded JDBC drivers and ensuring connection reuse in JDBC Native.
Linked Issues check ✅ Passed The PR fully implements issue #757 requirements: dynamic JDBC driver loading, dedicated class loaders, and connection reuse mechanisms across multiple files.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing dynamic JDBC driver loading and connection reuse; no unrelated modifications detected beyond scope.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/757-add-support-for-dynamically-loaded-jdbc-drivers

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala`:
- Line 24: The TableReader trait was changed to extend AutoCloseable but doesn't
provide a default close implementation, breaking downstream implementations; add
a default no-op override def close(): Unit = () in the TableReader trait so
existing external implementations remain binary/source compatible and
resource-owning readers can still override close() as needed, referencing the
TableReader trait and its close() method for where to add this default.

In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Line 57: The AnalysisException pattern in BookkeeperDeltaTable (line with case
ex: AnalysisException) is too broad and can match "table/database does not
exist" errors; narrow the match to only column/field resolution failures (e.g.,
check for "cannot resolve" or a message pattern like "column .* does not exist")
or, better, proactively verify the table exists before calling migrateModel()
using Spark's catalog (spark.catalog.tableExists) to avoid attempting
SaveMode.Append on a missing table; update the match in the exception handler
and/or add a table existence check in init()/migrateModel() so migrateModel()
only runs when the target table is present.

In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 57-60: The current catch in BookkeeperDeltaTable matches
AnalysisException messages containing "does not exist", which is too broad and
can catch missing-table/database errors and lead migrateModel() to attempt
SaveMode.Append on a non-existent table; update the handling in the code that
calls migrateModel()/spark.table(recordsFullTableName) to either (a) narrow the
pattern to column-resolution errors (e.g. match "cannot resolve" or messages
that mention "column ... does not exist") or (b) check
spark.catalog.tableExists(recordsFullTableName) before calling migrateModel(),
and only call migrateModel() when the table exists but the schema indicates
missing columns (use the BookkeeperDeltaTable init/migrateModel() flow
accordingly so you don’t mask genuine missing-table/database errors).

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`:
- Line 63: The catch in BookkeeperDeltaPath currently matches AnalysisException
messages containing the generic phrase "does not exist", which can accidentally
swallow unrelated errors (e.g., missing table or path) before migrateModel()
runs; update the match in the exception handler in BookkeeperDeltaPath (the
AnalysisException branch used around migrateModel/init logic) to only target
schema/column resolution failures (e.g., check ex.getMessage contains "cannot
resolve" or a case-insensitive regex like ".*column.*does not exist.*") or add
an explicit path existence check (call the path existence check used in init()
before performing migrateModel()) so that non-schema errors are not
misclassified and are propagated instead of being masked.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala`:
- Around line 71-83: The loadDriver method in JdbcUrlSelector currently creates
a URLClassLoader and returns only the Driver, leaking the loader; modify
loadDriver to return/encapsulate both the Driver and its URLClassLoader (e.g., a
small Closeable wrapper class like DriverWithClassLoader), update callers to
store that wrapper, and ensure JdbcUrlSelectorImpl.close() calls loader.close()
(via wrapper.close()) in addition to closing the JDBC Connection so the jar
classloader is properly released.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala`:
- Around line 120-125: The cached JDBC connection is replaced without closing
the old one; before assigning connection = newConnection in the branch that
checks connection validity, attempt to close the existing connection in a
best-effort block: if connection != null then try connection.close() (guarding
with !connection.isClosed if desired) and ignore/log any SQLException, then
proceed to assign the new handle from getNewConnection(...) and register it with
ThreadClosableRegistry.registerCloseable(connection); perform the close inside a
try/catch so failures don't prevent creating/assigning the new connection.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`:
- Around line 177-184: getResultSet(jdbcConfig: JdbcConfig, url: String, query:
String, jdbcDriverJarPath: Option[String]) and its overload
getResultSet(connection, jdbcConfig, query) currently return only a ResultSet
which leaks the owning Statement and Connection; change them to return a
closeable resource object (e.g., JdbcResultSetWrapper or similar) that holds the
Connection, Statement and ResultSet and implements AutoCloseable/Closable so its
close() closes rs, stmt, and conn in the correct order; update both getResultSet
overloads to construct and return this wrapper (the first to open the Connection
and pass it into the second), and update all callers to use the new wrapper with
try-with-resources / try-finally so the whole (Connection, Statement, ResultSet)
stack is always closed.
- Around line 121-130: The current for-comprehension scopes the selector-owned
Connection into the Using/cleanup logic (via "connection <- conn"), causing the
cached connection to be closed; change withResultSet so it does NOT put the
selector-owned Connection into the Using scope: obtain the connection reference
directly from jdbcUrlSelector.getConnection (e.g. assign the returned conn to a
plain val) and only wrap/using the Statement and ResultSet (created via
connection.createStatement and executeQuery) so that Statement/ResultSet are
closed but the selector retains ownership and lifetime of the Connection; keep
references to executeQuery and action unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6e38d49c-e309-4210-82c7-f85aae4726a9

📥 Commits

Reviewing files that changed from the base of the PR and between 1383679 and ed9e9b2.

📒 Files selected for processing (32)
  • pramen/api/pom.xml
  • pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala
  • pramen/core/pom.xml
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/source/JdbcSource.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaTableSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/JdbcUrlSelectorImplSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcSparkUtilsSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/QueryExecutorJdbcSuite.scala
  • pramen/extras/pom.xml
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scala
  • pramen/pom.xml

import java.time.LocalDate

trait TableReader {
trait TableReader extends AutoCloseable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

repo_root="."
target="pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala"

echo "== File: $target =="
sed -n '1,120p' "$target" | nl -ba

echo
echo "== Find implementations/uses of TableReader in repo =="
rg -n --hidden --no-ignore-vcs "extends\s+TableReader\b|with\s+TableReader\b|TableReader\s*(" pramen -S || true

echo
echo "== Find any existing close() methods on TableReader implementers =="
rg -n --hidden --no-ignore-vcs "\bclass\b|\btrait\b|object\s" pramen/api/src/main/scala/za/co/absa/pramen/api -S || true

Repository: AbsaOSS/pramen

Length of output: 197


🏁 Script executed:

#!/bin/bash
set -euo pipefail

target="pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala"

echo "== Confirm path exists =="
ls -la "$target" || true

echo
echo "== Show TableReader.scala (first 120 lines) =="
if [ -f "$target" ]; then
  awk '{printf "%6d\t%s\n", NR, $0}' "$target" | sed -n '1,120p'
else
  echo "File not found: $target"
fi

echo
echo "== Locate any other TableReader.scala files (in case path differs) =="
fd -t f -a TableReader.scala . || true

echo
echo "== Find repo usages/implementations of TableReader =="
rg -n --hidden --no-ignore-vcs "\bextends\s+TableReader\b|\bwith\s+TableReader\b|\bTableReader\s*\[" pramen -S || true

echo
echo "== Find any close() implementations in API module (TableReader related) =="
rg -n --hidden --no-ignore-vcs "\bdef\s+close\s*\(" pramen/api/src/main/scala -S || true

echo
echo "== Find close() signatures on TableReader itself =="
rg -n --hidden --no-ignore-vcs "trait\s+TableReader\b|AutoCloseable|override\s+def\s+close\s*\(" "$target" pramen/api/src/main/scala -S || true

Repository: AbsaOSS/pramen

Length of output: 3837


🏁 Script executed:

#!/bin/bash
set -euo pipefail

files=(
  pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scala
  pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scala
  pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scala
  pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
  pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
)

echo "== close() occurrences in TableReader implementations =="
for f in "${files[@]}"; do
  echo
  echo "--- $f ---"
  if [ -f "$f" ]; then
    rg -n "\bdef\s+close\s*\(" "$f" || true
  else
    echo "MISSING: $f"
  fi
done

echo
echo "== Show TableReaderSpark / JdbcBase definitions around class/trait header + close if present =="
for f in \
  pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala \
  pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
do
  echo
  echo "--- $f (context) ---"
  sed -n '1,120p' "$f" | awk '{printf "%4d\t%s\n", NR, $0}' | sed -n '1,120p'
done

Repository: AbsaOSS/pramen

Length of output: 12719


Adding AutoCloseable to TableReader forces an abstract close() contract downstream (binary/source break)

za.co.absa.pramen.api.TableReader now extends AutoCloseable (line 24) but doesn’t define a default close(), so external implementations that don’t already implement def close(): Unit will fail to compile (and can hit AbstractMethodError at runtime when close() is invoked). Provide a default no-op override def close(): Unit = () in the trait and let resource-owning readers override it.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala` at line
24, The TableReader trait was changed to extend AutoCloseable but doesn't
provide a default close implementation, breaking downstream implementations; add
a default no-op override def close(): Unit = () in the TableReader trait so
existing external implementations remain binary/source compatible and
resource-owning readers can still override close() as needed, referencing the
TableReader trait and its close() method for where to add this default.

spark.table(recordsFullTableName).as[DataChunk]
} catch {
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") =>
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Overly broad error pattern matching.

The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:

  • "Table does not exist" (missing table entirely)
  • "Database does not exist" (catalog issue)

While init() creates the records table on construction, external deletion or catalog issues could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent table, masking the original error with a confusing secondary error.

Consider a more specific pattern that targets column/field resolution failures:

case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || 
                              ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>

Alternatively, verify the table exists before attempting migration using spark.catalog.tableExists().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`
at line 57, The AnalysisException pattern in BookkeeperDeltaTable (line with
case ex: AnalysisException) is too broad and can match "table/database does not
exist" errors; narrow the match to only column/field resolution failures (e.g.,
check for "cannot resolve" or a message pattern like "column .* does not exist")
or, better, proactively verify the table exists before calling migrateModel()
using Spark's catalog (spark.catalog.tableExists) to avoid attempting
SaveMode.Append on a missing table; update the match in the exception handler
and/or add a table existence check in init()/migrateModel() so migrateModel()
only runs when the target table is present.

Comment on lines +57 to +60
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") =>
// Spark 2 and 3
migrateModel()
spark.table(recordsFullTableName).as[DataChunk]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Overly broad error pattern matching.

The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:

  • "Table does not exist" (missing table entirely)
  • "Database does not exist" (catalog issue)

While init() creates the records table on construction, external deletion or catalog issues could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent table, masking the original error with a confusing secondary error.

Consider a more specific pattern that targets column/field resolution failures:

case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || 
                              ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>

Alternatively, verify the table exists before attempting migration using spark.catalog.tableExists().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`
around lines 57 - 60, The current catch in BookkeeperDeltaTable matches
AnalysisException messages containing "does not exist", which is too broad and
can catch missing-table/database errors and lead migrateModel() to attempt
SaveMode.Append on a non-existent table; update the handling in the code that
calls migrateModel()/spark.table(recordsFullTableName) to either (a) narrow the
pattern to column-resolution errors (e.g. match "cannot resolve" or messages
that mention "column ... does not exist") or (b) check
spark.catalog.tableExists(recordsFullTableName) before calling migrateModel(),
and only call migrateModel() when the table exists but the schema indicates
missing columns (use the BookkeeperDeltaTable init/migrateModel() flow
accordingly so you don’t mask genuine missing-table/database errors).

load()
} catch {
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") =>
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Overly broad error pattern matching.

The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:

  • "Table does not exist" (missing table entirely)
  • "Path does not exist" (filesystem issue)

While init() creates the records path on construction, external deletion or race conditions could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent path, masking the original error with a confusing secondary error.

Consider a more specific pattern that targets column/field resolution failures:

case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || 
                              ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>

Alternatively, verify the path exists before attempting migration.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`
at line 63, The catch in BookkeeperDeltaPath currently matches AnalysisException
messages containing the generic phrase "does not exist", which can accidentally
swallow unrelated errors (e.g., missing table or path) before migrateModel()
runs; update the match in the exception handler in BookkeeperDeltaPath (the
AnalysisException branch used around migrateModel/init logic) to only target
schema/column resolution failures (e.g., check ex.getMessage contains "cannot
resolve" or a case-insensitive regex like ".*column.*does not exist.*") or add
an explicit path existence check (call the path existence check used in init()
before performing migrateModel()) so that non-schema errors are not
misclassified and are propagated instead of being masked.

Comment on lines +71 to +83
def loadDriver(driverJarPath: String, driverClassName: String): Driver = {
val jarFile = new File(driverJarPath)
val jarURL = jarFile.toURI.toURL

val loader = new URLClassLoader(
Array(jarURL),
this.getClass.getClassLoader
)

// Load driver class
val driverClass = loader.loadClass(driverClassName)
val driver = driverClass.getDeclaredConstructor().newInstance().asInstanceOf[Driver]
driver
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate and inspect the relevant Scala sources
rg -n "class JdbcUrlSelector|trait JdbcUrlSelector|object JdbcUrlSelector|JdbcUrlSelectorImpl|loadDriver\\(" -S pramen/core/src/main/scala/za/co/absa/pramen/core/reader
fd -e scala "JdbcUrlSelector.scala" pramen/core/src/main/scala/za/co/absa/pramen/core/reader -a || true

# Print the whole file (should be small enough); if too large, narrow to key sections.
FILE="pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala"
if [ -f "$FILE" ]; then
  echo "---- $FILE (first 220 lines) ----"
  sed -n '1,220p' "$FILE"
else
  echo "File not found: $FILE" >&2
fi

# Search for URLClassLoader and close handling across the repo
rg -n "URLClassLoader" pramen/core/src/main/scala -S
rg -n "close\\(\\)|override def close\\(|AutoCloseable" pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala -S

# See if any other helper wraps/retains the loader somewhere else
rg -n "getDeclaredConstructor\\(\\)\\.newInstance\\(\\)|asInstanceOf\\[Driver\\]" pramen/core/src/main/scala -S

Repository: AbsaOSS/pramen

Length of output: 4636


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala"
echo "---- $FILE (first 240 lines) ----"
sed -n '1,240p' "$FILE"

echo
echo "---- References to loadDriver / getLoadedDriver / close() in reader package ----"
rg -n "loadDriver\\(|getLoadedDriver|override def close\\(|def close\\(|URLClassLoader" pramen/core/src/main/scala/za/co/absa/pramen/core/reader -S

Repository: AbsaOSS/pramen

Length of output: 8721


Close the URLClassLoader created in JdbcUrlSelector.loadDriver
JdbcUrlSelector.loadDriver creates a new URLClassLoader(...) (lines 71-83) but returns only the Driver, discarding the loader reference. JdbcUrlSelectorImpl.close() closes only the JDBC Connection, so it has no way to close() the classloader/jar. Return/hold a closeable wrapper (driver + loader) and call loader.close() from JdbcUrlSelectorImpl.close().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala`
around lines 71 - 83, The loadDriver method in JdbcUrlSelector currently creates
a URLClassLoader and returns only the Driver, leaking the loader; modify
loadDriver to return/encapsulate both the Driver and its URLClassLoader (e.g., a
small Closeable wrapper class like DriverWithClassLoader), update callers to
store that wrapper, and ensure JdbcUrlSelectorImpl.close() calls loader.close()
(via wrapper.close()) in addition to closing the JDBC Connection so the jar
classloader is properly released.

Comment on lines +120 to +125
if (connection == null || connection.isClosed || !connection.isValid(jdbcConfig.connectionTimeoutSeconds.getOrElse(DEFAULT_CONNECTION_TIMEOUT_SECONDS))) {
val retries = jdbcConfig.retries.getOrElse(getNumberOfUrls)
val (newConnection, url) = getNewConnection(retries)
connection = newConnection
ThreadClosableRegistry.registerCloseable(connection)
(connection, url)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Close the stale cached connection before replacing it.

When the cached connection fails the validity check, this branch overwrites connection with a new handle without closing the old one first. Reconnect loops will therefore accumulate abandoned DB sessions. Close the previous connection in a best-effort block before assignment.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala`
around lines 120 - 125, The cached JDBC connection is replaced without closing
the old one; before assigning connection = newConnection in the branch that
checks connection validity, attempt to close the existing connection in a
best-effort block: if connection != null then try connection.close() (guarding
with !connection.isClosed if desired) and ignore/log any SQLException, then
proceed to assign the new handle from getNewConnection(...) and register it with
ThreadClosableRegistry.registerCloseable(connection); perform the close inside a
try/catch so failures don't prevent creating/assigning the new connection.

Comment on lines +121 to +130
val retries = jdbcUrlSelector.jdbcConfig.retries.getOrElse(jdbcUrlSelector.getNumberOfUrls)

try {
val resultSet = executeQuery(statement, query, retries)
try {
action(resultSet)
} finally {
resultSet.close()
}
} finally {
statement.close()
}
} finally {
connection.close()
val (conn, _) = jdbcUrlSelector.getConnection

for {
connection <- conn
statement <- connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
resultSet <- executeQuery(statement, query, retries)
} {
action(resultSet)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't close the selector-owned connection in withResultSet().

jdbcUrlSelector.getConnection is the reusable cached connection, but connection <- conn puts it into the UsingUtils scope, so this helper closes it after every call. That defeats the new reuse contract and forces the next operation back onto the reconnect path. Let the selector own the Connection lifetime here and scope only the Statement/ResultSet.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`
around lines 121 - 130, The current for-comprehension scopes the selector-owned
Connection into the Using/cleanup logic (via "connection <- conn"), causing the
cached connection to be closed; change withResultSet so it does NOT put the
selector-owned Connection into the Using scope: obtain the connection reference
directly from jdbcUrlSelector.getConnection (e.g. assign the returned conn to a
plain val) and only wrap/using the Statement and ResultSet (created via
connection.createStatement and executeQuery) so that Statement/ResultSet are
closed but the selector retains ownership and lifetime of the Connection; keep
references to executeQuery and action unchanged.

Comment on lines +177 to +184
private[core] def getResultSet(jdbcConfig: JdbcConfig,
url: String,
query: String,
jdbcDriverJarPath: Option[String]): ResultSet = {
val driverOpt = jdbcDriverJarPath.map(path => JdbcUrlSelector.loadDriver(path, jdbcConfig.driver))
val connection = getJdbcConnection(jdbcConfig, url, driverOpt)

getResultSet(connection, jdbcConfig, query)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Keep Connection and Statement ownership with the returned result set.

The overload at Lines 177-184 opens a fresh JDBC Connection, and the core helper at Lines 187-213 creates a Statement, but both return only a ResultSet. The native iterator/count paths close the ResultSet only, so executor reads leak the connection and all callers leak the statement/open cursor. This needs a closeable wrapper or iterator that owns the full (Connection, Statement, ResultSet) stack.

Also applies to: 187-213

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`
around lines 177 - 184, getResultSet(jdbcConfig: JdbcConfig, url: String, query:
String, jdbcDriverJarPath: Option[String]) and its overload
getResultSet(connection, jdbcConfig, query) currently return only a ResultSet
which leaks the owning Statement and Connection; change them to return a
closeable resource object (e.g., JdbcResultSetWrapper or similar) that holds the
Connection, Statement and ResultSet and implements AutoCloseable/Closable so its
close() closes rs, stmt, and conn in the correct order; update both getResultSet
overloads to construct and return this wrapper (the first to open the Connection
and pass it into the second), and update all callers to use the new wrapper with
try-with-resources / try-finally so the whole (Connection, Statement, ResultSet)
stack is always closed.

@github-actions
Copy link
Copy Markdown

Unit Test Coverage

Overall Project 76.98% -0.57% 🍏
Files changed 49.3%

Module Coverage
pramen:core Jacoco Report 77.98% -0.62%
Files
Module File Coverage
pramen:core Jacoco Report JdbcSource.scala 100% 🍏
JdbcUrlSelector.scala 100% 🍏
TableReaderJdbcBase.scala 100% -7.75%
BookkeeperDeltaPath.scala 97.36% -0.16% 🍏
JdbcSparkUtils.scala 91.7% -1.04%
BookkeeperDeltaTable.scala 88.95% -0.14% 🍏
QueryExecutorJdbc.scala 86.05% -0.95% 🍏
RdbJdbc.scala 84.65% -2.63%
TableReaderJdbc.scala 82.79% 🍏
ResultSetToRowIterator.scala 78.84% -21.45%
TableReaderJdbcNative.scala 74.79% -3.44%
JdbcUrlSelectorImpl.scala 73.91% -8.56%
TableReaderSpark.scala 70.58% -0.08%
JdbcNativeUtils.scala 70.58% -22.63%
PramenDb.scala 41.38% -0.43% 🍏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for dynamically loading JDBC drivers for the JDBC Native source

1 participant