Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fastasyncpg/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'fastasyncpg.core.Database.t': ('core.html#database.t', 'fastasyncpg/core.py'),
'fastasyncpg.core.Database.table': ('core.html#database.table', 'fastasyncpg/core.py'),
'fastasyncpg.core.Database.table2glb': ('core.html#database.table2glb', 'fastasyncpg/core.py'),
'fastasyncpg.core.Database.transaction': ('core.html#database.transaction', 'fastasyncpg/core.py'),
'fastasyncpg.core.FRecord': ('core.html#frecord', 'fastasyncpg/core.py'),
'fastasyncpg.core.FRecord.__getattr__': ('core.html#frecord.__getattr__', 'fastasyncpg/core.py'),
'fastasyncpg.core.FRecord._repr_html_': ('core.html#frecord._repr_html_', 'fastasyncpg/core.py'),
Expand Down
22 changes: 18 additions & 4 deletions fastasyncpg/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

# %% ../nbs/00_core.ipynb #222d751e
from fastcore.utils import *
import asyncpg
import asyncpg, contextvars
from asyncpg import connection,protocol
from contextlib import asynccontextmanager

# %% ../nbs/00_core.ipynb #cdb9cd9e
class Results(list):
Expand Down Expand Up @@ -70,6 +71,9 @@ async def pk_cols(conn, table):
WHERE i.indrelid = $1::regclass AND i.indisprimary""", table)
return [r['attname'] for r in res]

# %% ../nbs/00_core.ipynb #a947c1ab
_txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below

# %% ../nbs/00_core.ipynb #350e6168
class Database:
def __init__(self, conn, refresh=True):
Expand All @@ -83,7 +87,7 @@ def t(self):
if not hasattr(self, '_t'): self._t = _TablesGetter(self)
return self._t

def __getattr__(self, k): return getattr(self.conn, k)
def __getattr__(self, k): return getattr(_txn_conn.get() or self.conn, k)

def table(self, name):
if name not in self._tables: self._tables[name] = Table(self, name)
Expand All @@ -104,7 +108,6 @@ def __str__(self):
u,h,d,p = pr.user, a[0], pr.database, a[1]
return f"postgresql://{u}@{h}:{p}/{d}"


# %% ../nbs/00_core.ipynb #ae461a23
class Table:
def __init__(self, db, name):
Expand Down Expand Up @@ -153,7 +156,7 @@ class _ColsGetter:
def __init__(self, tbl): self.tbl = tbl
def __dir__(self): return list(self.tbl.cols)
def __repr__(self): return ", ".join(dir(self))
def __call__(self): return [_Col(self.tbl.name,o.name) for o in self.tbl.columns]
def __call__(self): return [_Col(self.tbl.name, c) for c in self.tbl.cols]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugfix. .columns returns a dict. cols returns a list of the names.

def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.cols
def __getattr__(self, k):
if k[0]=='_': raise AttributeError
Expand Down Expand Up @@ -586,3 +589,14 @@ async def create_pool(*args, **kwargs):
res = Database(pool, refresh=False)
await res.refresh()
return res

# %% ../nbs/00_core.ipynb #b147794a
@patch
@asynccontextmanager
async def transaction(self:Database):
"Context manager yielding a transactional Database on a single connection"
async with self.acquire() as conn:
async with conn.transaction():
_txn_conn.set(conn)
try: yield self
finally: _txn_conn.set(None)
173 changes: 152 additions & 21 deletions nbs/00_core.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
"text": [
" version \r\n",
"-----------------------------------------------------------------------------------------------------------------------------\r\n",
" PostgreSQL 18.1 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit\r\n",
" PostgreSQL 18.2 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit\r\n",
"(1 row)\r\n",
"\r\n"
]
Expand Down Expand Up @@ -170,10 +170,9 @@
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1mName Status User File\u001b[0m\r\n",
"cloudflared \u001b[39mnone\u001b[0m \r\n",
"postgresql@18 \u001b[32mstarted\u001b[0m jhoward ~/Library/LaunchAgents/homebrew.mxcl.postgresql@18.plist\r\n",
"unbound \u001b[39mnone\u001b[0m \r\n"
"\u001b[1mName Status User File\u001b[0m\r\n",
"postgresql@18 \u001b[32mstarted\u001b[0m erikgaas ~/Library/LaunchAgents/homebrew.mxcl.postgresql@18.plist\r\n",
"unbound \u001b[39mnone\u001b[0m \r\n"
]
}
],
Expand Down Expand Up @@ -221,8 +220,9 @@
"source": [
"#| export\n",
"from fastcore.utils import *\n",
"import asyncpg\n",
"from asyncpg import connection,protocol"
"import asyncpg, contextvars\n",
"from asyncpg import connection,protocol\n",
"from contextlib import asynccontextmanager"
]
},
{
Expand All @@ -244,7 +244,7 @@
{
"data": {
"text/plain": [
"'jhoward'"
"'erikgaas'"
]
},
"execution_count": null,
Expand Down Expand Up @@ -273,7 +273,7 @@
{
"data": {
"text/plain": [
"'PostgreSQL 18.1 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit'"
"'PostgreSQL 18.2 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit'"
]
},
"execution_count": null,
Expand Down Expand Up @@ -706,11 +706,7 @@
" <Record table_name='playlist'>,\n",
" <Record table_name='playlist_track'>,\n",
" <Record table_name='genre'>,\n",
" <Record table_name='media_type'>,\n",
" <Record table_name='cats'>,\n",
" <Record table_name='cat'>,\n",
" <Record table_name='dog'>,\n",
" <Record table_name='toy'>]"
" <Record table_name='media_type'>]"
]
},
"execution_count": null,
Expand Down Expand Up @@ -830,7 +826,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"artist album employee customer invoice invoice_line track playlist playlist_track genre media_type cats cat dog toy\n"
"artist album employee customer invoice invoice_line track playlist playlist_track genre media_type\n"
]
}
],
Expand Down Expand Up @@ -962,6 +958,17 @@
"`Database` wraps an asyncpg connection (or pool) and provides table/view metadata caching. It delegates unknown attributes to the underlying connection via `__getattr__`, so you can call `db.fetch(...)` directly. The `t` property returns a `_TablesGetter` for convenient table access."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a947c1ab",
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"_txn_conn = contextvars.ContextVar('_txn_conn', default=None) # Used by Database.__getattr__ to route queries through active transaction; see Transactions section below"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -982,7 +989,7 @@
" if not hasattr(self, '_t'): self._t = _TablesGetter(self)\n",
" return self._t\n",
"\n",
" def __getattr__(self, k): return getattr(self.conn, k)\n",
" def __getattr__(self, k): return getattr(_txn_conn.get() or self.conn, k)\n",
"\n",
" def table(self, name):\n",
" if name not in self._tables: self._tables[name] = Table(self, name)\n",
Expand All @@ -1001,7 +1008,7 @@
" else:\n",
" pr,a = self.conn._params, self.conn._addr\n",
" u,h,d,p = pr.user, a[0], pr.database, a[1]\n",
" return f\"postgresql://{u}@{h}:{p}/{d}\"\n"
" return f\"postgresql://{u}@{h}:{p}/{d}\""
]
},
{
Expand Down Expand Up @@ -1107,7 +1114,7 @@
{
"data": {
"text/plain": [
"'postgresql://jhoward@127.0.0.1:5432/chinook'"
"'postgresql://erikgaas@127.0.0.1:5432/chinook'"
]
},
"execution_count": null,
Expand Down Expand Up @@ -1265,7 +1272,7 @@
" def __init__(self, tbl): self.tbl = tbl\n",
" def __dir__(self): return list(self.tbl.cols)\n",
" def __repr__(self): return \", \".join(dir(self))\n",
" def __call__(self): return [_Col(self.tbl.name,o.name) for o in self.tbl.columns]\n",
" def __call__(self): return [_Col(self.tbl.name, c) for c in self.tbl.cols]\n",
" def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.cols\n",
" def __getattr__(self, k):\n",
" if k[0]=='_': raise AttributeError\n",
Expand Down Expand Up @@ -2857,7 +2864,7 @@
{
"data": {
"text/plain": [
"artist, album, employee, customer, invoice, invoice_line, track, playlist, playlist_track, genre, media_type, cats, cat, dog, toy"
"artist, album, employee, customer, invoice, invoice_line, track, playlist, playlist_track, genre, media_type"
]
},
"execution_count": null,
Expand Down Expand Up @@ -4317,7 +4324,7 @@
{
"data": {
"text/plain": [
"'postgresql://jhoward@127.0.0.1:5432/chinook'"
"'postgresql://erikgaas@127.0.0.1:5432/chinook'"
]
},
"execution_count": null,
Expand All @@ -4328,6 +4335,130 @@
"source": [
"str(db)"
]
},
{
"cell_type": "markdown",
"id": "b61496f7",
"metadata": {},
"source": [
"## Transactions"
]
},
{
"cell_type": "markdown",
"id": "af2f1878",
"metadata": {},
"source": [
"When using a connection pool, each query can run on a different connection. This means you can't group multiple operations into an atomic unit — if the second query fails, the first has already been committed. PostgreSQL transactions solve this: all operations within a transaction either succeed together or are rolled back together.\n",
"\n",
"`Database.transaction()` is a context manager that acquires a single connection from the pool, starts a transaction on it, and temporarily routes all queries through that connection using a `contextvars.ContextVar`. This means existing code — `db.t.credit.update(...)`, `db.q(...)`, `db.execute(...)` — automatically participates in the transaction without any changes. When the context manager exits, the transaction is committed (or rolled back on exception) and the connection is returned to the pool."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b147794a",
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"@patch\n",
"@asynccontextmanager\n",
"async def transaction(self:Database):\n",
" \"Context manager yielding a transactional Database on a single connection\"\n",
" async with self.acquire() as conn:\n",
" async with conn.transaction():\n",
" _txn_conn.set(conn)\n",
" try: yield self\n",
" finally: _txn_conn.set(None)"
]
},
{
"cell_type": "markdown",
"id": "62d7f428",
"metadata": {},
"source": [
"Inside a transaction, all operations are atomic — either all succeed or all are rolled back. Here we verify that a failed operation inside a transaction leaves the database unchanged:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "56fde315",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Before: 275, After: 275 — rollback worked!\n"
]
}
],
"source": [
"count_before = await db.t.artist.count\n",
"next_id = (await db.q(\"SELECT MAX(artist_id) as m FROM artist\"))[0]['m'] + 1\n",
"\n",
"# This transaction will fail — the insert succeeds but the deliberate error rolls it back\n",
"try:\n",
" async with db.transaction():\n",
" await db.t.artist.insert(artist_id=next_id, name='Test Artist')\n",
" raise ValueError(\"something went wrong\")\n",
"except ValueError: pass\n",
"\n",
"count_after = await db.t.artist.count\n",
"assert count_before == count_after, \"Transaction should have rolled back\"\n",
"print(f\"Before: {count_before}, After: {count_after} — rollback worked!\")"
]
},
{
"cell_type": "markdown",
"id": "a290d39a",
"metadata": {},
"source": [
"And when the transaction succeeds, the changes are committed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32cb3b9d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Before: 275, After: 276 — commit worked!\n"
]
},
{
"data": {
"text/html": [
"<table class=\"prose\"><thead><tr><th>Field</th><th>Value</th></tr></thead><tbody><tr><td>artist_id</td><td>276</td></tr><tr><td>name</td><td>Committed Artist</td></tr></tbody></table>"
],
"text/plain": [
"<FRecord artist_id=276 name='Committed Artist'>"
]
},
"execution_count": null,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"count_before = await db.t.artist.count\n",
"\n",
"async with db.transaction():\n",
" await db.t.artist.insert(artist_id=next_id, name='Committed Artist')\n",
"\n",
"count_after = await db.t.artist.count\n",
"assert count_after == count_before + 1, \"Transaction should have committed\"\n",
"print(f\"Before: {count_before}, After: {count_after} — commit worked!\")\n",
"\n",
"# Clean up\n",
"await db.t.artist.delete(next_id)"
]
}
],
"metadata": {},
Expand Down