#692 Refactor the usage of bookkeeping database with Slick framework so it can be DB engine agnostic #709
Conversation
Walkthrough

Refactors JDBC bookkeeping and related components to be profile-parameterized: replaces hardcoded Postgres table singletons with `JdbcProfile`-bound table traits and per-profile `SlickUtils`; updates constructors and factories to accept `PramenDb`/`slickProfile`; migrates table access to wrapper instances; tests updated accordingly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as PramenApp
    participant PramenDb as PramenDb
    participant Component as Bookkeeper/Offset/TokenLock/Journal
    participant DB as JDBC_DB
    App->>PramenDb: instantiate(jdbcConfig, slickDb, slickProfile)
    PramenDb->>Component: create Component(fromPramenDb / new Component(pramenDb, ...))
    Component->>Component: init SlickUtils(slickProfile) & TableWrapper(slickProfile)
    Component->>DB: slickUtils.ensureDbConnected(slickDb)
    Component->>DB: executeQuery / executeAction via profile.api._
    DB-->>Component: query results
    Component-->>App: return results/state
```
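The central pattern of the refactor, binding table definitions to a `JdbcProfile` supplied at construction time instead of importing a fixed `PostgresProfile.api._`, can be sketched as follows. The trait name and `records` member follow the walkthrough above; the column set, table name, and the `H2Profile` binding are illustrative assumptions, not the actual schema.

```scala
import slick.jdbc.JdbcProfile

// Profile-bound table trait: the Slick DSL is imported from an abstract
// profile member, so the same table definition works on any JDBC engine.
trait BookkeepingTable {
  val profile: JdbcProfile
  import profile.api._

  // Illustrative two-column table; the real bookkeeping table has more fields.
  class BookkeepingRecords(tag: Tag) extends Table[(String, String)](tag, "bookkeeping") {
    def pramenTableName = column[String]("table_name")
    def infoDate        = column[String]("info_date")
    def *               = (pramenTableName, infoDate)
  }

  lazy val records = TableQuery[BookkeepingRecords]
}

// The profile is bound at construction time, as the refactored components do:
object BookkeepingTableExample {
  val forH2: BookkeepingTable = new BookkeepingTable {
    override val profile: JdbcProfile = slick.jdbc.H2Profile
  }
}
```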
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (inconclusive)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala (1)
Lines 76-81: ⚠️ Potential issue | 🟡 Minor

PostgreSQL-specific exception handler inconsistent with multi-database support.

Line 78 catches `org.postgresql.util.PSQLException`, introducing a PostgreSQL-specific dependency in code designed to support multiple JDBC databases (PostgreSQL, MySQL, SQLite, jTDS). While it won't cause runtime failures on other databases (the pattern simply won't match and execution falls through to the generic fallback on line 80), it creates an architectural inconsistency and requires the PostgreSQL driver on the classpath even when using only other databases.

The fallback on line 80 (`ex.getMessage.contains("constraint")`) provides coverage for other databases, but relies on message string matching rather than a proper exception type. Consider consolidating to a database-agnostic exception handling strategy, or document why the PostgreSQL dependency is retained.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala` around lines 76 - 81, The code currently matches org.postgresql.util.PSQLException (introducing a Postgres-only dependency); instead remove that specific case and handle database-agnostic constraint violations by matching java.sql.SQLException and checking SQLState (SQLState starting with "23" indicates integrity constraint violation) or falling back to the existing message-contains("constraint") check; update the match in TokenLockJdbc (replace the case Failure(_: org.postgresql.util.PSQLException) with case Failure(ex: SQLException) if ex.getSQLState != null && ex.getSQLState.startsWith("23") => tryAcquireExistingTicket()) so no PostgreSQL driver class is referenced while preserving behavior.
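A minimal sketch of the database-agnostic handling suggested above, assuming a `Try`-based match and a pre-existing `tryAcquireExistingTicket()` from the review context; it is not the actual `TokenLockJdbc` code.

```scala
import java.sql.SQLException
import scala.util.{Failure, Success, Try}

object ConstraintHandlingSketch {
  // Assumed to exist on TokenLockJdbc; stubbed here so the sketch compiles.
  def tryAcquireExistingTicket(): Boolean = false

  def handleInsertResult(result: Try[Int]): Boolean = result match {
    case Success(_) =>
      true
    // SQLState class "23" means "integrity constraint violation" across
    // standards-compliant databases, so no driver class is referenced.
    case Failure(ex: SQLException) if ex.getSQLState != null && ex.getSQLState.startsWith("23") =>
      tryAcquireExistingTicket()
    // Fallback for drivers that do not populate SQLState as expected.
    case Failure(ex) if ex.getMessage != null && ex.getMessage.contains("constraint") =>
      tryAcquireExistingTicket()
    case Failure(ex) =>
      throw ex
  }
}
```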
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)

Lines 138-149: ⚠️ Potential issue | 🟠 Major

`addColumn` uses hardcoded double-quoted identifiers incompatible with MySQL and SQL Server.

The `addColumn` method manually constructs SQL with double-quoted identifiers (`"table"`, `"column"`), but the recent refactor (#682) added support for MySQL and SQL Server via `getProfile()`. These databases require different quoting:

- MySQL: backticks (`` `column` ``) by default; double quotes work only with `ANSI_QUOTES` mode enabled
- SQL Server: square brackets (`[column]`) by default; double quotes work only with `SET QUOTED_IDENTIFIER ON`

Since `initTable()` correctly uses Slick's profile-aware schema generation (`schema.createIfNotExists`), `addColumn()` should similarly leverage the profile's quoting mechanism instead of manual `sqlu` interpolation. As written, the hardcoded quoting will break database initialization on MySQL and SQL Server during schema migrations (lines 96–122).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala` around lines 138 - 149, The addColumn method builds SQL with hardcoded double quotes which breaks MySQL/SQL Server; update addColumn to use the configured Slick profile's identifier quoting instead of literal " characters: obtain the active profile via getProfile(), use its quoting helper (or implement a small helper that matches getProfile() to return the correct quote chars — backtick for MySQL, square brackets for SQL Server, double quotes for Postgres/others) to wrap table and column names, then build and run the ALTER TABLE statement with db.run(sqlu"...") using those properly quoted identifiers (keep references to addColumn, getProfile(), db.run and sqlu). Ensure the thrown RuntimeException remains but the SQL uses profile-aware quoting.
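As a rough sketch of what profile-aware quoting could look like, assuming Slick's `quoteIdentifier` helper on `JdbcProfile` (which emits backticks on MySQL, square brackets on SQL Server, and double quotes elsewhere); the method shape and parameter names are illustrative, not the actual `addColumn` signature.

```scala
import slick.jdbc.JdbcProfile

object QuotingSketch {
  def addColumnSql(profile: JdbcProfile, table: String, column: String, columnType: String): String = {
    // quoteIdentifier picks the quoting style appropriate to the profile.
    val qTable  = profile.quoteIdentifier(table)
    val qColumn = profile.quoteIdentifier(column)
    // "ADD" without the optional COLUMN keyword is accepted by Postgres and
    // MySQL and is the required spelling on SQL Server, so it is portable.
    s"ALTER TABLE $qTable ADD $qColumn $columnType"
  }
}
```

The resulting string could then be executed with Slick's `sqlu` interpolator using literal splicing (`#$`), keeping the existing `db.run` call.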
🧹 Nitpick comments (8)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (1)
Lines 104-125: `commitRerun` performs update-then-delete without a transaction boundary.

The update (lines 112-117) and the subsequent delete of previous batches (lines 120-124) are executed as separate `db.run` calls. If the delete fails after the update succeeds, stale offset records for previous batches remain. This is pre-existing behavior and not introduced by this PR, but given that the method is meant to atomically rewrite offsets for a rerun, wrapping both operations in a single `.transactionally` block would be more robust, especially now that multiple DB engines are in scope.

♻️ Suggested transactional wrapping

```diff
     slickUtils.ensureDbConnected(db)
     db.run(
-      offsetTable.records
+      (offsetTable.records
         .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
         .map(r => (r.minOffset, r.maxOffset, r.committedAt))
-        .update((minOffset.valueString, maxOffset.valueString, Some(committedAt)))
-    ).execute()
-
-    // Cleaning up previous batches
-    db.run(
-      offsetTable.records
+        .update((minOffset.valueString, maxOffset.valueString, Some(committedAt)))
+      ).andThen(
+        offsetTable.records
         .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt =!= request.createdAt.toEpochMilli)
         .delete
+      ).transactionally
     ).execute()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala` around lines 104 - 125, commitRerun currently issues two separate db.run calls (an update then a delete) which can leave stale records if the delete fails; change it to run both operations in a single transactional DBIO by composing the two queries (the update on offsetTable.records filtered by request.tableName/infoDate/createdAt and the delete on offsetTable.records filtered by request.tableName/infoDate with createdAt =!= request.createdAt.toEpochMilli) into a DBIO.sequence or DBIO.seq and call .transactionally before executing, remove the separate .execute() calls and run the single transactional DBIO so both update and cleanup succeed or roll back together.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala (1)

Lines 29-30: Same cross-DB portability note for `minOffset` and `maxOffset` as with `MetadataTable.value`.

The `minOffset` and `maxOffset` string columns have no `O.Length` constraint. On some DB engines (e.g., H2, DB2), this may default to a short `VARCHAR` that could truncate large offset strings. Consider adding an explicit length or `O.SqlType("TEXT")` for consistent cross-engine behavior, especially given this PR's goal of DB-engine agnosticism.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala` around lines 29 - 30, The minOffset and maxOffset column definitions currently lack length/type constraints which can lead to truncation on some DB engines; update the declarations for def minOffset and def maxOffset to include an explicit size or text type (e.g., add O.Length(x) or O.SqlType("TEXT") in the column(...) call) so the columns are created with a sufficient/portable type across databases.
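For illustration, explicitly sized columns might look like the following; the snippet is bound to `H2Profile` only so it is self-contained, and the column names and lengths are placeholders rather than the real schema.

```scala
import slick.jdbc.H2Profile.api._

// Explicit sizes avoid engine-specific VARCHAR defaults that could
// truncate long offset strings.
class OffsetsSketch(tag: Tag) extends Table[(String, String, String)](tag, "offsets_sketch") {
  def pramenTableName = column[String]("table_name", O.Length(128))
  def minOffset       = column[String]("min_offset", O.Length(2048))
  // Alternatively, delegate to an unbounded text type:
  def maxOffset       = column[String]("max_offset", O.SqlType("TEXT"))
  def *               = (pramenTableName, minOffset, maxOffset)
}
```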
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala (1)

Line 29: Optional: Consider adding `O.Length` or `O.SqlType` to the `value` column for clarity and consistency.

The `value` column at line 29 lacks an explicit length constraint. While this is consistent with other variable-content columns in the codebase (e.g., `schemaJson` in SchemaTable, `minOffset`/`maxOffset` in OffsetTable), adding an explicit constraint would clarify intent and improve cross-DB portability documentation. Without a constraint, Slick defers to the database's default mapping, which varies by engine.

Consider adding `O.SqlType("TEXT")` or `O.Length` if metadata values have known size bounds, or document the decision to allow database-specific defaults if unbounded values are intentional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala` at line 29, The MetadataTable's value column is declared as def value = column[String]("value") without any explicit size or SQL type, which can lead to inconsistent DB defaults; update the declaration for value in MetadataTable to include an explicit column option such as O.SqlType("TEXT") or an appropriate O.Length(...) (or document why leaving it unbounded is intentional) so database mapping is explicit and portable across engines.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)

Lines 36-55: Clean refactoring to parameterized profile.

The constructor change and the pattern of creating per-profile table wrappers is consistent and correct. The `import slickProfile.api._` brings in the right path-dependent Slick DSL for the given profile.

One observation: the same table wrapper instantiation pattern (anonymous `new BookkeepingTable { override val profile = slickProfile }`) is also done in `PramenDb.scala`. When `BookkeeperJdbc` is created via the `fromJdbcConfig` companion method (line 361), it creates its own wrappers, which is fine. But when constructed from `Bookkeeper.fromConfig` (which passes `dbOpt.get.slickDb`), the caller already has a `PramenDb` instance with the same wrappers. This duplication is acceptable for isolation, but if it becomes a maintenance concern, you could consider accepting the table wrappers directly or reusing the `PramenDb` instance.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala` around lines 36 - 55, The ctor of BookkeeperJdbc currently constructs new per-profile table wrappers (BookkeepingTable, SchemaTable, OffsetTable, MetadataTable) which duplicates wrappers already present in PramenDb; refactor to allow reusing them by adding an overloaded BookkeeperJdbc constructor or additional optional parameters that accept the table wrapper instances or a PramenDb reference, update the factory methods Bookkeeper.fromConfig and the companion fromJdbcConfig to pass through the existing PramenDb's wrappers (or the PramenDb itself) when available, and ensure the existing default ctor still constructs wrappers for callers that don't provide them so behavior remains backward-compatible.
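A sketch of the wrapper-injection idea, reusing the `BookkeepingTable` trait sketched after the walkthrough; the constructor shape is hypothetical and only shows how a pre-built wrapper from `PramenDb` could be accepted while keeping the current behavior as the default.

```scala
import slick.jdbc.JdbcProfile

// Hypothetical constructor shape; BookkeepingTable is the profile-bound
// trait pattern sketched earlier in this review.
class BookkeeperJdbcSketch(slickProfile: JdbcProfile,
                           providedTable: Option[BookkeepingTable] = None) {
  val bookkeepingTable: BookkeepingTable = providedTable.getOrElse(
    new BookkeepingTable { override val profile: JdbcProfile = slickProfile }
  )
}

// Callers that already hold a PramenDb could pass its wrapper:
//   new BookkeeperJdbcSketch(pramenDb.slickProfile, Some(pramenDb.bookkeepingTable))
```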
pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala (2)

Lines 42-46: Inconsistent column naming convention (pre-existing).

Columns `pipelineId`, `pipelineName`, and `environmentName` (lines 42-44) use camelCase while other columns use snake_case (e.g., `job_name`, `watcher_table_name`, `started_at`). This is inherited from the existing schema migrations in `PramenDb.scala` (lines 97-99), so not introduced by this PR, but worth noting for future cleanup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala` around lines 42 - 46, The column names pipelineId, pipelineName, and environmentName in JournalTable use camelCase while the rest use snake_case; rename these column definitions to pipeline_id, pipeline_name, and environment_name (preserving types and O.Length) and update all usages (queries, mappings, case class field references) that reference JournalTable.pipelineId, pipelineName, and environmentName so they point to the new identifiers; ensure the names match the existing DB migrations in PramenDb (so the DB column names remain consistent) and run/verify compilation and tests to catch any remaining references.
Lines 48-52: 22-field tuple projection is at the Scala 2 limit.

The `*` projection maps to a 22-element tuple, which is the maximum arity for `tupled`/`unapply` in Scala 2. Adding another column will require switching to Slick's HList-based mapping or a nested tuple approach. This is a pre-existing constraint, not introduced by this PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala` around lines 48 - 52, The current def * projection in JournalTable returns a 22-field tuple and relies on JournalTask.tupled/JournalTask.unapply which hits Scala 2's 22-arity limit; change the mapping to avoid a single 22-tuple by either (a) splitting the columns into two (or more) nested tuples and provide a custom mapping function that converts the nested tuple(s) to/from JournalTask, or (b) switch to an HList-based mapping (Slick shapeless support) for >22 fields; update def * to use the new nested-tuple or HList mapping and remove the direct JournalTask.tupled/unapply usage so the projection no longer exceeds the 22-arity limit.
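An illustrative nested-tuple mapping for projections beyond 22 columns; a three-field row stands in for `JournalTask` to keep the sketch short, and the column names are borrowed from the review context.

```scala
import slick.jdbc.H2Profile.api._

case class TaskRowSketch(jobName: String, tableName: String, periodBegin: String)

class TasksSketch(tag: Tag) extends Table[TaskRowSketch](tag, "tasks_sketch") {
  def jobName     = column[String]("job_name")
  def tableName   = column[String]("watcher_table_name")
  def periodBegin = column[String]("date_begin")

  // Group columns into nested tuples and map them with explicit functions
  // instead of tupled/unapply, sidestepping Scala 2's 22-arity tuple limit.
  def * = ((jobName, tableName), periodBegin).shaped <> (
    { case ((job, table), begin) => TaskRowSketch(job, table, begin) },
    { (r: TaskRowSketch) => Some(((r.jobName, r.tableName), r.periodBegin)) }
  )
}
```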
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)

Lines 45-62: Table wrappers in `PramenDb` are also duplicated in consumer classes.

`PramenDb` creates public table wrappers (`bookkeepingTable`, `offsetTable`, `journalTable`, `lockTicketTable`, `metadataTable`) that are also independently created in `BookkeeperJdbc`, `TokenLockJdbc`, etc. This means two sets of `TableQuery` instances exist for the same tables when using `PramenDb`-originated connections. While functionally harmless, you could consider passing the `PramenDb` instance (or its wrappers) directly to consumer classes to avoid the duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala` around lines 45 - 62, PramenDb currently instantiates public table wrappers (bookkeepingTable, offsetTable, journalTable, lockTicketTable, metadataTable) while consumer classes recreate their own TableQuery instances — change consumer constructors (e.g., BookkeeperJdbc, TokenLockJdbc, any classes creating duplicate tables) to accept either the PramenDb instance or explicit table wrapper instances (BookkeepingTable, OffsetTable, JournalTable, LockTicketTable, MetadataTable) and use those passed-in wrappers instead of constructing new ones; update places that construct these consumers to pass pramenDb.bookkeepingTable / pramenDb.lockTicketTable etc., and remove the duplicate TableQuery creation from the consumer classes so only PramenDb provides the shared table wrappers.
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala (1)

Line 216: Inconsistent accessor: `pramenDb.db` vs `pramenDb.slickDb`.

All other test files in this PR use `pramenDb.slickDb` (e.g., `OffsetManagerJdbcSuite`, `JournalJdbcSuite`), but this fixture uses `pramenDb.db`. While they resolve to the same `Database` instance, using a consistent accessor across the codebase improves readability and avoids confusion about whether they differ.

Consider replacing all occurrences of `pramenDb.db` in this file with `pramenDb.slickDb` to match the rest of the PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala` at line 216, The fixture uses pramenDb.db when creating the OffsetManagerJdbc instance (val om = new OffsetManagerJdbc(pramenDb.db, pramenDb.slickProfile, 123L); see OffsetManagerJdbc and om), which is inconsistent with other tests that use pramenDb.slickDb; change the accessor from pramenDb.db to pramenDb.slickDb throughout this fixture so the Database reference is accessed the same way as in OffsetManagerJdbcSuite and JournalJdbcSuite.
Actionable comments posted: 1
🧹 Nitpick comments (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala (1)
Line 219: Carrying thread stack traces into the failure exception is a solid improvement; minor semantic note on class choice.

Using `OsSignalException` here is functionally correct and better than the previous `IllegalStateException` (the stack traces are now preserved). However, `OsSignalException` is semantically modelled for OS signals (the class name, and its message template `"The process was interrupted by $signalName."`, is designed for "SIGINT", "SIGTERM", etc.). Passing `"System.exit()"` as the `signalName` will surface the message `"The process was interrupted by System.exit()."` in pipeline notifications and emails, which could confuse operators: `System.exit()` is a programmatic JVM call, not an OS signal.

Consider either:

- Using a more neutral exception type or a dedicated `UnexpectedExitException` for this path, or
- Adjusting `OsSignalException`'s message template to accommodate both cases (e.g., `"The process was terminated by $signalName."`).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala` at line 219, The current call setFailureException(OsSignalException("System.exit()", nonDaemonStackTraces)) uses OsSignalException which implies an OS signal; update the failure exception to a semantically neutral or dedicated type instead: either replace OsSignalException with a new UnexpectedExitException (or similar) that accepts the programmatic exit name and stack traces, or modify OsSignalException's message template to use a neutral wording like "terminated by" so it can accept "System.exit()" without misleading operators; change the instantiation at the setFailureException call and ensure the chosen exception class (UnexpectedExitException or revised OsSignalException) accepts the same parameters (signal/exit name and nonDaemonStackTraces).
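A minimal sketch of the dedicated-exception option; `UnexpectedExitException` is hypothetical (it does not exist in the codebase), and the stack-trace payload type is a placeholder since the actual shape of `nonDaemonStackTraces` is not shown in this review.

```scala
// Hypothetical dedicated exception for programmatic exits; the payload type
// is a placeholder for whatever nonDaemonStackTraces actually carries.
case class UnexpectedExitException(exitSource: String,
                                   threadStackTraces: Map[String, Array[StackTraceElement]])
  extends RuntimeException(s"The process was terminated by $exitSource.")

// Hypothetical call site, mirroring the one discussed above:
//   setFailureException(UnexpectedExitException("System.exit()", nonDaemonStackTraces))
```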
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala (1)

Lines 38-44: Dead code in `before` block.

The `UsingUtils.using(RdbJdbc(jdbcConfig))` block (lines 40-42) has an empty body (the `DROP SCHEMA` DDL is commented out). This creates an `RdbJdbc` connection and immediately closes it, which serves no purpose for SQLite. Consider removing the no-op block to avoid confusion.

♻️ Proposed cleanup

```diff
   before {
     if (pramenDb != null) pramenDb.close()

-    UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb =>
-      //rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;")
-    }
     pramenDb = PramenDb(jdbcConfig)
   }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala` around lines 38 - 44, The UsingUtils.using(RdbJdbc(jdbcConfig)) call in the before block is a no-op (its body is commented out) and should be removed to avoid opening and closing an unnecessary RdbJdbc connection; update the before block in TokenLockSQLiteSuite to delete the UsingUtils.using(RdbJdbc(jdbcConfig)) { ... } block (or restore the intended DDL if the drop schema was meant to run), keeping the existing pramenDb close/create logic (pramenDb.close() and pramenDb = PramenDb(jdbcConfig)).
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala (1)

Lines 29-37: Clean refactoring to profile-parameterized Slick usage.

The constructor change and the per-profile `OffsetTable`/`SlickUtils` wiring are consistent with the broader refactoring pattern across the codebase. The approach of instantiating table wrappers as anonymous trait implementations is used identically in `PramenDb.scala`.

One minor observation: `PramenDb` already exposes a public `offsetTable: OffsetTable` (see `PramenDb.scala` line 49). If callers construct `OffsetManagerJdbc` with the same `slickProfile` that `PramenDb` already holds, a second `OffsetTable` instance is created. This is functionally harmless (lightweight wrapper), but you could consider accepting the pre-built `OffsetTable` from `PramenDb` to avoid duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala` around lines 29 - 37, OffsetManagerJdbc currently constructs its own OffsetTable and SlickUtils (via new OffsetTable { override val profile = slickProfile } and new SlickUtils(slickProfile)) causing duplicate OffsetTable instances when PramenDb already provides one; change OffsetManagerJdbc to accept an OffsetTable (and optionally SlickUtils) as constructor parameters (e.g., add a param offsetTable: OffsetTable) and use that provided instance instead of creating a new anonymous OffsetTable inside the class, falling back to constructing a profile-bound OffsetTable only if none is supplied; reference symbols: OffsetManagerJdbc, OffsetTable, SlickUtils, and PramenDb.offsetTable.
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)

Lines 181-193: Driver-to-profile mapping covers a good range of supported databases.

The mapping correctly handles both old and new MySQL drivers, as well as both Microsoft and jTDS SQL Server drivers. The `IllegalArgumentException` for unrecognized drivers provides a clear error message.

Consider whether Oracle Database support (`oracle.jdbc.OracleDriver` → `slick.jdbc.OracleProfile`) should be included, given its prevalence in enterprise environments. This could also be deferred to a follow-up if Oracle isn't currently needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala` around lines 181 - 193, The getProfile method currently maps several JDBC drivers to Slick profiles but omits Oracle; add pattern matching for Oracle drivers (e.g., "oracle.jdbc.OracleDriver" and "oracle.jdbc.driver.OracleDriver") to return slick.jdbc.OracleProfile in the driver match within getProfile, and update any relevant tests or documentation to include Oracle support and error messaging remains unchanged for unknown drivers.
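A sketch of the mapping extended with Oracle; the existing cases are reconstructed from this review's description of `getProfile`, not copied from the source, so treat the exact driver strings and match shape as assumptions.

```scala
import slick.jdbc._

object ProfileMappingSketch {
  def getProfile(driver: String): JdbcProfile = driver match {
    case "org.postgresql.Driver"                              => PostgresProfile
    case "com.mysql.jdbc.Driver" | "com.mysql.cj.jdbc.Driver" => MySQLProfile
    case "com.microsoft.sqlserver.jdbc.SQLServerDriver" |
         "net.sourceforge.jtds.jdbc.Driver"                   => SQLServerProfile
    case "org.sqlite.JDBC"                                    => SQLiteProfile
    // Suggested addition covering both historical Oracle driver class names:
    case "oracle.jdbc.OracleDriver" | "oracle.jdbc.driver.OracleDriver" => OracleProfile
    case d => throw new IllegalArgumentException(s"Unsupported JDBC driver: $d")
  }
}
```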
🤖 Prompt for all review comments with AI agents

Verify each finding against the current code and only fix it if needed.

Inline comments:

In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`:
- Around lines 96-111: The test currently uses eventually which may pass immediately; to verify TokenLockJdbc's background refresh (tokenExpiresSeconds overridden to 2L on lock1) ensure the test waits past the TTL before checking that lock2 cannot acquire the lock: after acquiring lock1 (and before the final assertions) insert a delay longer than 2 seconds (e.g., Thread.sleep > 2000ms) to let the original ticket expire had it not been refreshed, then assert(!lock2.tryAcquire()) and assert(!lock1.tryAcquire()) to confirm the background watcher in TokenLockJdbc keeps the ticket alive.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala (1)
Lines 764-771: ⚠️ Potential issue | 🟡 Minor

`diff.tableName` is unescaped in the same row block.

While `columnName`/`dataType`/`oldType`/`newType` are now properly escaped, `diff.tableName` at line 765 is passed to `TextElement` without `escapeHTML`. A table name containing `<`, `>`, or `&` (e.g., from a schema with non-standard naming) would produce malformed HTML in the notification email. Given the fix is being applied to the neighbouring cells, escaping this field here maintains consistency.

🛡️ Proposed fix

```diff
-          TextElement(diff.tableName),
+          TextElement(StringUtils.escapeHTML(diff.tableName)),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala` around lines 764 - 771, In PipelineNotificationBuilderHtml, the tableBuilder.withRow call builds a row using TextElement(diff.tableName) without escaping HTML; update that usage to wrap diff.tableName with the same escapeHTML helper used for column/data-type cells (i.e., replace TextElement(diff.tableName) with TextElement(escapeHTML(diff.tableName))) so table names containing <, > or & are properly escaped and consistent with the other cells.
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala (1)

Lines 317-329: ⚠️ Potential issue | 🟡 Minor

`saveSchemaRaw` is missing `ensureDbConnected`, unlike `saveSchema`.

`saveSchema` (line 303) calls `slickUtils.ensureDbConnected(pramenDb.slickDb)` before executing queries, but `saveSchemaRaw` does not. This inconsistency could cause issues if the DB connection has been lost before `saveSchemaRaw` is called.

Proposed fix

```diff
   private[pramen] def saveSchemaRaw(table: String, infoDate: String, schema: String): Unit = {
     try {
+      slickUtils.ensureDbConnected(pramenDb.slickDb)
       pramenDb.slickDb.run(
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala` around lines 317 - 329, The saveSchemaRaw method omits the DB connection check present in saveSchema; call slickUtils.ensureDbConnected(pramenDb.slickDb) at the start of saveSchemaRaw before running any pramenDb.slickDb queries to ensure the connection is valid, then proceed with the existing delete and insert logic and existing NonFatal exception handling (i.e., keep the try/catch and log.error as-is).
🤖 Prompt for all review comments with AI agents

Verify each finding against the current code and only fix it if needed.

Duplicate comments:

In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`:
- Around lines 94-107: Test is already correct: TokenLockJdbc is instantiated with overridden tokenExpiresSeconds = 2L and try-finally ensures lock1.release(); no changes needed — keep the current test using TokenLockJdbc, tokenExpiresSeconds, tryAcquire(), Thread.sleep(3000) and release() as written.
In pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala:

```scala
  override def close(): Unit = synchronized {
    if (closable != null) {
      bookkeeper.close()
      closable.close()
      closable = null
    }
```
If `bookkeeper.close()` throws, `closable.close()` is skipped — potential resource leak.

If `bookkeeper.close()` at line 44 throws an exception, `closable.close()` at line 45 (which closes PramenDb / Mongo connections) will not execute. Consider using a try-finally or a utility that closes both resources regardless of exceptions.
Proposed fix

```diff
   override def close(): Unit = synchronized {
     if (closable != null) {
-      bookkeeper.close()
-      closable.close()
+      try {
+        bookkeeper.close()
+      } finally {
+        closable.close()
+      }
       closable = null
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala`
around lines 42 - 47, In AppContextImpl.close(), if bookkeeper.close() throws,
closable.close() never runs causing resource leaks; change the method to ensure
both resources are closed regardless of exceptions by wrapping
bookkeeper.close() in a try block and invoking closable.close() in a finally (or
use try-with-resources style/utility) and nulling closable afterwards, and make
sure to capture and rethrow or aggregate exceptions so errors from both
bookkeeper.close() and closable.close() are not silently lost; update the
close() implementation referencing AppContextImpl.close, bookkeeper, and
closable accordingly.
In pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala:

```scala
    val dbFile = new File("pramen.sqlite", tempDir)
    if (dbFile.exists()) {
      dbFile.delete()
    }
```
🧩 Analysis chain
🏁 Script executed:

```shell
# Read the specific lines from the file
sed -n '35,42p' pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala
```

Repository: AbsaOSS/pramen
Length of output: 258
🌐 Web query: Java File constructor File(String parent, String child) signature documentation

💡 Result:

java.io.File has this constructor:

```java
public File(String parent, String child)
```

Javadoc (summary): creates a new File from a parent pathname string and a child pathname string; if parent is null, it behaves like new File(child); throws NullPointerException if child is null. [1]

Sources: [1] docs.oracle.com
🏁 Script executed:

```shell
# Read lines around 44-48 to see beforeAll block where tempDir is created
sed -n '44,55p' pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala
```

Repository: AbsaOSS/pramen
Length of output: 412
File constructor arguments are reversed — stale SQLite data persists between tests.

`new File(String parent, String child)` takes the parent directory first. As written, `parent = "pramen.sqlite"` and `child = tempDir`, creating an invalid path. The SQLite file should be at `$tempDir/pramen.sqlite` (per the JDBC config), but the check and delete are applied to the wrong path. The file is never deleted, leaving stale lock-ticket rows between tests.
🐛 Proposed fix

```diff
-    val dbFile = new File("pramen.sqlite", tempDir)
+    val dbFile = new File(tempDir, "pramen.sqlite")
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```scala
    val dbFile = new File(tempDir, "pramen.sqlite")
    if (dbFile.exists()) {
      dbFile.delete()
    }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`
around lines 37 - 40, The File constructor arguments are reversed causing dbFile
to point to the wrong path and leaving stale SQLite data between tests; in
TokenLockSQLiteSuite locate where dbFile is created (variable dbFile) and change
the constructor to new File(tempDir, "pramen.sqlite") so the file refers to
$tempDir/pramen.sqlite, then keep the existing exists() and delete() checks to
remove the DB before each test run.
| "Token lock" should { | ||
| "be able to acquire and release locks" in { | ||
| val lock1 = getLock("token1") | ||
|
|
||
| assert(lock1.tryAcquire()) | ||
| assert(!lock1.tryAcquire()) | ||
|
|
||
| val lock2 = getLock("token1") | ||
| assert(!lock2.tryAcquire()) | ||
|
|
||
| lock1.release() | ||
|
|
||
| assert(lock2.tryAcquire()) | ||
| assert(!lock2.tryAcquire()) | ||
|
|
||
| lock2.release() | ||
| } | ||
|
|
||
| "multiple token locks should not affect each other" in { | ||
| val lock1 = getLock("token1") | ||
| val lock2 = getLock("token2") | ||
|
|
||
| assert(lock1.tryAcquire()) | ||
| assert(lock2.tryAcquire()) | ||
|
|
||
| assert(!lock1.tryAcquire()) | ||
| assert(!lock2.tryAcquire()) | ||
|
|
||
| lock1.release() | ||
|
|
||
| assert(lock1.tryAcquire()) | ||
| assert(!lock2.tryAcquire()) | ||
|
|
||
| lock1.release() | ||
| lock2.release() | ||
| } |
Missing try-finally for lock release may leave background threads running after assertion failures.
If any assertion in test 1 or test 2 fails before `release()` is called, `TokenLockJdbc`'s background refresh thread stays alive. The subsequent `before` block closes `pramenDb`, and that stale thread will encounter errors accessing the now-closed DB. Combined with the reversed `File` args that prevent SQLite cleanup, this can cause cascading flakiness.
🛡️ Suggested fix for test 1 (apply same pattern to test 2)

```diff
     "be able to acquire and release locks" in {
       val lock1 = getLock("token1")
-      assert(lock1.tryAcquire())
-      assert(!lock1.tryAcquire())
-
-      val lock2 = getLock("token1")
-      assert(!lock2.tryAcquire())
-
-      lock1.release()
-
-      assert(lock2.tryAcquire())
-      assert(!lock2.tryAcquire())
-
-      lock2.release()
+      try {
+        assert(lock1.tryAcquire())
+        assert(!lock1.tryAcquire())
+
+        val lock2 = getLock("token1")
+        try {
+          assert(!lock2.tryAcquire())
+          lock1.release()
+          assert(lock2.tryAcquire())
+          assert(!lock2.tryAcquire())
+        } finally {
+          lock2.release()
+        }
+      } finally {
+        lock1.release()
+      }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala`
around lines 57 - 92, Tests acquire TokenLock instances via getLock (lock1,
lock2) but may not release them if an assertion fails, leaving TokenLockJdbc's
background refresh thread running; wrap each test's lock usage in a try { ... }
finally { lock.release() } (or multiple locks: release each acquired lock in
finally) so that every acquired TokenLock (references: getLock, TokenLockJdbc,
lock1.release, lock2.release) is guaranteed to be released even on assertion
failure; apply the same try-finally pattern to both tests in this file.
Closes #692