Conversation
CiCi503
left a comment
There was a problem hiding this comment.
- 去掉无关的 md 文件和测试脚本,提交前要看一遍提交的内容
- 代码结构调整:在 service 下新增 gateway 模块,将在 route 里和 serverlessapi 下的关于 cpu 函数的复杂逻辑移至 gateway 下
src/code/agent/routes/routes.py
Outdated
| service.start(constants.AUTO_LAUNCH_SNAPSHOT_NAME) | ||
|
|
||
| print(f"Initializing function with ComfyUI mode: {constants.COMFYUI_MODE}") | ||
| service.start('latest-dev', nodes_map={}) # 使用latest-dev快照,与_tryStartIGService保持一致 |
There was a problem hiding this comment.
这不行,需要区分 dev 环境和 prod 环境。
prod 环境的 initialize 需要基于环境变量里配置的 prod-xxx 的 snapshot 启动
src/code/agent/routes/routes.py
Outdated
| print(f"[Enhanced History] Error getting all persisted history: {e}") | ||
| return {} | ||
|
|
||
| def _get_persisted_history_by_prompt_id(self, api_service, prompt_id): |
There was a problem hiding this comment.
不要将这些复杂的业务逻辑放到 routes 里。
在 service 下新建一个 gateway 目录,将 get_history、status_poller 相关逻辑移动至 gateway 中
| else: | ||
| return [] | ||
|
|
||
| def get_status_from_store_incremental(self, task_id: str): |
There was a problem hiding this comment.
这个逻辑和 serverless api 无关,移走
969857b to
12b563e
Compare
src/code/agent/main.py
Outdated
|
|
||
| _original_print = builtins.print | ||
|
|
||
| def timestamped_print(*args, **kwargs): |
There was a problem hiding this comment.
可以 rebase cap,用 agent/utils 中的 logger
src/code/agent/routes/routes.py
Outdated
|
|
||
| # 使用环境变量指定的snapshot,默认为latest-dev | ||
| snapshot_name = os.environ.get('AUTO_LAUNCH_SNAPSHOT_NAME', 'latest-dev') | ||
| print(f"Initializing function with ComfyUI mode: {constants.COMFYUI_MODE}, snapshot: {snapshot_name}") |
There was a problem hiding this comment.
- 这里的 print/logger 使用 util 中的 logger
- 系统相关的日志默认设置为 DEBUG,默认用户在日志中不需要看到
| service.start(constants.AUTO_LAUNCH_SNAPSHOT_NAME) | ||
|
|
||
| # 使用环境变量指定的snapshot,默认为latest-dev | ||
| snapshot_name = os.environ.get('AUTO_LAUNCH_SNAPSHOT_NAME', 'latest-dev') |
There was a problem hiding this comment.
latest-dev 的 snapshot 是在哪里生成的?
src/code/agent/routes/routes.py
Outdated
|
|
||
| # 发送初始状态消息(模拟ComfyUI原生行为) | ||
| client_id = f"cpu_client_{int(time.time() * 1000)}" | ||
| ws.send(json.dumps({ |
There was a problem hiding this comment.
add_connection 中有 _send_initial_status ,这里也 set status 了,为啥需要有两份呢?
| print(f"ws connected: {json.dumps(conn_info, indent=2)}") | ||
|
|
||
| # 设置TCP_NODELAY禁用Nagle算法,确保消息立即发送 | ||
| self._set_tcp_nodelay(ws) |
There was a problem hiding this comment.
禁用 Nagle 算法是有必要的吗?解决了什么问题?需要引入这个复杂度吗?
| # 任务状态推送功能 | ||
| self._task_subscriptions: Dict[str, Set] = {} # task_id -> set of websockets | ||
| self._client_subscriptions: Dict = {} # websocket -> set of task_ids | ||
| self._client_id_mapping: Dict[str, Set] = {} # client_id -> set of websockets |
There was a problem hiding this comment.
client_id_mapping 为啥需要一个 set 呢?一个 client_id 应该对应一个 ws?
| self._task_subscriptions: Dict[str, Set] = {} # task_id -> set of websockets | ||
| self._client_subscriptions: Dict = {} # websocket -> set of task_ids | ||
| self._client_id_mapping: Dict[str, Set] = {} # client_id -> set of websockets | ||
| self._ws_client_id_mapping: Dict = {} # websocket -> client_id |
There was a problem hiding this comment.
ws_client_id_mapping 没有存在的必要吧, getattr(ws, '_comfyui_client_id') 可以获取到 client_id 呀
src/code/agent/routes/routes.py
Outdated
| "message": "Please start your comfyui/sd service first" | ||
| }), 500 | ||
| # CPU模式:接收ComfyUI原生的WebSocket连接,但推送基于任务队列的真实状态 | ||
| if constants.COMFYUI_MODE == "cpu": |
| return _task_queue_manager | ||
|
|
||
| # 为了向后兼容,提供一个属性访问器 | ||
| class TaskQueueManagerProxy: |
There was a problem hiding this comment.
这个 TaskQueueManagerProxy 有啥用?
047dbcf to
5326170
Compare
src/code/agent/routes/cpu_routes.py
Outdated
| } | ||
| }), 500 | ||
|
|
||
| def _register_userdata_block(self): |
There was a problem hiding this comment.
- 优化函数名,_register_userdata_block => _register_userdata_handler?
- 设置一个环境变量吧,默认不允许 save userdata,可以通过环境变量开关控制是否 save userdata
| def _async_handle_task_failure(self, task_id: str, status_data: dict): | ||
| """异步处理任务失败(向后兼容方法) | ||
|
|
||
| 注意:此方法已被 _async_handle_task_failure_after_update 替代 |
aa5234b to
c1206fa
Compare
Change-Id: I45dc92f37937f2159a8141918718ddb1e5c97bc0 Co-developed-by: Cursor <noreply@cursor.com>
86e9a9a to
82c465d
Compare
82c465d to
50a9900
Compare
| if resp.status_code == 202: | ||
| return task_id, resp | ||
| else: | ||
| return None, (500, "async_invocation_error", f"Failed to invoke GPU function asynchronously: HTTP {resp.status_code}") |
There was a problem hiding this comment.
- 记录错误日志
- 更新 error code && error message
| except Exception as e: | ||
| error_msg = f"Failed to add task to queue: {e}" | ||
| log("ERROR", f"[TaskId={task_id}][RequestId={request_id}] {error_msg}\n{traceback.format_exc()}") | ||
| return None, (500, "queue_error", error_msg) |

No description provided.