Skip to content
1 change: 1 addition & 0 deletions singlestoredb/apps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from ._cloud_functions import run_function_app # noqa: F401
from ._dashboards import run_dashboard_app # noqa: F401
from ._python_udfs import run_udf_app # noqa: F401
6 changes: 6 additions & 0 deletions singlestoredb/apps/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ class AppConfig:
listen_port: int
base_url: str
base_path: str
notebook_server_id: str
app_token: Optional[str]
user_token: Optional[str]
running_interactively: bool
is_gateway_enabled: bool
is_local_dev: bool

@staticmethod
def _read_variable(name: str) -> str:
Expand All @@ -28,6 +30,8 @@ def from_env(cls) -> 'AppConfig':
port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT')
base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL')
base_path = cls._read_variable('SINGLESTOREDB_APP_BASE_PATH')
notebook_server_id = cls._read_variable('SINGLESTOREDB_NOTEBOOK_SERVER_ID')
is_local_dev_env_var = cls._read_variable('SINGLESTOREDB_IS_LOCAL_DEV')

workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE')
running_interactively = workload_type == 'InteractiveNotebook'
Expand All @@ -49,10 +53,12 @@ def from_env(cls) -> 'AppConfig':
listen_port=int(port),
base_url=base_url,
base_path=base_path,
notebook_server_id=notebook_server_id,
app_token=app_token,
user_token=user_token,
running_interactively=running_interactively,
is_gateway_enabled=is_gateway_enabled,
is_local_dev=is_local_dev_env_var == 'true',
)

@property
Expand Down
8 changes: 8 additions & 0 deletions singlestoredb/apps/_connection_info.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import Optional


Expand All @@ -8,3 +10,9 @@ class ConnectionInfo:

# Only present in interactive mode
token: Optional[str]


@dataclass
class UdfConnectionInfo:
url: str
functions: Dict[str, Any]
85 changes: 85 additions & 0 deletions singlestoredb/apps/_python_udfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import asyncio
import os
import typing

from ..functions.ext.asgi import Application
from ._config import AppConfig
from ._connection_info import UdfConnectionInfo
from ._process import kill_process_by_port

if typing.TYPE_CHECKING:
from ._uvicorn_util import AwaitableUvicornServer

# Keep track of currently running server
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None


async def run_udf_app(
replace_existing: bool,
log_level: str = 'error',
kill_existing_app_server: bool = True,
) -> UdfConnectionInfo:
global _running_server
from ._uvicorn_util import AwaitableUvicornServer

try:
import uvicorn
except ImportError:
raise ImportError('package uvicorn is required to run python udfs')

app_config = AppConfig.from_env()

if kill_existing_app_server:
# Shutdown the server gracefully if it was started by us.
# Since the uvicorn server doesn't start a new subprocess
# killing the process would result in kernel dying.
if _running_server is not None:
await _running_server.shutdown()
_running_server = None

# Kill if any other process is occupying the port
kill_process_by_port(app_config.listen_port)

base_url = generate_base_url(app_config)

udf_suffix = ''
if app_config.running_interactively:
udf_suffix = '_test'
app = Application(url=base_url, app_mode='managed', name_suffix=udf_suffix)

config = uvicorn.Config(
app,
host='0.0.0.0',
port=app_config.listen_port,
log_level=log_level,
)
_running_server = AwaitableUvicornServer(config)

# Register the functions
app.register_functions(replace=replace_existing)

asyncio.create_task(_running_server.serve())
await _running_server.wait_for_startup()

print(f'Python UDF registered at {base_url}')

return UdfConnectionInfo(base_url, app.get_function_info())


def generate_base_url(app_config: AppConfig) -> str:
if not app_config.is_gateway_enabled:
raise RuntimeError('Python UDFs are not available if Nova Gateway is not enabled')

if not app_config.running_interactively:
return app_config.base_url

# generate python udf endpoint for interactive notebooks
gateway_url = os.environ.get('SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT')
if app_config.is_local_dev:
gateway_url = os.environ.get('SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT')
if gateway_url is None:
raise RuntimeError(
'Missing SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT environment variable.',
)

return f'{gateway_url}/pythonudfs/{app_config.notebook_server_id}/interactive/'
2 changes: 1 addition & 1 deletion singlestoredb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@
'external_function.app_mode', 'string',
functools.partial(
check_str,
valid_values=['remote', 'collocated'],
valid_values=['remote', 'collocated', 'managed'],
),
'remote',
'Specifies the mode of operation of the external function application.',
Expand Down
Loading