-
Notifications
You must be signed in to change notification settings - Fork 22
Add python UDF helpers #59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
491ca9d
719848f
1d61e10
b40fb15
af01a31
2875760
317b672
8670d04
988108b
04a985e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| from ._cloud_functions import run_function_app # noqa: F401 | ||
| from ._dashboards import run_dashboard_app # noqa: F401 | ||
| from ._python_udfs import run_udf_app # noqa: F401 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| from dataclasses import dataclass | ||
| from typing import Optional | ||
| from typing import Optional, Dict, Any | ||
|
|
||
|
|
||
| @dataclass | ||
|
|
@@ -8,3 +8,8 @@ class ConnectionInfo: | |
|
|
||
| # Only present in interactive mode | ||
| token: Optional[str] | ||
|
|
||
| @dataclass | ||
| class PythonUdfConnectionInfo: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove |
||
| url: str | ||
| functions: Dict[str, Any] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| import asyncio | ||
| import typing | ||
|
|
||
| from ._config import PythonUdfAppConfig | ||
| from ._connection_info import ConnectionInfo, PythonUdfConnectionInfo | ||
| from ._process import kill_process_by_port | ||
| from ..functions.ext.asgi import Application | ||
|
|
||
| if typing.TYPE_CHECKING: | ||
| from ._uvicorn_util import AwaitableUvicornServer | ||
|
|
||
| # Keep track of currently running server | ||
| _running_server: 'typing.Optional[AwaitableUvicornServer]' = None | ||
|
|
||
|
|
||
| async def run_udf_app( | ||
| replace_existing: bool, | ||
| log_level: str = 'error', | ||
| kill_existing_app_server: bool = True, | ||
| ) -> ConnectionInfo: | ||
| global _running_server | ||
| from ._uvicorn_util import AwaitableUvicornServer | ||
|
|
||
| try: | ||
| import uvicorn | ||
| except ImportError: | ||
| raise ImportError('package uvicorn is required to run python udfs') | ||
|
|
||
| app_config = PythonUdfAppConfig.from_env() | ||
|
|
||
| if kill_existing_app_server: | ||
| # Shutdown the server gracefully if it was started by us. | ||
| # Since the uvicorn server doesn't start a new subprocess | ||
| # killing the process would result in kernel dying. | ||
| if _running_server is not None: | ||
| await _running_server.shutdown() | ||
| _running_server = None | ||
|
|
||
| # Kill if any other process is occupying the port | ||
| kill_process_by_port(app_config.listen_port) | ||
|
|
||
| app = Application(url=app_config.base_url) | ||
|
|
||
| config = uvicorn.Config( | ||
| app, | ||
| host='0.0.0.0', | ||
| port=app_config.listen_port, | ||
| log_level=log_level, | ||
| ) | ||
| _running_server = AwaitableUvicornServer(config) | ||
|
|
||
| # Register the functions | ||
| app.register_functions(replace=replace_existing) | ||
|
|
||
| asyncio.create_task(_running_server.serve()) | ||
| await _running_server.wait_for_startup() | ||
|
|
||
| connection_info = PythonUdfConnectionInfo(app_config.base_url, app.get_function_info()) | ||
|
|
||
| return connection_info |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -310,16 +310,16 @@ | |
| register_option( | ||
| 'external_function.url', 'string', check_str, 'http://localhost:8000/invoke', | ||
| 'Specifies the URL of the external function application.', | ||
| environ=['SINGLESTOREDB_EXT_FUNC_URL'], | ||
| environ=['SINGLESTOREDB_EXT_FUNC_URL' ], | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stray space |
||
| ) | ||
|
|
||
| register_option( | ||
| 'external_function.app_mode', 'string', | ||
| functools.partial( | ||
| check_str, | ||
| valid_values=['remote', 'collocated'], | ||
| valid_values=['managed', 'remote', 'collocated'], | ||
| ), | ||
| 'remote', | ||
| 'managed', | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would leave the default as |
||
| 'Specifies the mode of operation of the external function application.', | ||
| environ=['SINGLESTOREDB_EXT_FUNC_APP_MODE'], | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1261,7 +1261,7 @@ def signature_to_sql( | |
| signature: Dict[str, Any], | ||
| url: Optional[str] = None, | ||
| data_format: str = 'rowdat_1', | ||
| app_mode: str = 'remote', | ||
| app_mode: str = 'managed', | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as above. |
||
| link: Optional[str] = None, | ||
| replace: bool = False, | ||
| function_type: str = 'udf', | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think naming this
UDFAppConfigwould be better. Since there is no other option than Python.Udfis a bit too Java-esque. I preferUDF.