Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/fusion-docs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
on:
push:
tags:
- 'v*.*.*'
release:
types: [published]

name: Generate Fusion docs

Expand Down Expand Up @@ -36,6 +35,6 @@ jobs:

- name: Upload release asset
run: |
gh release upload ${{ github.ref_name }} fusion-docs.zip
gh release upload ${{ github.event.release.tag_name }} fusion-docs.zip
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
34 changes: 30 additions & 4 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
name: Publish packages

on:
push:
tags:
- 'v*-rc*'
- 'v*-test*'
- 'v*-alpha*'
- 'v*-beta*'
release:
types: [published]
workflow_dispatch:
Expand Down Expand Up @@ -157,15 +163,17 @@ jobs:
runs-on: ubuntu-latest

permissions:
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: read # Required for repository access
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: write # Required for gh release create/upload

environment:
name: publish
url: https://pypi.org/p/singlestoredb

steps:
- uses: actions/checkout@v3

- name: Download Linux wheels and sdist
uses: actions/download-artifact@v4
with:
Expand All @@ -184,8 +192,26 @@ jobs:
name: artifacts-macOS
path: dist

- name: Create GitHub Release (test tag)
if: github.event_name == 'push'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release create "${{ github.ref_name }}" \
--prerelease \
--title "${{ github.ref_name }}" \
--notes "" \
dist/*

- name: Upload assets to existing Release
if: github.event_name == 'release'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release upload "${{ github.event.release.tag_name }}" dist/* --clobber

- name: Publish to PyPI
if: ${{ github.event_name == 'release' || github.event.inputs.publish_pypi == 'true' }}
if: ${{ github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.publish_pypi == 'true') }}
uses: pypa/gh-action-pypi-publish@release/v1

# - name: Publish Conda package
Expand Down
64 changes: 59 additions & 5 deletions singlestoredb/functions/ext/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,56 @@ async def to_thread(
return await loop.run_in_executor(None, func_call)


async def _poll_cancel(cancel_event: threading.Event) -> None:
while not cancel_event.is_set():
await asyncio.sleep(0.1)


async def _cancellable_run(
cancel_event: threading.Event,
coro: Any,
) -> Any:
task = asyncio.create_task(coro)
cancel_check = asyncio.create_task(_poll_cancel(cancel_event))
done, pending = await asyncio.wait(
[task, cancel_check], return_when=asyncio.FIRST_COMPLETED,
)
for p in pending:
p.cancel()
if cancel_check in done:
task.cancel()
raise asyncio.CancelledError()
return task.result()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successful UDF result discarded when cancel races completion

Medium Severity

_cancellable_run unconditionally prioritizes cancellation over a successful result when both tasks complete in the same event loop iteration. asyncio.wait(return_when=FIRST_COMPLETED) can return both task and cancel_check in done if they finish simultaneously. Because if cancel_check in done is checked without first checking whether task also succeeded, a completed UDF result is silently discarded and replaced with CancelledError. Checking task in done first (and returning its result) would preserve the successfully-computed value.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 2f819f5. Configure here.



def _run_with_graceful_shutdown(coro: Any) -> Any:
"""Run a coroutine in a new event loop, draining callbacks before close.

Unlike asyncio.run(), this prevents 'Event loop is closed' errors from
libraries (httpx/anyio) that schedule cleanup callbacks during teardown.
"""
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
finally:
try:
pending = asyncio.all_tasks(loop)
if pending:
for task in pending:
task.cancel()
loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True),
)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
loop.call_soon(loop.stop)
loop.run_forever()
asyncio.set_event_loop(None)
loop.close()


# Use negative values to indicate unsigned ints / binary data / usec time precision
rowdat_1_type_map = {
'bool': ft.LONGLONG,
Expand Down Expand Up @@ -1190,11 +1240,12 @@ async def __call__(
)

func_task = asyncio.create_task(
func(cancel_event, call_timer, *inputs)
if func_info['is_async']
else to_thread(
Comment thread
cursor[bot] marked this conversation as resolved.
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
to_thread(
lambda: _run_with_graceful_shutdown(
_cancellable_run(
cancel_event,
func(cancel_event, call_timer, *inputs),
),
),
),
)
Expand All @@ -1214,6 +1265,8 @@ async def __call__(
)

await cancel_all_tasks(pending)
if func_task in pending:
cancel_event.set()

for task in done:
if task is disconnect_task:
Expand Down Expand Up @@ -1286,6 +1339,7 @@ async def __call__(
await send(self.error_response_dict)

finally:
cancel_event.set()
await cancel_all_tasks(all_tasks)

# Handle api reflection
Expand Down
34 changes: 18 additions & 16 deletions singlestoredb/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
try:
import pandas as pd
has_pandas = True
_pd_str_dtype = str(pd.DataFrame({'a': ['x']}).dtypes['a'])
except ImportError:
has_pandas = False
_pd_str_dtype = 'object'


class TestConnection(unittest.TestCase):
Expand Down Expand Up @@ -1124,21 +1126,21 @@ def test_alltypes_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'float64'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down Expand Up @@ -1266,21 +1268,21 @@ def test_alltypes_no_nulls_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'int16'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down
Loading
Loading