|
10 | 10 | from openapi_python.generator import GenerationRequest, generate_client |
11 | 11 |
|
12 | 12 |
|
| 13 | +def _strip_additional_properties(schema: dict) -> None: |
| 14 | + match schema: |
| 15 | + case dict(): |
| 16 | + if schema.get("additionalProperties") is True: |
| 17 | + schema.pop("additionalProperties") |
| 18 | + for value in schema.values(): |
| 19 | + _strip_additional_properties(value) |
| 20 | + case list(): |
| 21 | + for item in schema: |
| 22 | + _strip_additional_properties(item) |
| 23 | + |
| 24 | + |
13 | 25 | def main() -> None: |
14 | 26 | output_dir = Path(__file__).parent / "generated" |
| 27 | + spec = app.openapi() |
| 28 | + _strip_additional_properties(spec) |
15 | 29 | generate_client( |
16 | 30 | GenerationRequest( |
17 | 31 | output_dir=output_dir, |
18 | | - spec_json=json.dumps(app.openapi()), |
| 32 | + spec_json=json.dumps(spec), |
19 | 33 | overwrite=True, |
20 | 34 | ) |
21 | 35 | ) |
22 | 36 |
|
23 | 37 | source = (output_dir / "my_client" / "types.py").read_text() |
24 | 38 | assert "class TaskExecutionDtoLogItem(TypedDict):" not in source |
| 39 | + assert "class TaskExecutionDtoOutputVariant(TypedDict):" not in source |
| 40 | + assert "class TaskExecutionDtoStacktraceVariant(TypedDict):" not in source |
| 41 | + assert "class TaskDtoMetaVariant(TypedDict):" not in source |
| 42 | + assert "class TaskDtoPayload(TypedDict):" not in source |
25 | 43 | assert "log: list[dict[str, Any]]" in source |
26 | | - assert "metadata: dict[str, Any]" in source |
| 44 | + assert "output: dict[str, Any] | None" in source |
| 45 | + assert "stacktrace: dict[str, Any] | None" in source |
| 46 | + assert "meta: dict[str, Any] | None" in source |
| 47 | + assert "payload: dict[str, Any]" in source |
| 48 | + assert "executions: NotRequired[list[TaskExecutionDto] | None]" in source |
27 | 49 |
|
28 | 50 | generated_types = importlib.import_module("generated.my_client.types") |
29 | 51 | api_b = FastAPI() |
30 | 52 |
|
31 | | - def get_task_execution() -> object: |
32 | | - return {} |
| 53 | + def list_tasks() -> object: |
| 54 | + return [] |
33 | 55 |
|
34 | | - api_b.get( |
35 | | - "/task-executions/{task_id}", |
36 | | - response_model=generated_types.TaskExecutionDto, |
37 | | - )(get_task_execution) |
| 56 | + api_b.get("/tasks", response_model=list[generated_types.TaskDto])(list_tasks) |
38 | 57 |
|
39 | | - schema = api_b.openapi()["components"]["schemas"]["TaskExecutionDto"] |
40 | | - log_items = schema["properties"]["log"]["items"] |
41 | | - metadata = schema["properties"]["metadata"] |
| 58 | + schemas = api_b.openapi()["components"]["schemas"] |
| 59 | + task_execution = schemas["TaskExecutionDto"] |
| 60 | + task = schemas["TaskDto"] |
| 61 | + log_items = task_execution["properties"]["log"]["items"] |
| 62 | + output = task_execution["properties"]["output"]["anyOf"][0] |
| 63 | + stacktrace = task_execution["properties"]["stacktrace"]["anyOf"][0] |
| 64 | + meta = task["properties"]["meta"]["anyOf"][0] |
| 65 | + payload = task["properties"]["payload"] |
42 | 66 | assert log_items["type"] == "object" |
43 | 67 | assert log_items["additionalProperties"] is True |
44 | | - assert metadata["type"] == "object" |
45 | | - assert metadata["additionalProperties"] is True |
| 68 | + assert output["type"] == "object" |
| 69 | + assert output["additionalProperties"] is True |
| 70 | + assert stacktrace["type"] == "object" |
| 71 | + assert stacktrace["additionalProperties"] is True |
| 72 | + assert meta["type"] == "object" |
| 73 | + assert meta["additionalProperties"] is True |
| 74 | + assert payload["type"] == "object" |
| 75 | + assert payload["additionalProperties"] is True |
46 | 76 |
|
47 | 77 |
|
48 | 78 | if __name__ == "__main__": |
|
0 commit comments