From 491ca9d39aafb4e09bc1027741dc59785533823b Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 18 Feb 2025 09:04:19 -0600 Subject: [PATCH 1/9] Add api for function signatures --- singlestoredb/functions/ext/asgi.py | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 702e3854b..c06ad7cfb 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -485,6 +485,7 @@ class Application(object): # Valid URL paths invoke_path = ('invoke',) show_create_function_path = ('show', 'create_function') + show_function_info = ('show', 'function_info') def __init__( self, @@ -688,6 +689,30 @@ async def __call__( await send(self.text_response_dict) + # Return function info + elif method == 'GET' and path == self.show_function_info: + functions = {} + + for key, (_, info) in self.endpoints.items(): + if not func_name or key == func_name: + sig = info['signature'] + args = [] + for a in sig.get('args', []): + args.append( + dict( + name=a['name'], + dtype=a['dtype'], + ), + ) + returns = dict( + dtype=sig['returns'].get('dtype'), + ) + functions[sig['name']] = dict(args=args, returns=returns) + + body = json.dumps(functions).encode('utf-8') + + await send(self.text_response_dict) + # Path not found else: body = b'' @@ -834,6 +859,11 @@ def drop_functions( for link in links: cur.execute(f'DROP LINK {link}') + @property + def function_info(self) -> None: + for k, v in self.endpoints.items(): + print(k, v) + async def call( self, name: str, From 719848f6b6501af4ac4b13b6d7887d8ba21caf2b Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 18 Feb 2025 13:09:48 -0600 Subject: [PATCH 2/9] Add get_function_info function --- singlestoredb/functions/ext/asgi.py | 72 +++++++++++++++++------------ singlestoredb/functions/ext/mmap.py | 2 +- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index c06ad7cfb..d4864dde3 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -485,7 +485,7 @@ class Application(object): # Valid URL paths invoke_path = ('invoke',) show_create_function_path = ('show', 'create_function') - show_function_info = ('show', 'function_info') + show_function_info_path = ('show', 'function_info') def __init__( self, @@ -690,27 +690,9 @@ async def __call__( await send(self.text_response_dict) # Return function info - elif method == 'GET' and path == self.show_function_info: - functions = {} - - for key, (_, info) in self.endpoints.items(): - if not func_name or key == func_name: - sig = info['signature'] - args = [] - for a in sig.get('args', []): - args.append( - dict( - name=a['name'], - dtype=a['dtype'], - ), - ) - returns = dict( - dtype=sig['returns'].get('dtype'), - ) - functions[sig['name']] = dict(args=args, returns=returns) - + elif method == 'GET' and path == self.show_function_info_path: + functions = self.get_function_info() body = json.dumps(functions).encode('utf-8') - await send(self.text_response_dict) # Path not found @@ -757,14 +739,49 @@ def _locate_app_functions(self, cur: Any) -> Tuple[Set[str], Set[str]]: # See if function URL matches url cur.execute(f'SHOW CREATE FUNCTION `{name}`') for fname, _, code, *_ in list(cur): - m = re.search(r" (?:\w+) SERVICE '([^']+)'", code) + m = re.search(r" (?:\w+) (?:SERVICE|MANAGED) '([^']+)'", code) if m and m.group(1) == self.url: funcs.add(fname) if link and re.match(r'^py_ext_func_link_\S{14}$', link): links.add(link) return funcs, links - def show_create_functions( + def get_function_info( + self, + func_name: Optional[str] = None, + ) -> Dict[str, Dict[str, Any]]: + """ + Return the functions and function signature information. + + Returns + ------- + Dict[str, Dict[str, Any]] + + """ + functions = {} + + for key, (_, info) in self.endpoints.items(): + if not func_name or key == func_name: + sig = info['signature'] + args = [] + for a in sig.get('args', []): + dtype = a['dtype'] + nullable = '?' in dtype + args.append( + dict( + name=a['name'], + dtype=dtype, + nullable=nullable, + ), + ) + returns = dict( + dtype=sig['returns'].get('dtype'), + ) + functions[sig['name']] = dict(args=args, returns=returns) + + return functions + + def get_create_functions( self, replace: bool = False, ) -> List[str]: @@ -832,7 +849,7 @@ def register_functions( cur.execute(f'DROP FUNCTION IF EXISTS `{fname}`') for link in links: cur.execute(f'DROP LINK {link}') - for func in self.show_create_functions(replace=replace): + for func in self.get_create_functions(replace=replace): cur.execute(func) def drop_functions( @@ -859,11 +876,6 @@ def drop_functions( for link in links: cur.execute(f'DROP LINK {link}') - @property - def function_info(self) -> None: - for k, v in self.endpoints.items(): - print(k, v) - async def call( self, name: str, @@ -1259,7 +1271,7 @@ def main(argv: Optional[List[str]] = None) -> None: app_mode='remote', ) - funcs = app.show_create_functions(replace=args.replace_existing) + funcs = app.get_create_functions(replace=args.replace_existing) if not funcs: raise RuntimeError('no functions specified') diff --git a/singlestoredb/functions/ext/mmap.py b/singlestoredb/functions/ext/mmap.py index 3bdb6a6f5..df200fa14 100644 --- a/singlestoredb/functions/ext/mmap.py +++ b/singlestoredb/functions/ext/mmap.py @@ -338,7 +338,7 @@ def main(argv: Optional[List[str]] = None) -> None: app_mode='collocated', ) - funcs = app.show_create_functions(replace=args.replace_existing) + funcs = app.get_create_functions(replace=args.replace_existing) if not funcs: raise RuntimeError('no functions specified') From 1d61e10d966bf1ecdd3cd7e0fd0063916dd8240b Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Fri, 21 Feb 2025 14:35:05 -0600 Subject: [PATCH 3/9] Add get_function_info and show/function_info endpoint --- singlestoredb/functions/ext/asgi.py | 42 +++++++++++++++++++++++----- singlestoredb/functions/signature.py | 29 +++++++++++++++++-- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index d4864dde3..2827ed109 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -692,7 +692,7 @@ async def __call__( # Return function info elif method == 'GET' and path == self.show_function_info_path: functions = self.get_function_info() - body = json.dumps(functions).encode('utf-8') + body = json.dumps(dict(functions=functions)).encode('utf-8') await send(self.text_response_dict) # Path not found @@ -749,34 +749,62 @@ def _locate_app_functions(self, cur: Any) -> Tuple[Set[str], Set[str]]: def get_function_info( self, func_name: Optional[str] = None, - ) -> Dict[str, Dict[str, Any]]: + ) -> Dict[str, Any]: """ Return the functions and function signature information. Returns ------- - Dict[str, Dict[str, Any]] + Dict[str, Any] """ + returns: Dict[str, Any] = {} functions = {} for key, (_, info) in self.endpoints.items(): if not func_name or key == func_name: sig = info['signature'] args = [] + + # Function arguments for a in sig.get('args', []): dtype = a['dtype'] nullable = '?' in dtype args.append( dict( name=a['name'], - dtype=dtype, + dtype=dtype.replace('?', ''), nullable=nullable, ), ) - returns = dict( - dtype=sig['returns'].get('dtype'), - ) + + # Record / table return types + if sig['returns']['dtype'].startswith('tuple['): + fields = [] + dtypes = sig['returns']['dtype'][6:-1].split(',') + field_names = sig['returns']['field_names'] + for i, dtype in enumerate(dtypes): + nullable = '?' in dtype + dtype = dtype.replace('?', '') + fields.append( + dict( + name=field_names[i], + dtype=dtype, + nullable=nullable, + ), + ) + returns = dict( + dtype='table' if info['function_type'] == 'tvf' else 'struct', + fields=fields, + ) + + # Atomic return types + else: + returns = dict( + dtype=sig['returns'].get('dtype').replace('?', ''), + nullable='?' in sig['returns'].get('dtype', ''), + ) + functions[sig['name']] = dict(args=args, returns=returns) return functions diff --git a/singlestoredb/functions/signature.py b/singlestoredb/functions/signature.py index 794f08f7d..1c1a0d4b9 100644 --- a/singlestoredb/functions/signature.py +++ b/singlestoredb/functions/signature.py @@ -494,6 +494,7 @@ def get_signature(func: Callable[..., Any], name: Optional[str] = None) -> Dict[ 'missing annotations for {} in {}' .format(', '.join(spec_diff), name), ) + elif isinstance(args_overrides, dict): for s in spec_diff: if s not in args_overrides: @@ -501,6 +502,7 @@ def get_signature(func: Callable[..., Any], name: Optional[str] = None) -> Dict[ 'missing annotations for {} in {}' .format(', '.join(spec_diff), name), ) + elif isinstance(args_overrides, list): if len(arg_names) != len(args_overrides): raise TypeError( @@ -509,6 +511,7 @@ def get_signature(func: Callable[..., Any], name: Optional[str] = None) -> Dict[ ) for i, arg in enumerate(arg_names): + if isinstance(args_overrides, list): sql = args_overrides[i] arg_type = sql_to_dtype(sql) @@ -535,6 +538,7 @@ def get_signature(func: Callable[..., Any], name: Optional[str] = None) -> Dict[ if isinstance(returns_overrides, str): sql = returns_overrides out_type = sql_to_dtype(sql) + elif isinstance(returns_overrides, list): if not output_fields: output_fields = [ @@ -547,29 +551,35 @@ def get_signature(func: Callable[..., Any], name: Optional[str] = None) -> Dict[ sql = dtype_to_sql( out_type, function_type=function_type, field_names=output_fields, ) + elif dataclasses.is_dataclass(returns_overrides): out_type = collapse_dtypes([ classify_dtype(x) for x in simplify_dtype([x.type for x in returns_overrides.fields]) ]) + output_fields = [x.name for x in returns_overrides.fields] sql = dtype_to_sql( out_type, function_type=function_type, - field_names=[x.name for x in returns_overrides.fields], + field_names=output_fields, ) + elif has_pydantic and inspect.isclass(returns_overrides) \ and issubclass(returns_overrides, pydantic.BaseModel): out_type = collapse_dtypes([ classify_dtype(x) for x in simplify_dtype([x for x in returns_overrides.model_fields.values()]) ]) + output_fields = [x for x in returns_overrides.model_fields.keys()] sql = dtype_to_sql( out_type, function_type=function_type, - field_names=[x for x in returns_overrides.model_fields.keys()], + field_names=output_fields, ) + elif returns_overrides is not None and not isinstance(returns_overrides, str): raise TypeError(f'unrecognized type for return value: {returns_overrides}') + else: if not output_fields: if dataclasses.is_dataclass(signature.return_annotation): @@ -579,13 +589,26 @@ def get_signature(func: Callable[..., Any], name: Optional[str] = None) -> Dict[ elif has_pydantic and inspect.isclass(signature.return_annotation) \ and issubclass(signature.return_annotation, pydantic.BaseModel): output_fields = list(signature.return_annotation.model_fields.keys()) + out_type = collapse_dtypes([ classify_dtype(x) for x in simplify_dtype(signature.return_annotation) ]) + + if not output_fields: + output_fields = [ + string.ascii_letters[i] for i in range(out_type.count(',')+1) + ] + sql = dtype_to_sql( out_type, function_type=function_type, field_names=output_fields, ) - out['returns'] = dict(dtype=out_type, sql=sql, default=None) + + out['returns'] = dict( + dtype=out_type, + sql=sql, + default=None, + field_names=output_fields, + ) copied_keys = ['database', 'environment', 'packages', 'resources', 'replace'] for key in copied_keys: From b40fb153ca7ed5a4631ff12970b952cf3a5a4cb7 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Fri, 21 Feb 2025 15:09:55 -0600 Subject: [PATCH 4/9] Add prefix / suffix for function names --- singlestoredb/config.py | 12 ++++++++++++ singlestoredb/functions/ext/asgi.py | 24 ++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/singlestoredb/config.py b/singlestoredb/config.py index d83b9931b..d69c4c345 100644 --- a/singlestoredb/config.py +++ b/singlestoredb/config.py @@ -407,6 +407,18 @@ environ=['SINGLESTOREDB_EXT_FUNC_LOG_LEVEL'], ) +register_option( + 'external_function.name_prefix', 'string', check_str, '', + 'Prefix to add to external function names.', + environ=['SINGLESTOREDB_EXT_FUNC_NAME_PREFIX'], +) + +register_option( + 'external_function.name_suffix', 'string', check_str, '', + 'Suffix to add to external function names.', + environ=['SINGLESTOREDB_EXT_FUNC_NAME_SUFFIX'], +) + register_option( 'external_function.connection', 'string', check_str, os.environ.get('SINGLESTOREDB_URL') or None, diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 2827ed109..ef14ff762 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -506,6 +506,8 @@ def __init__( link_name: Optional[str] = get_option('external_function.link_name'), link_config: Optional[Dict[str, Any]] = None, link_credentials: Optional[Dict[str, Any]] = None, + name_prefix: str = get_option('external_function.name_prefix'), + name_suffix: str = get_option('external_function.name_suffix'), ) -> None: if link_name and (link_config or link_credentials): raise ValueError( @@ -562,6 +564,7 @@ def __init__( if not hasattr(x, '_singlestoredb_attrs'): continue name = x._singlestoredb_attrs.get('name', x.__name__) + name = f'{name_prefix}{name}{name_suffix}' external_functions[x.__name__] = x func, info = make_func(name, x) endpoints[name.encode('utf-8')] = func, info @@ -577,6 +580,7 @@ def __init__( # Add endpoint for each exported function for name, alias in get_func_names(func_names): item = getattr(pkg, name) + alias = f'{name_prefix}{name}{name_suffix}' external_functions[name] = item func, info = make_func(alias, item) endpoints[alias.encode('utf-8')] = func, info @@ -589,12 +593,14 @@ def __init__( if not hasattr(x, '_singlestoredb_attrs'): continue name = x._singlestoredb_attrs.get('name', x.__name__) + name = f'{name_prefix}{name}{name_suffix}' external_functions[x.__name__] = x func, info = make_func(name, x) endpoints[name.encode('utf-8')] = func, info else: alias = funcs.__name__ + alias = f'{name_prefix}{alias}{name_suffix}' external_functions[funcs.__name__] = funcs func, info = make_func(alias, funcs) endpoints[alias.encode('utf-8')] = func, info @@ -1205,6 +1211,22 @@ def main(argv: Optional[List[str]] = None) -> None: ), help='logging level', ) + parser.add_argument( + '--name-prefix', metavar='name_prefix', + default=defaults.get( + 'name_prefix', + get_option('external_function.name_prefix'), + ), + help='Prefix to add to function names', + ) + parser.add_argument( + '--name-suffix', metavar='name_suffix', + default=defaults.get( + 'name_suffix', + get_option('external_function.name_suffix'), + ), + help='Suffix to add to function names', + ) parser.add_argument( 'functions', metavar='module.or.func.path', nargs='*', help='functions or modules to export in UDF server', @@ -1297,6 +1319,8 @@ def main(argv: Optional[List[str]] = None) -> None: link_config=json.loads(args.link_config) or None, link_credentials=json.loads(args.link_credentials) or None, app_mode='remote', + name_prefix=args.name_prefix, + name_suffix=args.name_suffix, ) funcs = app.get_create_functions(replace=args.replace_existing) From af01a31f23b2e65d3749119c00cfdfbbe13c2c15 Mon Sep 17 00:00:00 2001 From: psachdeva Date: Tue, 11 Mar 2025 20:58:09 +0000 Subject: [PATCH 5/9] Add support for Managed Service --- singlestoredb/apps/__init__.py | 1 + singlestoredb/apps/_python_udfs.py | 63 ++++++++++++++++++++++++++++ singlestoredb/config.py | 4 +- singlestoredb/functions/ext/asgi.py | 2 +- singlestoredb/functions/signature.py | 2 +- 5 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 singlestoredb/apps/_python_udfs.py diff --git a/singlestoredb/apps/__init__.py b/singlestoredb/apps/__init__.py index b9f54898f..a07f228cb 100644 --- a/singlestoredb/apps/__init__.py +++ b/singlestoredb/apps/__init__.py @@ -1,2 +1,3 @@ from ._cloud_functions import run_function_app # noqa: F401 from ._dashboards import run_dashboard_app # noqa: F401 +from ._python_udfs import run_udf_app # noqa: F401 diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py new file mode 100644 index 000000000..e0ae222ee --- /dev/null +++ b/singlestoredb/apps/_python_udfs.py @@ -0,0 +1,63 @@ +import asyncio +import textwrap +import typing + +from ._config import AppConfig +from ._connection_info import ConnectionInfo +from ._process import kill_process_by_port +from ..functions.ext.asgi import Application + +if typing.TYPE_CHECKING: + from ._uvicorn_util import AwaitableUvicornServer + +# Keep track of currently running server +_running_server: 'typing.Optional[AwaitableUvicornServer]' = None + + +async def run_udf_app( + app: Application, + log_level: str = 'error', + kill_existing_app_server: bool = True, +) -> ConnectionInfo: + global _running_server + from ._uvicorn_util import AwaitableUvicornServer + + try: + import uvicorn + except ImportError: + raise ImportError('package uvicorn is required to run python udfs') + + app_config = AppConfig.from_env() + + if kill_existing_app_server: + # Shutdown the server gracefully if it was started by us. + # Since the uvicorn server doesn't start a new subprocess + # killing the process would result in kernel dying. + if _running_server is not None: + await _running_server.shutdown() + _running_server = None + + # Kill if any other process is occupying the port + kill_process_by_port(app_config.listen_port) + + app.root_path = app_config.base_path + + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_level=log_level, + ) + _running_server = AwaitableUvicornServer(config) + + app.register_functions(replace=True) + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() + + connection_info = ConnectionInfo(app_config.base_url, app_config.token) + + print( + 'Following Python UDFs are available: ', app.get_function_info() + ) + + return connection_info diff --git a/singlestoredb/config.py b/singlestoredb/config.py index d69c4c345..95ed1be34 100644 --- a/singlestoredb/config.py +++ b/singlestoredb/config.py @@ -317,9 +317,9 @@ 'external_function.app_mode', 'string', functools.partial( check_str, - valid_values=['remote', 'collocated'], + valid_values=['managed', 'remote', 'collocated'], ), - 'remote', + 'managed', 'Specifies the mode of operation of the external function application.', environ=['SINGLESTOREDB_EXT_FUNC_APP_MODE'], ) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index ef14ff762..6d3f38a49 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1318,7 +1318,7 @@ def main(argv: Optional[List[str]] = None) -> None: link_name=args.link_name or None, link_config=json.loads(args.link_config) or None, link_credentials=json.loads(args.link_credentials) or None, - app_mode='remote', + app_mode='managed', name_prefix=args.name_prefix, name_suffix=args.name_suffix, ) diff --git a/singlestoredb/functions/signature.py b/singlestoredb/functions/signature.py index 1c1a0d4b9..1b74db169 100644 --- a/singlestoredb/functions/signature.py +++ b/singlestoredb/functions/signature.py @@ -734,7 +734,7 @@ def signature_to_sql( signature: Dict[str, Any], url: Optional[str] = None, data_format: str = 'rowdat_1', - app_mode: str = 'remote', + app_mode: str = 'managed', link: Optional[str] = None, replace: bool = False, ) -> str: From 2875760a0bd094c2c5facaa47422aeff53a6b75d Mon Sep 17 00:00:00 2001 From: psachdeva Date: Thu, 13 Mar 2025 21:16:37 +0000 Subject: [PATCH 6/9] wip --- singlestoredb/apps/_python_udfs.py | 4 +++- singlestoredb/config.py | 2 +- singlestoredb/functions/ext/asgi.py | 15 ++++++++++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index e0ae222ee..fb6b8fcd5 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -17,7 +17,7 @@ async def run_udf_app( app: Application, log_level: str = 'error', - kill_existing_app_server: bool = True, + kill_existing_app_server: bool = False, ) -> ConnectionInfo: global _running_server from ._uvicorn_util import AwaitableUvicornServer @@ -42,6 +42,8 @@ async def run_udf_app( app.root_path = app_config.base_path + print("Listening on port", app_config.listen_port) + config = uvicorn.Config( app, host='0.0.0.0', diff --git a/singlestoredb/config.py b/singlestoredb/config.py index 95ed1be34..f2b5dccfb 100644 --- a/singlestoredb/config.py +++ b/singlestoredb/config.py @@ -310,7 +310,7 @@ register_option( 'external_function.url', 'string', check_str, 'http://localhost:8000/invoke', 'Specifies the URL of the external function application.', - environ=['SINGLESTOREDB_EXT_FUNC_URL'], + environ=['SINGLESTOREDB_APP_BASE_URL'], ) register_option( diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 6d3f38a49..325594b7e 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -500,7 +500,7 @@ def __init__( ] ] = None, app_mode: str = get_option('external_function.app_mode'), - url: str = get_option('external_function.url'), + url: str = get_option('external_function.url') + "invoke", data_format: str = get_option('external_function.data_format'), data_version: str = get_option('external_function.data_version'), link_name: Optional[str] = get_option('external_function.link_name'), @@ -652,6 +652,18 @@ async def __call__( func_info: Dict[str, Any] = {} if func_endpoint is not None: func, func_info = func_endpoint + + if method == 'GET' and (path == () or path == ""): + await send({ + 'type': 'http.response.start', + 'status': 200, + 'headers': [(b'content-type', b'text/plain')], + }) + await send({ + 'type': 'http.response.body', + 'body': b'Server is alive!', + }) + return # Call the endpoint if method == 'POST' and func is not None and path == self.invoke_path: @@ -884,6 +896,7 @@ def register_functions( for link in links: cur.execute(f'DROP LINK {link}') for func in self.get_create_functions(replace=replace): + print("FUNC: ", func) cur.execute(func) def drop_functions( From 317b672fc2ae54c5ec5cebfb91f566701b3073dc Mon Sep 17 00:00:00 2001 From: psachdeva Date: Mon, 31 Mar 2025 13:36:17 +0000 Subject: [PATCH 7/9] Misc changes --- singlestoredb/apps/_config.py | 49 ++++++++++++++++++++++++++ singlestoredb/apps/_connection_info.py | 7 +++- singlestoredb/apps/_python_udfs.py | 21 ++++++----- singlestoredb/functions/ext/asgi.py | 2 +- 4 files changed, 66 insertions(+), 13 deletions(-) diff --git a/singlestoredb/apps/_config.py b/singlestoredb/apps/_config.py index 0bfb19c69..77e245c98 100644 --- a/singlestoredb/apps/_config.py +++ b/singlestoredb/apps/_config.py @@ -64,3 +64,52 @@ def token(self) -> Optional[str]: return self.app_token else: return self.user_token + + +@dataclass +class PythonUdfAppConfig: + listen_port: int + base_url: str + base_path: str + running_interactively: bool + is_gateway_enabled: bool + + @staticmethod + def _read_variable(name: str) -> str: + value = os.environ.get(name) + if value is None: + raise RuntimeError( + f'Missing {name} environment variable. ' + 'Is the code running outside SingleStoreDB notebook environment?', + ) + return value + + @classmethod + def from_env(cls) -> 'AppConfig': + port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT') + base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL') + base_path = cls._read_variable('SINGLESTOREDB_APP_BASE_PATH') + + workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE') + running_interactively = workload_type == 'InteractiveNotebook' + + is_gateway_enabled = 'SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT' in os.environ + + if running_interactively: + if is_gateway_enabled: + base_url = cls._read_variable('SINGLESTOREDB_PYTHON_UDF_BASE_URL') + base_path = cls._read_variable('SINGLESTOREDB_PYTHON_UDF_BASE_PATH') + assert base_url is not None + assert base_path is not None + else: + raise RuntimeError( + 'Running Python UDFs in interactive mode without nova-gateway enabled is not supported' + ) + + return cls( + listen_port=int(port), + base_url=base_url, + base_path=base_path, + running_interactively=running_interactively, + is_gateway_enabled=is_gateway_enabled, + ) diff --git a/singlestoredb/apps/_connection_info.py b/singlestoredb/apps/_connection_info.py index 32fe6ea2d..f5ff2df32 100644 --- a/singlestoredb/apps/_connection_info.py +++ b/singlestoredb/apps/_connection_info.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Dict, Any @dataclass @@ -8,3 +8,8 @@ class ConnectionInfo: # Only present in interactive mode token: Optional[str] + +@dataclass +class PythonUdfConnectionInfo: + url: str + functions: Dict[str, Any] diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index fb6b8fcd5..fb518356a 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -1,9 +1,10 @@ import asyncio import textwrap import typing +import os -from ._config import AppConfig -from ._connection_info import ConnectionInfo +from ._config import PythonUdfAppConfig +from ._connection_info import ConnectionInfo, PythonUdfConnectionInfo from ._process import kill_process_by_port from ..functions.ext.asgi import Application @@ -15,9 +16,8 @@ async def run_udf_app( - app: Application, log_level: str = 'error', - kill_existing_app_server: bool = False, + kill_existing_app_server: bool = True, ) -> ConnectionInfo: global _running_server from ._uvicorn_util import AwaitableUvicornServer @@ -27,7 +27,7 @@ async def run_udf_app( except ImportError: raise ImportError('package uvicorn is required to run python udfs') - app_config = AppConfig.from_env() + app_config = PythonUdfAppConfig.from_env() if kill_existing_app_server: # Shutdown the server gracefully if it was started by us. @@ -40,10 +40,9 @@ async def run_udf_app( # Kill if any other process is occupying the port kill_process_by_port(app_config.listen_port) + app = Application() app.root_path = app_config.base_path - print("Listening on port", app_config.listen_port) - config = uvicorn.Config( app, host='0.0.0.0', @@ -52,14 +51,14 @@ async def run_udf_app( ) _running_server = AwaitableUvicornServer(config) + # In interactive mode this should be set to true + replace = app_config.running_interactively app.register_functions(replace=True) + asyncio.create_task(_running_server.serve()) await _running_server.wait_for_startup() - connection_info = ConnectionInfo(app_config.base_url, app_config.token) + connection_info = PythonUdfConnectionInfo(app_config.base_url, app.get_function_info()) - print( - 'Following Python UDFs are available: ', app.get_function_info() - ) return connection_info diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 325594b7e..4a80136c1 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -500,7 +500,7 @@ def __init__( ] ] = None, app_mode: str = get_option('external_function.app_mode'), - url: str = get_option('external_function.url') + "invoke", + url: str = get_option('external_function.url'), data_format: str = get_option('external_function.data_format'), data_version: str = get_option('external_function.data_version'), link_name: Optional[str] = get_option('external_function.link_name'), From 8670d04c0e952e18ba141026086261418dcc548a Mon Sep 17 00:00:00 2001 From: psachdeva Date: Mon, 31 Mar 2025 17:11:48 +0000 Subject: [PATCH 8/9] Fix url --- singlestoredb/apps/_config.py | 2 -- singlestoredb/apps/_python_udfs.py | 9 +++------ singlestoredb/functions/ext/asgi.py | 15 ++------------- 3 files changed, 5 insertions(+), 21 deletions(-) diff --git a/singlestoredb/apps/_config.py b/singlestoredb/apps/_config.py index 77e245c98..80381d6b1 100644 --- a/singlestoredb/apps/_config.py +++ b/singlestoredb/apps/_config.py @@ -99,8 +99,6 @@ def from_env(cls) -> 'AppConfig': if is_gateway_enabled: base_url = cls._read_variable('SINGLESTOREDB_PYTHON_UDF_BASE_URL') base_path = cls._read_variable('SINGLESTOREDB_PYTHON_UDF_BASE_PATH') - assert base_url is not None - assert base_path is not None else: raise RuntimeError( 'Running Python UDFs in interactive mode without nova-gateway enabled is not supported' diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index fb518356a..c337c8786 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -1,7 +1,5 @@ import asyncio -import textwrap import typing -import os from ._config import PythonUdfAppConfig from ._connection_info import ConnectionInfo, PythonUdfConnectionInfo @@ -16,6 +14,7 @@ async def run_udf_app( + replace_existing: bool, log_level: str = 'error', kill_existing_app_server: bool = True, ) -> ConnectionInfo: @@ -40,7 +39,7 @@ async def run_udf_app( # Kill if any other process is occupying the port kill_process_by_port(app_config.listen_port) - app = Application() + app = Application(url=app_config.base_url) app.root_path = app_config.base_path config = uvicorn.Config( @@ -51,9 +50,7 @@ async def run_udf_app( ) _running_server = AwaitableUvicornServer(config) - # In interactive mode this should be set to true - replace = app_config.running_interactively - app.register_functions(replace=True) + app.register_functions(replace=replace_existing) asyncio.create_task(_running_server.serve()) await _running_server.wait_for_startup() diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 4a80136c1..a83fb1689 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -653,18 +653,6 @@ async def __call__( if func_endpoint is not None: func, func_info = func_endpoint - if method == 'GET' and (path == () or path == ""): - await send({ - 'type': 'http.response.start', - 'status': 200, - 'headers': [(b'content-type', b'text/plain')], - }) - await send({ - 'type': 'http.response.body', - 'body': b'Server is alive!', - }) - return - # Call the endpoint if method == 'POST' and func is not None and path == self.invoke_path: data_format = func_info['data_format'] @@ -708,7 +696,7 @@ async def __call__( await send(self.text_response_dict) # Return function info - elif method == 'GET' and path == self.show_function_info_path: + elif method == 'GET' and (path == "" or path == ()): functions = self.get_function_info() body = json.dumps(dict(functions=functions)).encode('utf-8') await send(self.text_response_dict) @@ -750,6 +738,7 @@ def _locate_app_functions(self, cur: Any) -> Tuple[Set[str], Set[str]]: """Locate all current functions and links belonging to this app.""" funcs, links = set(), set() cur.execute('SHOW FUNCTIONS') + print("List Cur", list(cur)) for name, ftype, _, _, _, link in list(cur): # Only look at external functions if 'external' not in ftype.lower(): From 04a985ea735bd550b857d5c633b8c19c446a3a23 Mon Sep 17 00:00:00 2001 From: Prabhat Sachdeva Date: Tue, 22 Apr 2025 19:17:38 +0530 Subject: [PATCH 9/9] Use SINGLESTOREDB_NOVA_GATEWAY_DEV_BASE_URL --- singlestoredb/apps/_config.py | 6 +----- singlestoredb/apps/_python_udfs.py | 5 ++--- singlestoredb/config.py | 2 +- singlestoredb/functions/ext/asgi.py | 3 +-- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/singlestoredb/apps/_config.py b/singlestoredb/apps/_config.py index 80381d6b1..d3c5e3d08 100644 --- a/singlestoredb/apps/_config.py +++ b/singlestoredb/apps/_config.py @@ -70,7 +70,6 @@ def token(self) -> Optional[str]: class PythonUdfAppConfig: listen_port: int base_url: str - base_path: str running_interactively: bool is_gateway_enabled: bool @@ -88,7 +87,6 @@ def _read_variable(name: str) -> str: def from_env(cls) -> 'AppConfig': port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT') base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL') - base_path = cls._read_variable('SINGLESTOREDB_APP_BASE_PATH') workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE') running_interactively = workload_type == 'InteractiveNotebook' @@ -97,8 +95,7 @@ def from_env(cls) -> 'AppConfig': if running_interactively: if is_gateway_enabled: - base_url = cls._read_variable('SINGLESTOREDB_PYTHON_UDF_BASE_URL') - base_path = cls._read_variable('SINGLESTOREDB_PYTHON_UDF_BASE_PATH') + base_url = cls._read_variable('SINGLESTOREDB_NOVA_GATEWAY_DEV_BASE_URL') else: raise RuntimeError( 'Running Python UDFs in interactive mode without nova-gateway enabled is not supported' @@ -107,7 +104,6 @@ def from_env(cls) -> 'AppConfig': return cls( listen_port=int(port), base_url=base_url, - base_path=base_path, running_interactively=running_interactively, is_gateway_enabled=is_gateway_enabled, ) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index c337c8786..f8acba69f 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -39,8 +39,7 @@ async def run_udf_app( # Kill if any other process is occupying the port kill_process_by_port(app_config.listen_port) - app = Application(url=app_config.base_url) - app.root_path = app_config.base_path + app = Application(url=app_config.base_url) config = uvicorn.Config( app, @@ -50,6 +49,7 @@ async def run_udf_app( ) _running_server = AwaitableUvicornServer(config) + # Register the functions app.register_functions(replace=replace_existing) asyncio.create_task(_running_server.serve()) @@ -57,5 +57,4 @@ async def run_udf_app( connection_info = PythonUdfConnectionInfo(app_config.base_url, app.get_function_info()) - return connection_info diff --git a/singlestoredb/config.py b/singlestoredb/config.py index 01c1e37f8..013592e15 100644 --- a/singlestoredb/config.py +++ b/singlestoredb/config.py @@ -310,7 +310,7 @@ register_option( 'external_function.url', 'string', check_str, 'http://localhost:8000/invoke', 'Specifies the URL of the external function application.', - environ=['SINGLESTOREDB_APP_BASE_URL'], + environ=['SINGLESTOREDB_EXT_FUNC_URL' ], ) register_option( diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 4adc5213f..0a5780faa 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -649,7 +649,6 @@ def __init__( else: alias = funcs.__name__ - alias = f'{name_prefix}{alias}{name_suffix}' external_functions[funcs.__name__] = funcs alias = f'{name_prefix}{alias}{name_suffix}' func, info = make_func(alias, funcs) @@ -702,7 +701,7 @@ async def __call__( func_info: Dict[str, Any] = {} if func_endpoint is not None: func, func_info = func_endpoint - + # Call the endpoint if method == 'POST' and func is not None and path == self.invoke_path: args_data_format = func_info['args_data_format']