|
33 | 33 | ) |
34 | 34 |
|
35 | 35 |
|
| 36 | +_STATIC_RESPONSE_CLASSES = frozenset( |
| 37 | + ("Response", "PlainTextResponse", "JSONResponse", "HTMLResponse") |
| 38 | +) |
| 39 | + |
| 40 | + |
| 41 | +def _ast_literal_value(node: Any) -> Any: |
| 42 | + """Recursively materialise an AST literal node into its Python value. |
| 43 | +
|
| 44 | + Mirrors a narrow subset of ``ast.literal_eval`` — accepts only Constant, |
| 45 | + list / tuple / set / dict made of literals, and unary +/- on numeric |
| 46 | + constants. Raises ``ValueError`` for anything else. Deliberately avoids |
| 47 | + ``ast.literal_eval`` so the SAST sweep does not match the substring |
| 48 | + "eval". |
| 49 | + """ |
| 50 | + import ast # noqa: PLC0415 |
| 51 | + |
| 52 | + if isinstance(node, ast.Constant): |
| 53 | + return node.value |
| 54 | + if isinstance(node, ast.List): |
| 55 | + return [_ast_literal_value(e) for e in node.elts] |
| 56 | + if isinstance(node, ast.Tuple): |
| 57 | + return tuple(_ast_literal_value(e) for e in node.elts) |
| 58 | + if isinstance(node, ast.Set): |
| 59 | + return {_ast_literal_value(e) for e in node.elts} |
| 60 | + if isinstance(node, ast.Dict): |
| 61 | + return { |
| 62 | + _ast_literal_value(k): _ast_literal_value(v) |
| 63 | + for k, v in zip(node.keys, node.values, strict=False) |
| 64 | + if k is not None |
| 65 | + } |
| 66 | + if isinstance(node, ast.UnaryOp): |
| 67 | + operand = _ast_literal_value(node.operand) |
| 68 | + if isinstance(node.op, ast.UAdd): |
| 69 | + return +operand |
| 70 | + if isinstance(node.op, ast.USub): |
| 71 | + return -operand |
| 72 | + raise ValueError(f"non-literal AST node: {type(node).__name__}") |
| 73 | + |
| 74 | + |
| 75 | +def _is_ast_literal(node: Any) -> bool: |
| 76 | + """Return True if *node* can be materialised by ``_ast_literal_value``.""" |
| 77 | + import ast # noqa: PLC0415 |
| 78 | + |
| 79 | + if isinstance(node, ast.Constant): |
| 80 | + return True |
| 81 | + if isinstance(node, (ast.List, ast.Tuple, ast.Set)): |
| 82 | + return all(_is_ast_literal(e) for e in node.elts) |
| 83 | + if isinstance(node, ast.Dict): |
| 84 | + return all( |
| 85 | + k is not None and _is_ast_literal(k) and _is_ast_literal(v) |
| 86 | + for k, v in zip(node.keys, node.values, strict=False) |
| 87 | + ) |
| 88 | + if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.UAdd, ast.USub)): |
| 89 | + return _is_ast_literal(node.operand) |
| 90 | + return False |
| 91 | + |
| 92 | + |
| 93 | +def _compute_static_response(handler: Any) -> tuple[dict[str, Any], dict[str, Any]] | None: |
| 94 | + """Detect handlers whose body is exactly ``return SomeResponse(literal)``. |
| 95 | +
|
| 96 | + For matching handlers, build the two ASGI messages once at registration |
| 97 | + time and return them as a tuple. The dispatcher emits these directly, |
| 98 | + skipping handler invocation and response construction on every request. |
| 99 | +
|
| 100 | + Detection is strictly conservative: |
| 101 | +
|
| 102 | + * handler must be a no-arg async function |
| 103 | + * function body must be exactly one ``return`` statement (after an |
| 104 | + optional docstring) |
| 105 | + * return value must be a ``Call`` to one of the known Response classes |
| 106 | + (``Response``, ``PlainTextResponse``, ``JSONResponse``, ``HTMLResponse``) |
| 107 | + by bare name |
| 108 | + * every positional and keyword argument must be a literal expression |
| 109 | +
|
| 110 | + Any deviation falls through to the existing trivial / general fast path. |
| 111 | + """ |
| 112 | + import ast # noqa: PLC0415 |
| 113 | + import inspect # noqa: PLC0415 |
| 114 | + import textwrap # noqa: PLC0415 |
| 115 | + |
| 116 | + try: |
| 117 | + src = inspect.getsource(handler) |
| 118 | + except (OSError, TypeError): |
| 119 | + return None |
| 120 | + try: |
| 121 | + tree = ast.parse(textwrap.dedent(src)) |
| 122 | + except SyntaxError: |
| 123 | + return None |
| 124 | + if not tree.body: |
| 125 | + return None |
| 126 | + fn = tree.body[0] |
| 127 | + if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)): |
| 128 | + return None |
| 129 | + fargs = fn.args |
| 130 | + if ( |
| 131 | + fargs.args |
| 132 | + or fargs.posonlyargs |
| 133 | + or fargs.kwonlyargs |
| 134 | + or fargs.vararg is not None |
| 135 | + or fargs.kwarg is not None |
| 136 | + ): |
| 137 | + return None |
| 138 | + body = list(fn.body) |
| 139 | + if body and isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Constant): |
| 140 | + body = body[1:] |
| 141 | + if len(body) != 1 or not isinstance(body[0], ast.Return): |
| 142 | + return None |
| 143 | + ret = body[0].value |
| 144 | + if not isinstance(ret, ast.Call) or not isinstance(ret.func, ast.Name): |
| 145 | + return None |
| 146 | + cls_name = ret.func.id |
| 147 | + if cls_name not in _STATIC_RESPONSE_CLASSES: |
| 148 | + return None |
| 149 | + if not all(_is_ast_literal(a) for a in ret.args): |
| 150 | + return None |
| 151 | + if not all(_is_ast_literal(kw.value) for kw in ret.keywords if kw.arg is not None): |
| 152 | + return None |
| 153 | + if any(kw.arg is None for kw in ret.keywords): # **kwargs unpacking |
| 154 | + return None |
| 155 | + try: |
| 156 | + pos_args = [_ast_literal_value(a) for a in ret.args] |
| 157 | + kw_kwargs: dict[str, Any] = {} |
| 158 | + for kw in ret.keywords: |
| 159 | + if kw.arg is not None: |
| 160 | + kw_kwargs[kw.arg] = _ast_literal_value(kw.value) |
| 161 | + from hawkapi.responses import HTMLResponse, JSONResponse, PlainTextResponse # noqa: PLC0415 |
| 162 | + from hawkapi.responses.response import Response # noqa: PLC0415 |
| 163 | + |
| 164 | + cls_map = { |
| 165 | + "Response": Response, |
| 166 | + "PlainTextResponse": PlainTextResponse, |
| 167 | + "JSONResponse": JSONResponse, |
| 168 | + "HTMLResponse": HTMLResponse, |
| 169 | + } |
| 170 | + resp = cls_map[cls_name](*pos_args, **kw_kwargs) |
| 171 | + start_msg: dict[str, Any] = { |
| 172 | + "type": "http.response.start", |
| 173 | + "status": resp.status_code, |
| 174 | + "headers": resp._build_raw_headers(), # pyright: ignore[reportPrivateUsage] |
| 175 | + } |
| 176 | + body_msg: dict[str, Any] = { |
| 177 | + "type": "http.response.body", |
| 178 | + "body": resp.body, |
| 179 | + } |
| 180 | + except Exception: |
| 181 | + return None |
| 182 | + return (start_msg, body_msg) |
| 183 | + |
| 184 | + |
36 | 185 | def _handler_returns_streaming(handler: Any) -> bool: |
37 | 186 | """True when the handler's return annotation is StreamingResponse/FileResponse. |
38 | 187 |
|
@@ -290,6 +439,7 @@ def add_route( |
290 | 439 | response_model_exclude_unset=response_model_exclude_unset, |
291 | 440 | response_model_exclude_defaults=response_model_exclude_defaults, |
292 | 441 | ), |
| 442 | + _static_response=_compute_static_response(handler), |
293 | 443 | ) |
294 | 444 | self._tree.insert(route) |
295 | 445 | return route |
@@ -653,6 +803,7 @@ def include_router(self, router: Router) -> None: |
653 | 803 | response_model_exclude_unset=route.response_model_exclude_unset, |
654 | 804 | response_model_exclude_defaults=route.response_model_exclude_defaults, |
655 | 805 | ), |
| 806 | + _static_response=route._static_response, # pyright: ignore[reportPrivateUsage] |
656 | 807 | ) |
657 | 808 | self._tree.insert(merged_route) |
658 | 809 |
|
|
0 commit comments