|
1 | 1 | from json import dumps |
2 | 2 | from pathlib import Path |
| 3 | +from typing import Any |
| 4 | +from typing import cast |
3 | 5 |
|
4 | 6 | import pytest |
5 | 7 | import yaml |
6 | 8 | from falcon import status_codes |
7 | 9 | from falcon.asgi import App |
| 10 | +from falcon.asgi import Response |
8 | 11 | from falcon.constants import MEDIA_JSON |
9 | 12 | from falcon.testing import TestClient |
10 | 13 | from jsonschema_path import SchemaPath |
11 | 14 |
|
12 | 15 | from openapi_core.contrib.falcon.middlewares import FalconASGIOpenAPIMiddleware |
13 | 16 | from openapi_core.contrib.falcon.middlewares import FalconOpenAPIMiddleware |
| 17 | +from openapi_core.contrib.falcon.requests import FalconAsgiOpenAPIRequest |
| 18 | +from openapi_core.contrib.falcon.responses import FalconAsgiOpenAPIResponse |
| 19 | +from openapi_core.contrib.falcon.util import serialize_body |
14 | 20 |
|
15 | 21 |
|
16 | 22 | @pytest.fixture |
@@ -52,6 +58,23 @@ async def on_get(self, req, resp): |
52 | 58 | resp.set_header("X-Rate-Limit", "12") |
53 | 59 |
|
54 | 60 |
|
| 61 | +class _AsyncStream: |
| 62 | + def __init__(self, chunks): |
| 63 | + self._chunks = chunks |
| 64 | + self._index = 0 |
| 65 | + |
| 66 | + def __aiter__(self): |
| 67 | + return self |
| 68 | + |
| 69 | + async def __anext__(self): |
| 70 | + if self._index >= len(self._chunks): |
| 71 | + raise StopAsyncIteration |
| 72 | + |
| 73 | + chunk = self._chunks[self._index] |
| 74 | + self._index += 1 |
| 75 | + return chunk |
| 76 | + |
| 77 | + |
55 | 78 | def test_dual_mode_sync_middleware_works_with_asgi_app(spec): |
56 | 79 | middleware = FalconOpenAPIMiddleware.from_spec(spec) |
57 | 80 | app = App(middleware=[middleware]) |
@@ -121,3 +144,68 @@ def test_explicit_asgi_middleware_validates_response(spec): |
121 | 144 |
|
122 | 145 | assert response.status_code == 400 |
123 | 146 | assert "errors" in response.json |
| 147 | + |
| 148 | + |
| 149 | +@pytest.mark.asyncio |
| 150 | +async def test_asgi_response_adapter_handles_stream_without_charset(): |
| 151 | + chunks = [ |
| 152 | + b'{"data": [', |
| 153 | + b'{"id": 12, "name": "Cat", "ears": {"healthy": true}}', |
| 154 | + b"]}", |
| 155 | + ] |
| 156 | + response = Response() |
| 157 | + response.content_type = MEDIA_JSON |
| 158 | + response.stream = _AsyncStream(chunks) |
| 159 | + |
| 160 | + openapi_response = await FalconAsgiOpenAPIResponse.from_response(response) |
| 161 | + |
| 162 | + assert openapi_response.data == b"".join(chunks) |
| 163 | + assert response.stream is not None |
| 164 | + |
| 165 | + replayed_chunks = [] |
| 166 | + async for chunk in response.stream: |
| 167 | + replayed_chunks.append(chunk) |
| 168 | + assert b"".join(replayed_chunks) == b"".join(chunks) |
| 169 | + |
| 170 | + |
| 171 | +def test_asgi_request_body_cached_none_skips_media_deserialization(): |
| 172 | + class _DummyRequest: |
| 173 | + def get_media(self, *args, **kwargs): |
| 174 | + raise AssertionError("get_media should not be called") |
| 175 | + |
| 176 | + openapi_request = object.__new__(FalconAsgiOpenAPIRequest) |
| 177 | + openapi_request.request = cast(Any, _DummyRequest()) |
| 178 | + openapi_request._body = None |
| 179 | + |
| 180 | + assert openapi_request.body is None |
| 181 | + |
| 182 | + |
| 183 | +def test_multipart_unsupported_serialization_warns_and_returns_none(): |
| 184 | + content_type = "multipart/form-data; boundary=test" |
| 185 | + |
| 186 | + class _DummyHandler: |
| 187 | + def serialize(self, media, content_type): |
| 188 | + raise NotImplementedError( |
| 189 | + "multipart form serialization unsupported" |
| 190 | + ) |
| 191 | + |
| 192 | + class _DummyMediaHandlers: |
| 193 | + def _resolve(self, content_type, default_media_type): |
| 194 | + return (_DummyHandler(), content_type, None) |
| 195 | + |
| 196 | + class _DummyOptions: |
| 197 | + media_handlers = _DummyMediaHandlers() |
| 198 | + default_media_type = MEDIA_JSON |
| 199 | + |
| 200 | + class _DummyRequest: |
| 201 | + options = _DummyOptions() |
| 202 | + |
| 203 | + with pytest.warns( |
| 204 | + UserWarning, |
| 205 | + match="body serialization for multipart/form-data", |
| 206 | + ): |
| 207 | + body = serialize_body( |
| 208 | + cast(Any, _DummyRequest()), {"name": "Cat"}, content_type |
| 209 | + ) |
| 210 | + |
| 211 | + assert body is None |
0 commit comments