diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 4f73c3618..ebf27708c 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,18 +1,46 @@
 # SingleStore Python SDK Contributing Guide
 
 Fork this repo and commit your changes to the forked repo.
-From there make a Pull Request with your submission keeping the following in mind:
+From there make a Pull Request with your submission keeping the
+following in mind:
 
 ## Pre-commit checks on the clone of this repo
 
-The CI pipeline in this repo runs a bunch of validation checks and code reformatting with pre-commit checks. If you don't install those checks in your clone of the repo, the code will likely not pass. To install the pre-commit tool in your clone run the following from your clone directory. This will force the checks before you can push.
+The CI pipeline in this repo runs a number of validation checks and code
+reformatting with pre-commit checks. If you don't install those checks
+in your clone of the repo, the code will likely not pass. To install
+the pre-commit tool in your clone, run the following from your clone
+directory. This will force the checks to run before you can push.
 
-```bash
-pip3 install pre-commit==3.7.1
+```
+pip install pre-commit==3.7.1
 pre-commit install
 ```
 
-The checks run automatically when you attempt to commit, but you can run them manually as well with the following:
-```bash
+The checks run automatically when you attempt to commit, but you can run
+them manually as well with the following:
+```
 pre-commit run --all-files
 ```
+
+## Running tests
+
+To create a test environment, do the following:
+```
+pip install -r requirements.txt
+pip install -r test-requirements.txt
+```
+
+If you have Docker installed, you can run the tests as follows. Note that
+you should run the tests using both the standard protocol and the Data API
+(HTTP):
+```
+pytest -v singlestoredb/tests
+USE_DATA_API=1 pytest -v singlestoredb/tests
+```
+
+If you need to run against a specific server version, you can specify
+the URL of that server:
+```
+SINGLESTOREDB_URL=user:pw@127.0.0.1:3306 pytest -v singlestoredb/tests
+SINGLESTOREDB_URL=http://user:pw@127.0.0.1:8090 pytest -v singlestoredb/tests
+```
diff --git a/RELEASE.md b/RELEASE.md
index 932fd07c6..f029053e5 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,24 +1,49 @@
 # Release process
 
-1. Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using
-   semantic versioning rules: minor bump for new features, patch bump for
-   bug fixes.
+## Bump the package version and build documentation
 
-2. Add release notes to `docs/src/whatsnew.rst`.
+Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using
+semantic versioning rules: minor bump for new features, patch bump for
+bug fixes. Add release notes to `docs/src/whatsnew.rst`. Run `make html` in
+`docs/src` to generate documentation.
 
-3. Run `SINGLESTOREDB_URL=root:@db-server:db-port make html` in `docs/src` to
-   generate documentation. You will need `sphinx` and `sphinx_rtd_theme` installed
-   for this step. You also need a SingleStoreDB server running at the given
-   IP and port to run samples against.
+You will need `sphinx` and `sphinx_rtd_theme` installed for this step. You
+also need a running SingleStoreDB server to run the samples against; set
+`SINGLESTOREDB_URL` to point at it.
 
-4. Commit all changed files with a commit like "Prepare for vX.X.X release".
+There is a utility to do this process for you, but you should check
+`docs/src/whatsnew.rst` to verify the release summary. Use the following
+to run it:
+```
+resources/bump_version.py <major|minor|patch>
+```
 
-5. The coverage tests will be triggered by the push, but you should also run
-   `Smoke test` workflow manually which does basic tests on all supported versions
-   of Python.
+## Commit and push the changes
 
-6. Once all workflows are clean, create a new Github release with the name
-   "SingleStoreDB vX.X.X" and set the generated tag to the matching version
-   number. Add the release notes from the `whatsnew.rst` file to the release
-   notes. Creating the release will run the `Public packages` workflow which
-   builds the packages and pubsishes them to PyPI.
+After verifying the release summary in the documentation, commit the changes:
+```
+# Make sure newly generated docs get added
+git add docs
+
+# Commit changes
+git commit -am "Prepare for vX.X.X release"
+
+git push
+```
+
+## Run smoke tests
+
+The coverage tests will be triggered by the push, but you should also run the
+[Smoke test](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/smoke-test.yml)
+workflow manually, which runs basic tests on all supported versions of Python.
+
+## Create the release on Github
+
+Once all workflows are clean, create a new Github release at
+<https://github.com/singlestore-labs/singlestoredb-python/releases> with the
+name "SingleStoreDB vX.X.X" and set the generated tag to the matching version
+number. Add the release notes from the `whatsnew.rst` file to the release
+notes. Creating the release will run the
+[Publish packages](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/publish.yml)
+workflow which builds the packages and publishes them to PyPI.
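The patch does not include the body of `resources/bump_version.py`, so the sketch below only illustrates the semantic-versioning rule RELEASE.md describes (minor bump for new features, patch bump for bug fixes); the real script also has to rewrite `setup.cfg`, `singlestoredb/__init__.py`, and the release notes.

```python
# Hypothetical helper showing the bump rule only; this is not the actual
# contents of resources/bump_version.py.
def bump(version: str, part: str) -> str:
    """Return ``version`` with the given part (major|minor|patch) incremented."""
    major, minor, patch = (int(x) for x in version.split('.'))
    if part == 'major':
        return f'{major + 1}.0.0'
    if part == 'minor':
        return f'{major}.{minor + 1}.0'
    if part == 'patch':
        return f'{major}.{minor}.{patch + 1}'
    raise ValueError(f'unknown version part: {part}')


assert bump('1.4.2', 'minor') == '1.5.0'
assert bump('1.4.2', 'patch') == '1.4.3'
```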
diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py
index b94a4fdec..e2a147314 100644
--- a/singlestoredb/apps/_python_udfs.py
+++ b/singlestoredb/apps/_python_udfs.py
@@ -47,7 +47,12 @@ async def run_udf_app(
     udf_suffix = ''
     if app_config.running_interactively:
         udf_suffix = '_test'
-    app = Application(url=base_url, app_mode='managed', name_suffix=udf_suffix)
+    app = Application(
+        url=base_url,
+        app_mode='managed',
+        name_suffix=udf_suffix,
+        log_level=log_level,
+    )
 
     if not app.endpoints:
         raise ValueError('You must define at least one function.')
@@ -60,7 +65,7 @@ async def run_udf_app(
         app,
         host='0.0.0.0',
         port=app_config.listen_port,
-        log_level=log_level,
+        log_config=app.get_uvicorn_log_config(),
     )
 
     _running_server = AwaitableUvicornServer(config)
diff --git a/singlestoredb/config.py b/singlestoredb/config.py
index 86d8a8b94..71c751738 100644
--- a/singlestoredb/config.py
+++ b/singlestoredb/config.py
@@ -407,6 +407,12 @@
     environ=['SINGLESTOREDB_EXT_FUNC_LOG_LEVEL'],
 )
 
+register_option(
+    'external_function.log_file', 'string', check_str, None,
+    'File path to write logs to instead of console.',
+    environ=['SINGLESTOREDB_EXT_FUNC_LOG_FILE'],
+)
+
 register_option(
     'external_function.name_prefix', 'string', check_str, '',
     'Prefix to add to external function names.',
@@ -450,6 +456,18 @@
     environ=['SINGLESTOREDB_EXT_FUNC_TIMEOUT'],
 )
 
+register_option(
+    'external_function.disable_metrics', 'bool', check_bool, False,
+    'Disable logging of function call metrics.',
+    environ=['SINGLESTOREDB_EXT_FUNC_DISABLE_METRICS'],
+)
+
+register_option(
+    'external_function.app_name', 'string', check_str, None,
+    'Name for the external function application instance.',
+    environ=['SINGLESTOREDB_EXT_FUNC_APP_NAME'],
+)
+
 #
 # Debugging options
 #
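Each `register_option` call above binds a programmatic option to an environment variable, so the new logging settings can be supplied either way. A minimal sketch, assuming `get_option` is importable from `singlestoredb.config` alongside `register_option` (the defaults in the `asgi.py` changes below read the same options):

```python
import os

from singlestoredb.config import get_option

# Environment variables are one way to supply the new options; the
# SINGLESTOREDB_EXT_FUNC_* names come from the register_option calls above.
os.environ['SINGLESTOREDB_EXT_FUNC_LOG_FILE'] = '/tmp/udf.log'
os.environ['SINGLESTOREDB_EXT_FUNC_DISABLE_METRICS'] = 'true'

log_file = get_option('external_function.log_file')
disable_metrics = get_option('external_function.disable_metrics')
app_name = get_option('external_function.app_name')  # None unless configured
```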
diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py
index a2910ac69..973bfbcf8 100755
--- a/singlestoredb/functions/ext/asgi.py
+++ b/singlestoredb/functions/ext/asgi.py
@@ -91,7 +91,6 @@
 
 logger = utils.get_logger('singlestoredb.functions.ext.asgi')
 
-
 # If a number of processes is specified, create a pool of workers
 num_processes = max(0, int(os.environ.get('SINGLESTOREDB_EXT_NUM_PROCESSES', 0)))
 if num_processes > 1:
@@ -678,8 +677,24 @@ class Application(object):
     link_credentials : Dict[str, Any], optional
         The CREDENTIALS section of a LINK definition. This dictionary
         gets converted to JSON for the CREATE LINK call.
+    name_prefix : str, optional
+        Prefix to add to function names when registering them with the
+        database.
+    name_suffix : str, optional
+        Suffix to add to function names when registering them with the
+        database.
     function_database : str, optional
         The database to use for external function definitions.
+    log_file : str, optional
+        File path to write logs to instead of the console. If None, logs
+        are written to the console. When specified, the application
+        logger's handlers are replaced with a file handler.
+    log_level : str, optional
+        Logging level for the application logger. Valid values are 'info',
+        'debug', 'warning', and 'error'. Defaults to 'info'.
+    disable_metrics : bool, optional
+        Disable logging of function call metrics. Defaults to False.
+    app_name : str, optional
+        Name for the application instance. Used to create an
+        application-specific logger name. If not provided, a random name
+        is generated.
 
     """
 
@@ -846,6 +861,10 @@ def __init__(
         name_prefix: str = get_option('external_function.name_prefix'),
         name_suffix: str = get_option('external_function.name_suffix'),
         function_database: Optional[str] = None,
+        log_file: Optional[str] = get_option('external_function.log_file'),
+        log_level: str = get_option('external_function.log_level'),
+        disable_metrics: bool = get_option('external_function.disable_metrics'),
+        app_name: Optional[str] = get_option('external_function.app_name'),
     ) -> None:
         if link_name and (link_config or link_credentials):
             raise ValueError(
@@ -862,6 +881,15 @@
             get_option('external_function.link_credentials') or '{}',
         ) or None
 
+        # Generate application name if not provided
+        if app_name is None:
+            app_name = f'udf_app_{secrets.token_hex(4)}'
+
+        self.name = app_name
+
+        # Create logger instance specific to this application
+        self.logger = utils.get_logger(f'singlestoredb.functions.ext.asgi.{self.name}')
+
         # List of functions specs
         specs: List[Union[str, Callable[..., Any], ModuleType]] = []
 
@@ -953,6 +981,97 @@
         self.endpoints = endpoints
         self.external_functions = external_functions
         self.function_database = function_database
+        self.log_file = log_file
+        self.log_level = log_level
+        self.disable_metrics = disable_metrics
+
+        # Configure logging
+        self._configure_logging()
+
+    def _configure_logging(self) -> None:
+        """Configure logging based on the log_file settings."""
+        # Set logger level
+        self.logger.setLevel(getattr(logging, self.log_level.upper()))
+
+        # Remove all existing handlers to ensure clean configuration
+        self.logger.handlers.clear()
+
+        # Configure log file if specified
+        if self.log_file:
+            # Create file handler
+            file_handler = logging.FileHandler(self.log_file)
+            file_handler.setLevel(getattr(logging, self.log_level.upper()))
+
+            # Use JSON formatter for file logging
+            formatter = utils.JSONFormatter()
+            file_handler.setFormatter(formatter)
+
+            # Add the handler to the logger
+            self.logger.addHandler(file_handler)
+        else:
+            # For console logging, create a new stream handler with JSON formatter
+            console_handler = logging.StreamHandler()
+            console_handler.setLevel(getattr(logging, self.log_level.upper()))
+            console_handler.setFormatter(utils.JSONFormatter())
+            self.logger.addHandler(console_handler)
+
+        # Prevent propagation to avoid duplicate or differently formatted messages
+        self.logger.propagate = False
+
+    def get_uvicorn_log_config(self) -> Dict[str, Any]:
+        """
+        Create a uvicorn log config that matches the Application's logging format.
+
+        This method returns the log configuration used by uvicorn, allowing
+        external users to match the logging format of the Application class.
+
+        Returns
+        -------
+        Dict[str, Any]
+            Log configuration dictionary compatible with uvicorn's
+            log_config parameter
+
+        """
+        log_config = {
+            'version': 1,
+            'disable_existing_loggers': False,
+            'formatters': {
+                'json': {
+                    '()': 'singlestoredb.functions.ext.utils.JSONFormatter',
+                },
+            },
+            'handlers': {
+                'default': {
+                    'class': (
+                        'logging.FileHandler' if self.log_file
+                        else 'logging.StreamHandler'
+                    ),
+                    'formatter': 'json',
+                },
+            },
+            'loggers': {
+                'uvicorn': {
+                    'handlers': ['default'],
+                    'level': self.log_level.upper(),
+                    'propagate': False,
+                },
+                'uvicorn.error': {
+                    'handlers': ['default'],
+                    'level': self.log_level.upper(),
+                    'propagate': False,
+                },
+                'uvicorn.access': {
+                    'handlers': ['default'],
+                    'level': self.log_level.upper(),
+                    'propagate': False,
+                },
+            },
+        }
+
+        # Add filename to file handler if log file is specified
+        if self.log_file:
+            log_config['handlers']['default']['filename'] = self.log_file  # type: ignore
+
+        return log_config
 
     async def __call__(
         self,
@@ -976,19 +1095,22 @@
         request_id = str(uuid.uuid4())
 
         timer = Timer(
+            app_name=self.name,
             id=request_id,
             timestamp=datetime.datetime.now(
                 datetime.timezone.utc,
             ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
         )
         call_timer = Timer(
+            app_name=self.name,
             id=request_id,
             timestamp=datetime.datetime.now(
                 datetime.timezone.utc,
             ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
         )
 
-        assert scope['type'] == 'http'
+        if scope['type'] != 'http':
+            raise ValueError(f"Expected HTTP scope, got {scope['type']}")
 
         method = scope['method']
         path = tuple(x for x in scope['path'].split('/') if x)
@@ -1014,14 +1136,15 @@
 
         # Call the endpoint
         if method == 'POST' and func is not None and path == self.invoke_path:
-            logger.info(
-                json.dumps({
-                    'type': 'function_call',
-                    'id': request_id,
-                    'name': func_name.decode('utf-8'),
+            self.logger.info(
+                'Function call initiated',
+                extra={
+                    'app_name': self.name,
+                    'request_id': request_id,
+                    'function_name': func_name.decode('utf-8'),
                     'content_type': content_type.decode('utf-8'),
                     'accepts': accepts.decode('utf-8'),
-                }),
+                },
             )
 
             args_data_format = func_info['args_data_format']
@@ -1101,8 +1224,14 @@
                     await send(output_handler['response'])
 
             except asyncio.TimeoutError:
-                logging.exception(
-                    'Timeout in function call: ' + func_name.decode('utf-8'),
+                self.logger.exception(
+                    'Function call timeout',
+                    extra={
+                        'app_name': self.name,
+                        'request_id': request_id,
+                        'function_name': func_name.decode('utf-8'),
+                        'timeout': func_info['timeout'],
+                    },
                 )
                 body = (
                     '[TimeoutError] Function call timed out after ' +
@@ -1112,15 +1241,26 @@
                 await send(self.error_response_dict)
 
             except asyncio.CancelledError:
-                logging.exception(
-                    'Function call cancelled: ' + func_name.decode('utf-8'),
+                self.logger.exception(
+                    'Function call cancelled',
+                    extra={
+                        'app_name': self.name,
+                        'request_id': request_id,
+                        'function_name': func_name.decode('utf-8'),
+                    },
                 )
                 body = b'[CancelledError] Function call was cancelled'
                 await send(self.error_response_dict)
 
             except Exception as e:
-                logging.exception(
-                    'Error in function call: ' + func_name.decode('utf-8'),
+                self.logger.exception(
+                    'Function call error',
+                    extra={
+                        'app_name': self.name,
+                        'request_id': request_id,
+                        'function_name': func_name.decode('utf-8'),
+                        'exception_type': type(e).__name__,
+                    },
                 )
                 body = f'[{type(e).__name__}] {str(e).strip()}'.encode('utf-8')
                 await send(self.error_response_dict)
@@ -1173,7 +1313,17 @@
             for k, v in call_timer.metrics.items():
                 timer.metrics[k] = v
 
-        timer.finish()
+        if not self.disable_metrics:
+            metrics = timer.finish()
+            self.logger.info(
+                'Function call metrics',
+                extra={
+                    'app_name': self.name,
+                    'request_id': request_id,
+                    'function_name': timer.metadata.get('function', ''),
+                    'metrics': metrics,
+                },
+            )
 
     def _create_link(
         self,
@@ -1230,9 +1380,11 @@ def get_function_info(
     ) -> Dict[str, Any]:
         """
         Return the functions and function signature information.
+
        Returns
        -------
        Dict[str, Any]
+
        """
        functions = {}
        no_default = object()
@@ -1284,8 +1436,13 @@
                    doc_examples.append(ex_dict)
 
            except Exception as e:
-                logger.warning(
-                    f'Could not parse docstring for function {key}: {e}',
+                self.logger.warning(
+                    'Could not parse docstring for function',
+                    extra={
+                        'app_name': self.name,
+                        'function_name': key.decode('utf-8'),
+                        'error': str(e),
+                    },
                )
 
            if not func_name or key == func_name:
@@ -1740,6 +1897,22 @@ def main(argv: Optional[List[str]] = None) -> None:
        ),
        help='logging level',
    )
+    parser.add_argument(
+        '--log-file', metavar='filepath',
+        default=defaults.get(
+            'log_file',
+            get_option('external_function.log_file'),
+        ),
+        help='File path to write logs to instead of console',
+    )
+    parser.add_argument(
+        '--disable-metrics', action='store_true',
+        default=defaults.get(
+            'disable_metrics',
+            get_option('external_function.disable_metrics'),
+        ),
+        help='Disable logging of function call metrics',
+    )
    parser.add_argument(
        '--name-prefix', metavar='name_prefix',
        default=defaults.get(
@@ -1764,6 +1937,14 @@
        ),
        help='Database to use for the function definition',
    )
+    parser.add_argument(
+        '--app-name', metavar='app_name',
+        default=defaults.get(
+            'app_name',
+            get_option('external_function.app_name'),
+        ),
+        help='Name for the application instance',
+    )
    parser.add_argument(
        'functions', metavar='module.or.func.path', nargs='*',
        help='functions or modules to export in UDF server',
@@ -1771,8 +1952,6 @@
 
    args = parser.parse_args(argv)
 
-    logger.setLevel(getattr(logging, args.log_level.upper()))
-
    if i > 0:
        break
@@ -1864,6 +2043,10 @@
        name_prefix=args.name_prefix,
        name_suffix=args.name_suffix,
        function_database=args.function_database or None,
+        log_file=args.log_file,
+        log_level=args.log_level,
+        disable_metrics=args.disable_metrics,
+        app_name=args.app_name,
    )
 
    funcs = app.get_create_functions(replace=args.replace_existing)
@@ -1871,11 +2054,11 @@
        raise RuntimeError('no functions specified')
 
    for f in funcs:
-        logger.info(f)
+        app.logger.info(f)
 
    try:
        if args.db:
-            logger.info('registering functions with database')
+            app.logger.info('Registering functions with database')
            app.register_functions(
                args.db,
                replace=args.replace_existing,
@@ -1890,6 +2073,9 @@
        ).items() if v is not None
    }
 
+    # Configure uvicorn logging to use JSON format matching Application's format
+    app_args['log_config'] = app.get_uvicorn_log_config()
+
    if use_async:
        asyncio.create_task(_run_uvicorn(uvicorn, app, app_args, db=args.db))
    else:
@@ -1897,7 +2083,7 @@
 
    finally:
        if not use_async and args.db:
-            logger.info('dropping functions from database')
+            app.logger.info('Dropping functions from database')
            app.drop_functions(args.db)
@@ -1910,7 +2096,7 @@
    """Run uvicorn server and clean up functions after shutdown."""
    await uvicorn.Server(uvicorn.Config(app, **app_args)).serve()
    if db:
-        logger.info('dropping functions from database')
+        app.logger.info('Dropping functions from database')
        app.drop_functions(db)
diff --git a/singlestoredb/functions/ext/timer.py b/singlestoredb/functions/ext/timer.py
index cb1f234e3..81832e715 100644
--- a/singlestoredb/functions/ext/timer.py
+++ b/singlestoredb/functions/ext/timer.py
@@ -4,10 +4,6 @@
 from typing import Dict
 from typing import Optional
 
-from . import utils
-
-logger = utils.get_logger('singlestoredb.functions.ext.metrics')
-
 
 class RoundedFloatEncoder(json.JSONEncoder):
@@ -87,12 +83,7 @@
         self.entries.clear()
         self._current_key = None
 
-    def finish(self) -> None:
+    def finish(self) -> Dict[str, Any]:
         """Finish the current timing context and store the elapsed time."""
         self.metrics['total'] = time.perf_counter() - self.start_time
-        self.log_metrics()
-
-    def log_metrics(self) -> None:
-        if self.metadata.get('function'):
-            result = dict(type='function_metrics', **self.metadata, **self.metrics)
-            logger.info(json.dumps(result, cls=RoundedFloatEncoder))
+        return dict(type='function_metrics', **self.metadata, **self.metrics)
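With `log_metrics()` gone, `finish()` now hands the metrics dict back to the caller, and the `Application` (which owns the logger and the `disable_metrics` flag) decides whether to emit it, which is exactly what the `asgi.py` hunks above do. A rough caller-side sketch of the new contract; the keyword arguments mirror what `asgi.py` passes, and the full `Timer` signature is not shown in this diff:

```python
from singlestoredb.functions.ext.timer import Timer
from singlestoredb.functions.ext.utils import get_logger

logger = get_logger('example.metrics')

# Same keyword arguments the asgi.py hunks pass; they end up in timer.metadata.
timer = Timer(app_name='example_app', id='req-1', timestamp='2024-01-01T00:00:00.000Z')
# ... timed work happens here ...
metrics = timer.finish()

# The caller now owns both the metrics dict and the logging decision.
logger.info('Function call metrics', extra={'metrics': metrics})
```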
diff --git a/singlestoredb/functions/ext/utils.py b/singlestoredb/functions/ext/utils.py
index ea0e28506..4eda318b3 100644
--- a/singlestoredb/functions/ext/utils.py
+++ b/singlestoredb/functions/ext/utils.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+import datetime
 import json
 import logging
 import re
@@ -30,14 +31,62 @@
         return super().formatMessage(recordcopy)
 
 
+class JSONFormatter(logging.Formatter):
+    """Custom JSON formatter for structured logging."""
+
+    def format(self, record: logging.LogRecord) -> str:
+        # Create proper ISO timestamp with microseconds
+        timestamp = datetime.datetime.fromtimestamp(
+            record.created, tz=datetime.timezone.utc,
+        )
+        # Keep only 3 digits for milliseconds
+        iso_timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
+
+        log_entry = {
+            'timestamp': iso_timestamp,
+            'level': record.levelname,
+            'logger': record.name,
+            'message': record.getMessage(),
+        }
+
+        # Add extra fields if present
+        allowed_fields = [
+            'app_name', 'request_id', 'function_name',
+            'content_type', 'accepts', 'metrics',
+        ]
+        for field in allowed_fields:
+            if hasattr(record, field):
+                log_entry[field] = getattr(record, field)
+
+        # Add exception info if present
+        if record.exc_info:
+            log_entry['exception'] = self.formatException(record.exc_info)
+
+        return json.dumps(log_entry)
+
+
 def get_logger(name: str) -> logging.Logger:
-    """Return a new logger."""
+    """Return a logger with JSON formatting."""
     logger = logging.getLogger(name)
-    handler = logging.StreamHandler()
-    formatter = DefaultFormatter('%(levelprefix)s %(message)s')
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    logger.setLevel(logging.INFO)
+
+    # Only configure if not already configured with JSON formatter
+    has_json_formatter = any(
+        isinstance(getattr(handler, 'formatter', None), JSONFormatter)
+        for handler in logger.handlers
+    )
+
+    if not logger.handlers or not has_json_formatter:
+        # Clear handlers only if we need to reconfigure
+        logger.handlers.clear()
+        handler = logging.StreamHandler()
+        formatter = JSONFormatter()
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+        logger.setLevel(logging.INFO)
+
+    # Prevent propagation to avoid duplicate messages or different formatting
+    logger.propagate = False
+
     return logger
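Taken together, these changes let a caller opt the whole server, uvicorn included, into one JSON log stream. A hedged end-to-end sketch; the positional function spec and its module path are assumptions for illustration, not something this diff confirms:

```python
import uvicorn

from singlestoredb.functions.ext.asgi import Application

# New keyword arguments introduced by this patch: app_name, log_level,
# log_file, and disable_metrics.
app = Application(
    'myproject.udfs',  # hypothetical module exporting the UDFs
    app_name='my_udf_app',
    log_level='debug',
    log_file='/var/log/udf_app.log',
    disable_metrics=False,
)

# Route uvicorn's own loggers through the same JSONFormatter (and optional
# log file) as the application logger, as _python_udfs.py now does.
uvicorn.run(
    app,
    host='0.0.0.0',
    port=8000,
    log_config=app.get_uvicorn_log_config(),
)
```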