diff --git a/singlestoredb/apps/__init__.py b/singlestoredb/apps/__init__.py index b9f54898f..a07f228cb 100644 --- a/singlestoredb/apps/__init__.py +++ b/singlestoredb/apps/__init__.py @@ -1,2 +1,3 @@ from ._cloud_functions import run_function_app # noqa: F401 from ._dashboards import run_dashboard_app # noqa: F401 +from ._python_udfs import run_udf_app # noqa: F401 diff --git a/singlestoredb/apps/_config.py b/singlestoredb/apps/_config.py index 0bfb19c69..d3c5e3d08 100644 --- a/singlestoredb/apps/_config.py +++ b/singlestoredb/apps/_config.py @@ -64,3 +64,46 @@ def token(self) -> Optional[str]: return self.app_token else: return self.user_token + + +@dataclass +class PythonUdfAppConfig: + listen_port: int + base_url: str + running_interactively: bool + is_gateway_enabled: bool + + @staticmethod + def _read_variable(name: str) -> str: + value = os.environ.get(name) + if value is None: + raise RuntimeError( + f'Missing {name} environment variable. ' + 'Is the code running outside SingleStoreDB notebook environment?', + ) + return value + + @classmethod + def from_env(cls) -> 'AppConfig': + port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT') + base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL') + + workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE') + running_interactively = workload_type == 'InteractiveNotebook' + + is_gateway_enabled = 'SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT' in os.environ + + if running_interactively: + if is_gateway_enabled: + base_url = cls._read_variable('SINGLESTOREDB_NOVA_GATEWAY_DEV_BASE_URL') + else: + raise RuntimeError( + 'Running Python UDFs in interactive mode without nova-gateway enabled is not supported' + ) + + return cls( + listen_port=int(port), + base_url=base_url, + running_interactively=running_interactively, + is_gateway_enabled=is_gateway_enabled, + ) diff --git a/singlestoredb/apps/_connection_info.py b/singlestoredb/apps/_connection_info.py index 32fe6ea2d..f5ff2df32 100644 --- a/singlestoredb/apps/_connection_info.py +++ b/singlestoredb/apps/_connection_info.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Dict, Any @dataclass @@ -8,3 +8,8 @@ class ConnectionInfo: # Only present in interactive mode token: Optional[str] + +@dataclass +class PythonUdfConnectionInfo: + url: str + functions: Dict[str, Any] diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py new file mode 100644 index 000000000..f8acba69f --- /dev/null +++ b/singlestoredb/apps/_python_udfs.py @@ -0,0 +1,60 @@ +import asyncio +import typing + +from ._config import PythonUdfAppConfig +from ._connection_info import ConnectionInfo, PythonUdfConnectionInfo +from ._process import kill_process_by_port +from ..functions.ext.asgi import Application + +if typing.TYPE_CHECKING: + from ._uvicorn_util import AwaitableUvicornServer + +# Keep track of currently running server +_running_server: 'typing.Optional[AwaitableUvicornServer]' = None + + +async def run_udf_app( + replace_existing: bool, + log_level: str = 'error', + kill_existing_app_server: bool = True, +) -> ConnectionInfo: + global _running_server + from ._uvicorn_util import AwaitableUvicornServer + + try: + import uvicorn + except ImportError: + raise ImportError('package uvicorn is required to run python udfs') + + app_config = PythonUdfAppConfig.from_env() + + if kill_existing_app_server: + # Shutdown the server gracefully if it was started by us. + # Since the uvicorn server doesn't start a new subprocess + # killing the process would result in kernel dying. + if _running_server is not None: + await _running_server.shutdown() + _running_server = None + + # Kill if any other process is occupying the port + kill_process_by_port(app_config.listen_port) + + app = Application(url=app_config.base_url) + + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_level=log_level, + ) + _running_server = AwaitableUvicornServer(config) + + # Register the functions + app.register_functions(replace=replace_existing) + + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() + + connection_info = PythonUdfConnectionInfo(app_config.base_url, app.get_function_info()) + + return connection_info diff --git a/singlestoredb/config.py b/singlestoredb/config.py index de386ba03..013592e15 100644 --- a/singlestoredb/config.py +++ b/singlestoredb/config.py @@ -310,16 +310,16 @@ register_option( 'external_function.url', 'string', check_str, 'http://localhost:8000/invoke', 'Specifies the URL of the external function application.', - environ=['SINGLESTOREDB_EXT_FUNC_URL'], + environ=['SINGLESTOREDB_EXT_FUNC_URL' ], ) register_option( 'external_function.app_mode', 'string', functools.partial( check_str, - valid_values=['remote', 'collocated'], + valid_values=['managed', 'remote', 'collocated'], ), - 'remote', + 'managed', 'Specifies the mode of operation of the external function application.', environ=['SINGLESTOREDB_EXT_FUNC_APP_MODE'], ) diff --git a/singlestoredb/functions/signature.py b/singlestoredb/functions/signature.py index b36e404eb..54ce5f6f7 100644 --- a/singlestoredb/functions/signature.py +++ b/singlestoredb/functions/signature.py @@ -1261,7 +1261,7 @@ def signature_to_sql( signature: Dict[str, Any], url: Optional[str] = None, data_format: str = 'rowdat_1', - app_mode: str = 'remote', + app_mode: str = 'managed', link: Optional[str] = None, replace: bool = False, function_type: str = 'udf',