diff --git a/singlestoredb/apps/__init__.py b/singlestoredb/apps/__init__.py index b9f54898f..a07f228cb 100644 --- a/singlestoredb/apps/__init__.py +++ b/singlestoredb/apps/__init__.py @@ -1,2 +1,3 @@ from ._cloud_functions import run_function_app # noqa: F401 from ._dashboards import run_dashboard_app # noqa: F401 +from ._python_udfs import run_udf_app # noqa: F401 diff --git a/singlestoredb/apps/_config.py b/singlestoredb/apps/_config.py index 0bfb19c69..7f058c1e9 100644 --- a/singlestoredb/apps/_config.py +++ b/singlestoredb/apps/_config.py @@ -8,10 +8,12 @@ class AppConfig: listen_port: int base_url: str base_path: str + notebook_server_id: str app_token: Optional[str] user_token: Optional[str] running_interactively: bool is_gateway_enabled: bool + is_local_dev: bool @staticmethod def _read_variable(name: str) -> str: @@ -28,6 +30,8 @@ def from_env(cls) -> 'AppConfig': port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT') base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL') base_path = cls._read_variable('SINGLESTOREDB_APP_BASE_PATH') + notebook_server_id = cls._read_variable('SINGLESTOREDB_NOTEBOOK_SERVER_ID') + is_local_dev_env_var = cls._read_variable('SINGLESTOREDB_IS_LOCAL_DEV') workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE') running_interactively = workload_type == 'InteractiveNotebook' @@ -49,10 +53,12 @@ def from_env(cls) -> 'AppConfig': listen_port=int(port), base_url=base_url, base_path=base_path, + notebook_server_id=notebook_server_id, app_token=app_token, user_token=user_token, running_interactively=running_interactively, is_gateway_enabled=is_gateway_enabled, + is_local_dev=is_local_dev_env_var == 'true', ) @property diff --git a/singlestoredb/apps/_connection_info.py b/singlestoredb/apps/_connection_info.py index 32fe6ea2d..990ca053c 100644 --- a/singlestoredb/apps/_connection_info.py +++ b/singlestoredb/apps/_connection_info.py @@ -1,4 +1,6 @@ from dataclasses import dataclass +from typing import Any +from typing import Dict from typing import Optional @@ -8,3 +10,9 @@ class ConnectionInfo: # Only present in interactive mode token: Optional[str] + + +@dataclass +class UdfConnectionInfo: + url: str + functions: Dict[str, Any] diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py new file mode 100644 index 000000000..bcbc7a61c --- /dev/null +++ b/singlestoredb/apps/_python_udfs.py @@ -0,0 +1,85 @@ +import asyncio +import os +import typing + +from ..functions.ext.asgi import Application +from ._config import AppConfig +from ._connection_info import UdfConnectionInfo +from ._process import kill_process_by_port + +if typing.TYPE_CHECKING: + from ._uvicorn_util import AwaitableUvicornServer + +# Keep track of currently running server +_running_server: 'typing.Optional[AwaitableUvicornServer]' = None + + +async def run_udf_app( + replace_existing: bool, + log_level: str = 'error', + kill_existing_app_server: bool = True, +) -> UdfConnectionInfo: + global _running_server + from ._uvicorn_util import AwaitableUvicornServer + + try: + import uvicorn + except ImportError: + raise ImportError('package uvicorn is required to run python udfs') + + app_config = AppConfig.from_env() + + if kill_existing_app_server: + # Shutdown the server gracefully if it was started by us. + # Since the uvicorn server doesn't start a new subprocess + # killing the process would result in kernel dying. + if _running_server is not None: + await _running_server.shutdown() + _running_server = None + + # Kill if any other process is occupying the port + kill_process_by_port(app_config.listen_port) + + base_url = generate_base_url(app_config) + + udf_suffix = '' + if app_config.running_interactively: + udf_suffix = '_test' + app = Application(url=base_url, app_mode='managed', name_suffix=udf_suffix) + + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_level=log_level, + ) + _running_server = AwaitableUvicornServer(config) + + # Register the functions + app.register_functions(replace=replace_existing) + + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() + + print(f'Python UDF registered at {base_url}') + + return UdfConnectionInfo(base_url, app.get_function_info()) + + +def generate_base_url(app_config: AppConfig) -> str: + if not app_config.is_gateway_enabled: + raise RuntimeError('Python UDFs are not available if Nova Gateway is not enabled') + + if not app_config.running_interactively: + return app_config.base_url + + # generate python udf endpoint for interactive notebooks + gateway_url = os.environ.get('SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT') + if app_config.is_local_dev: + gateway_url = os.environ.get('SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT') + if gateway_url is None: + raise RuntimeError( + 'Missing SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT environment variable.', + ) + + return f'{gateway_url}/pythonudfs/{app_config.notebook_server_id}/interactive/' diff --git a/singlestoredb/config.py b/singlestoredb/config.py index de386ba03..61b79f298 100644 --- a/singlestoredb/config.py +++ b/singlestoredb/config.py @@ -317,7 +317,7 @@ 'external_function.app_mode', 'string', functools.partial( check_str, - valid_values=['remote', 'collocated'], + valid_values=['remote', 'collocated', 'managed'], ), 'remote', 'Specifies the mode of operation of the external function application.',