From ff65059bb9654a343e408d6fcd730070f221fc58 Mon Sep 17 00:00:00 2001 From: Erik Gaasedelen Date: Thu, 12 Feb 2026 14:40:33 -0800 Subject: [PATCH 1/4] Add transaction support and fix _ColsGetter.__call__ - Add Database.transaction() context manager using contextvars - All existing db.t.* calls automatically participate in active transaction - Fix _ColsGetter.__call__: .columns -> .cols --- fastasyncpg/_modidx.py | 1 + fastasyncpg/core.py | 22 +++++- nbs/00_core.ipynb | 171 ++++++++++++++++++++++++++++++++++++----- nbs/db_dc.py | 28 +------ 4 files changed, 173 insertions(+), 49 deletions(-) diff --git a/fastasyncpg/_modidx.py b/fastasyncpg/_modidx.py index d067bd3..126af50 100644 --- a/fastasyncpg/_modidx.py +++ b/fastasyncpg/_modidx.py @@ -22,6 +22,7 @@ 'fastasyncpg.core.Database.t': ('core.html#database.t', 'fastasyncpg/core.py'), 'fastasyncpg.core.Database.table': ('core.html#database.table', 'fastasyncpg/core.py'), 'fastasyncpg.core.Database.table2glb': ('core.html#database.table2glb', 'fastasyncpg/core.py'), + 'fastasyncpg.core.Database.transaction': ('core.html#database.transaction', 'fastasyncpg/core.py'), 'fastasyncpg.core.FRecord': ('core.html#frecord', 'fastasyncpg/core.py'), 'fastasyncpg.core.FRecord.__getattr__': ('core.html#frecord.__getattr__', 'fastasyncpg/core.py'), 'fastasyncpg.core.FRecord._repr_html_': ('core.html#frecord._repr_html_', 'fastasyncpg/core.py'), diff --git a/fastasyncpg/core.py b/fastasyncpg/core.py index dce5004..99e54b1 100644 --- a/fastasyncpg/core.py +++ b/fastasyncpg/core.py @@ -70,6 +70,10 @@ async def pk_cols(conn, table): WHERE i.indrelid = $1::regclass AND i.indisprimary""", table) return [r['attname'] for r in res] +# %% ../nbs/00_core.ipynb #a947c1ab +import contextvars +_txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below + # %% ../nbs/00_core.ipynb #350e6168 class Database: def __init__(self, conn, refresh=True): @@ -83,7 +87,7 @@ def t(self): if not hasattr(self, '_t'): self._t = _TablesGetter(self) return self._t - def __getattr__(self, k): return getattr(self.conn, k) + def __getattr__(self, k): return getattr(_txn_conn.get() or self.conn, k) def table(self, name): if name not in self._tables: self._tables[name] = Table(self, name) @@ -104,7 +108,6 @@ def __str__(self): u,h,d,p = pr.user, a[0], pr.database, a[1] return f"postgresql://{u}@{h}:{p}/{d}" - # %% ../nbs/00_core.ipynb #ae461a23 class Table: def __init__(self, db, name): @@ -153,7 +156,7 @@ class _ColsGetter: def __init__(self, tbl): self.tbl = tbl def __dir__(self): return list(self.tbl.cols) def __repr__(self): return ", ".join(dir(self)) - def __call__(self): return [_Col(self.tbl.name,o.name) for o in self.tbl.columns] + def __call__(self): return [_Col(self.tbl.name, c) for c in self.tbl.cols] def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.cols def __getattr__(self, k): if k[0]=='_': raise AttributeError @@ -586,3 +589,16 @@ async def create_pool(*args, **kwargs): res = Database(pool, refresh=False) await res.refresh() return res + +# %% ../nbs/00_core.ipynb #b147794a +from contextlib import asynccontextmanager + +@patch +@asynccontextmanager +async def transaction(self:Database): + "Context manager yielding a transactional Database on a single connection" + async with self.acquire() as conn: + async with conn.transaction(): + _txn_conn.set(conn) + try: yield self + finally: _txn_conn.set(None) diff --git a/nbs/00_core.ipynb b/nbs/00_core.ipynb index 989abe0..e9c8db0 100644 --- a/nbs/00_core.ipynb +++ b/nbs/00_core.ipynb @@ -142,7 +142,7 @@ "text": [ " version \r\n", "-----------------------------------------------------------------------------------------------------------------------------\r\n", - " PostgreSQL 18.1 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit\r\n", + " PostgreSQL 18.2 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit\r\n", "(1 row)\r\n", "\r\n" ] @@ -170,10 +170,9 @@ "name": "stdout", "output_type": "stream", "text": [ - "\u001b[1mName Status User File\u001b[0m\r\n", - "cloudflared \u001b[39mnone\u001b[0m \r\n", - "postgresql@18 \u001b[32mstarted\u001b[0m jhoward ~/Library/LaunchAgents/homebrew.mxcl.postgresql@18.plist\r\n", - "unbound \u001b[39mnone\u001b[0m \r\n" + "\u001b[1mName Status User File\u001b[0m\r\n", + "postgresql@18 \u001b[32mstarted\u001b[0m erikgaas ~/Library/LaunchAgents/homebrew.mxcl.postgresql@18.plist\r\n", + "unbound \u001b[39mnone\u001b[0m \r\n" ] } ], @@ -244,7 +243,7 @@ { "data": { "text/plain": [ - "'jhoward'" + "'erikgaas'" ] }, "execution_count": null, @@ -273,7 +272,7 @@ { "data": { "text/plain": [ - "'PostgreSQL 18.1 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit'" + "'PostgreSQL 18.2 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit'" ] }, "execution_count": null, @@ -706,11 +705,7 @@ " ,\n", " ,\n", " ,\n", - " ,\n", - " ,\n", - " ,\n", - " ,\n", - " ]" + " ]" ] }, "execution_count": null, @@ -830,7 +825,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "artist album employee customer invoice invoice_line track playlist playlist_track genre media_type cats cat dog toy\n" + "artist album employee customer invoice invoice_line track playlist playlist_track genre media_type\n" ] } ], @@ -962,6 +957,18 @@ "`Database` wraps an asyncpg connection (or pool) and provides table/view metadata caching. It delegates unknown attributes to the underlying connection via `__getattr__`, so you can call `db.fetch(...)` directly. The `t` property returns a `_TablesGetter` for convenient table access." ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "a947c1ab", + "metadata": {}, + "outputs": [], + "source": [ + "#| export\n", + "import contextvars\n", + "_txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below" + ] + }, { "cell_type": "code", "execution_count": null, @@ -982,7 +989,7 @@ " if not hasattr(self, '_t'): self._t = _TablesGetter(self)\n", " return self._t\n", "\n", - " def __getattr__(self, k): return getattr(self.conn, k)\n", + " def __getattr__(self, k): return getattr(_txn_conn.get() or self.conn, k)\n", "\n", " def table(self, name):\n", " if name not in self._tables: self._tables[name] = Table(self, name)\n", @@ -1001,7 +1008,7 @@ " else:\n", " pr,a = self.conn._params, self.conn._addr\n", " u,h,d,p = pr.user, a[0], pr.database, a[1]\n", - " return f\"postgresql://{u}@{h}:{p}/{d}\"\n" + " return f\"postgresql://{u}@{h}:{p}/{d}\"" ] }, { @@ -1107,7 +1114,7 @@ { "data": { "text/plain": [ - "'postgresql://jhoward@127.0.0.1:5432/chinook'" + "'postgresql://erikgaas@127.0.0.1:5432/chinook'" ] }, "execution_count": null, @@ -1265,7 +1272,7 @@ " def __init__(self, tbl): self.tbl = tbl\n", " def __dir__(self): return list(self.tbl.cols)\n", " def __repr__(self): return \", \".join(dir(self))\n", - " def __call__(self): return [_Col(self.tbl.name,o.name) for o in self.tbl.columns]\n", + " def __call__(self): return [_Col(self.tbl.name, c) for c in self.tbl.cols]\n", " def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.cols\n", " def __getattr__(self, k):\n", " if k[0]=='_': raise AttributeError\n", @@ -2857,7 +2864,7 @@ { "data": { "text/plain": [ - "artist, album, employee, customer, invoice, invoice_line, track, playlist, playlist_track, genre, media_type, cats, cat, dog, toy" + "artist, album, employee, customer, invoice, invoice_line, track, playlist, playlist_track, genre, media_type" ] }, "execution_count": null, @@ -4317,7 +4324,7 @@ { "data": { "text/plain": [ - "'postgresql://jhoward@127.0.0.1:5432/chinook'" + "'postgresql://erikgaas@127.0.0.1:5432/chinook'" ] }, "execution_count": null, @@ -4328,6 +4335,132 @@ "source": [ "str(db)" ] + }, + { + "cell_type": "markdown", + "id": "b61496f7", + "metadata": {}, + "source": [ + "## Transactions" + ] + }, + { + "cell_type": "markdown", + "id": "af2f1878", + "metadata": {}, + "source": [ + "When using a connection pool, each query can run on a different connection. This means you can't group multiple operations into an atomic unit — if the second query fails, the first has already been committed. PostgreSQL transactions solve this: all operations within a transaction either succeed together or are rolled back together.\n", + "\n", + "`Database.transaction()` is a context manager that acquires a single connection from the pool, starts a transaction on it, and temporarily routes all queries through that connection using a `contextvars.ContextVar`. This means existing code — `db.t.credit.update(...)`, `db.q(...)`, `db.execute(...)` — automatically participates in the transaction without any changes. When the context manager exits, the transaction is committed (or rolled back on exception) and the connection is returned to the pool." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b147794a", + "metadata": {}, + "outputs": [], + "source": [ + "#| export\n", + "from contextlib import asynccontextmanager\n", + "\n", + "@patch\n", + "@asynccontextmanager\n", + "async def transaction(self:Database):\n", + " \"Context manager yielding a transactional Database on a single connection\"\n", + " async with self.acquire() as conn:\n", + " async with conn.transaction():\n", + " _txn_conn.set(conn)\n", + " try: yield self\n", + " finally: _txn_conn.set(None)" + ] + }, + { + "cell_type": "markdown", + "id": "62d7f428", + "metadata": {}, + "source": [ + "Inside a transaction, all operations are atomic — either all succeed or all are rolled back. Here we verify that a failed operation inside a transaction leaves the database unchanged:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "56fde315", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Before: 275, After: 275 — rollback worked!\n" + ] + } + ], + "source": [ + "count_before = await db.t.artist.count\n", + "next_id = (await db.q(\"SELECT MAX(artist_id) as m FROM artist\"))[0]['m'] + 1\n", + "\n", + "# This transaction will fail — the insert succeeds but the deliberate error rolls it back\n", + "try:\n", + " async with db.transaction():\n", + " await db.t.artist.insert(artist_id=next_id, name='Test Artist')\n", + " raise ValueError(\"something went wrong\")\n", + "except ValueError: pass\n", + "\n", + "count_after = await db.t.artist.count\n", + "assert count_before == count_after, \"Transaction should have rolled back\"\n", + "print(f\"Before: {count_before}, After: {count_after} — rollback worked!\")" + ] + }, + { + "cell_type": "markdown", + "id": "a290d39a", + "metadata": {}, + "source": [ + "And when the transaction succeeds, the changes are committed:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "32cb3b9d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Before: 275, After: 276 — commit worked!\n" + ] + }, + { + "data": { + "text/html": [ + "
FieldValue
artist_id276
nameCommitted Artist
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "count_before = await db.t.artist.count\n", + "\n", + "async with db.transaction():\n", + " await db.t.artist.insert(artist_id=next_id, name='Committed Artist')\n", + "\n", + "count_after = await db.t.artist.count\n", + "assert count_after == count_before + 1, \"Transaction should have committed\"\n", + "print(f\"Before: {count_before}, After: {count_after} — commit worked!\")\n", + "\n", + "# Clean up\n", + "await db.t.artist.delete(next_id)" + ] } ], "metadata": {}, diff --git a/nbs/db_dc.py b/nbs/db_dc.py index 688b7f6..8fefd11 100644 --- a/nbs/db_dc.py +++ b/nbs/db_dc.py @@ -1,4 +1,4 @@ -__all__ = ["Album", "Artist", "Cat", "Cats", "Customer", "Dog", "Employee", "Genre", "Invoice", "Invoice_Line", "Media_Type", "Playlist", "Playlist_Track", "Toy", "Track"] +__all__ = ["Album", "Artist", "Customer", "Employee", "Genre", "Invoice", "Invoice_Line", "Media_Type", "Playlist", "Playlist_Track", "Track"] from dataclasses import dataclass import datetime,decimal from uuid import UUID @@ -14,20 +14,6 @@ class Artist: artist_id: int | None = UNSET name: str | None = UNSET -@dataclass -class Cat: - id: int | None = UNSET - name: str | None = UNSET - weight: float | None = UNSET - uid: int | None = UNSET - -@dataclass -class Cats: - id: int | None = UNSET - name: str | None = UNSET - weight: float | None = UNSET - uid: int | None = UNSET - @dataclass class Customer: customer_id: int | None = UNSET @@ -44,12 +30,6 @@ class Customer: email: str | None = UNSET support_rep_id: int | None = UNSET -@dataclass -class Dog: - id: int | None = UNSET - name: str | None = UNSET - age: int | None = UNSET - @dataclass class Employee: employee_id: int | None = UNSET @@ -108,12 +88,6 @@ class Playlist_Track: playlist_id: int | None = UNSET track_id: int | None = UNSET -@dataclass -class Toy: - id: int | None = UNSET - name: str | None = UNSET - dog_id: int | None = UNSET - @dataclass class Track: track_id: int | None = UNSET From a760a5f28a136f70fec639e3c82e9555c322815b Mon Sep 17 00:00:00 2001 From: Erik Gaasedelen Date: Thu, 12 Feb 2026 14:47:05 -0800 Subject: [PATCH 2/4] move import --- fastasyncpg/core.py | 3 +-- nbs/00_core.ipynb | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/fastasyncpg/core.py b/fastasyncpg/core.py index 99e54b1..ce2e832 100644 --- a/fastasyncpg/core.py +++ b/fastasyncpg/core.py @@ -72,6 +72,7 @@ async def pk_cols(conn, table): # %% ../nbs/00_core.ipynb #a947c1ab import contextvars +from contextlib import asynccontextmanager _txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below # %% ../nbs/00_core.ipynb #350e6168 @@ -591,8 +592,6 @@ async def create_pool(*args, **kwargs): return res # %% ../nbs/00_core.ipynb #b147794a -from contextlib import asynccontextmanager - @patch @asynccontextmanager async def transaction(self:Database): diff --git a/nbs/00_core.ipynb b/nbs/00_core.ipynb index e9c8db0..9c641ed 100644 --- a/nbs/00_core.ipynb +++ b/nbs/00_core.ipynb @@ -966,6 +966,7 @@ "source": [ "#| export\n", "import contextvars\n", + "from contextlib import asynccontextmanager\n", "_txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below" ] }, @@ -4362,8 +4363,6 @@ "outputs": [], "source": [ "#| export\n", - "from contextlib import asynccontextmanager\n", - "\n", "@patch\n", "@asynccontextmanager\n", "async def transaction(self:Database):\n", From ff3a9ba0904425a46c235656bfc8f84bed01430a Mon Sep 17 00:00:00 2001 From: Erik Gaasedelen Date: Thu, 12 Feb 2026 14:48:54 -0800 Subject: [PATCH 3/4] fix --- fastasyncpg/core.py | 5 ++--- nbs/00_core.ipynb | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/fastasyncpg/core.py b/fastasyncpg/core.py index ce2e832..bbc7bdb 100644 --- a/fastasyncpg/core.py +++ b/fastasyncpg/core.py @@ -7,8 +7,9 @@ # %% ../nbs/00_core.ipynb #222d751e from fastcore.utils import * -import asyncpg +import asyncpg, contextvars from asyncpg import connection,protocol +from contextlib import asynccontextmanager # %% ../nbs/00_core.ipynb #cdb9cd9e class Results(list): @@ -71,8 +72,6 @@ async def pk_cols(conn, table): return [r['attname'] for r in res] # %% ../nbs/00_core.ipynb #a947c1ab -import contextvars -from contextlib import asynccontextmanager _txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below # %% ../nbs/00_core.ipynb #350e6168 diff --git a/nbs/00_core.ipynb b/nbs/00_core.ipynb index 9c641ed..0f69649 100644 --- a/nbs/00_core.ipynb +++ b/nbs/00_core.ipynb @@ -220,8 +220,9 @@ "source": [ "#| export\n", "from fastcore.utils import *\n", - "import asyncpg\n", - "from asyncpg import connection,protocol" + "import asyncpg, contextvars\n", + "from asyncpg import connection,protocol\n", + "from contextlib import asynccontextmanager" ] }, { @@ -965,8 +966,6 @@ "outputs": [], "source": [ "#| export\n", - "import contextvars\n", - "from contextlib import asynccontextmanager\n", "_txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below" ] }, From 90a6f0705029308b68b160964b8f5c175ef39be2 Mon Sep 17 00:00:00 2001 From: Erik Gaasedelen Date: Thu, 12 Feb 2026 14:50:18 -0800 Subject: [PATCH 4/4] revert --- nbs/db_dc.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/nbs/db_dc.py b/nbs/db_dc.py index 8fefd11..688b7f6 100644 --- a/nbs/db_dc.py +++ b/nbs/db_dc.py @@ -1,4 +1,4 @@ -__all__ = ["Album", "Artist", "Customer", "Employee", "Genre", "Invoice", "Invoice_Line", "Media_Type", "Playlist", "Playlist_Track", "Track"] +__all__ = ["Album", "Artist", "Cat", "Cats", "Customer", "Dog", "Employee", "Genre", "Invoice", "Invoice_Line", "Media_Type", "Playlist", "Playlist_Track", "Toy", "Track"] from dataclasses import dataclass import datetime,decimal from uuid import UUID @@ -14,6 +14,20 @@ class Artist: artist_id: int | None = UNSET name: str | None = UNSET +@dataclass +class Cat: + id: int | None = UNSET + name: str | None = UNSET + weight: float | None = UNSET + uid: int | None = UNSET + +@dataclass +class Cats: + id: int | None = UNSET + name: str | None = UNSET + weight: float | None = UNSET + uid: int | None = UNSET + @dataclass class Customer: customer_id: int | None = UNSET @@ -30,6 +44,12 @@ class Customer: email: str | None = UNSET support_rep_id: int | None = UNSET +@dataclass +class Dog: + id: int | None = UNSET + name: str | None = UNSET + age: int | None = UNSET + @dataclass class Employee: employee_id: int | None = UNSET @@ -88,6 +108,12 @@ class Playlist_Track: playlist_id: int | None = UNSET track_id: int | None = UNSET +@dataclass +class Toy: + id: int | None = UNSET + name: str | None = UNSET + dog_id: int | None = UNSET + @dataclass class Track: track_id: int | None = UNSET