Skip to content

Commit ccd4f2f

Browse files
authored
Merge pull request #476 from chaitin/fix/handle-processing-duplicate-vmready
fix: 修复 Agent 重连导致运行中任务异常变为 error
2 parents f20704c + f522f27 commit ccd4f2f

3 files changed

Lines changed: 28 additions & 12 deletions

File tree

backend/biz/host/handler/v1/internal.go

Lines changed: 14 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -29,17 +29,17 @@ import (
2929

3030
// InternalHostHandler 处理 taskflow 回调的 host/VM 相关接口
3131
type InternalHostHandler struct {
32-
logger *slog.Logger
33-
repo domain.HostRepo
34-
teamRepo domain.TeamHostRepo
35-
redis *redis.Client
36-
cache *cache.Cache
37-
hook domain.InternalHook // 可选,由内部项目通过 WithInternalHook 注入
38-
taskLifecycle *lifecycle.Manager[uuid.UUID, consts.TaskStatus, lifecycle.TaskMetadata]
39-
hostUsecase domain.HostUsecase
40-
taskConns *ws.TaskConn
41-
projectUsecase domain.ProjectUsecase
42-
tokenProvider *gituc.TokenProvider
32+
logger *slog.Logger
33+
repo domain.HostRepo
34+
teamRepo domain.TeamHostRepo
35+
redis *redis.Client
36+
cache *cache.Cache
37+
hook domain.InternalHook // 可选,由内部项目通过 WithInternalHook 注入
38+
taskLifecycle *lifecycle.Manager[uuid.UUID, consts.TaskStatus, lifecycle.TaskMetadata]
39+
hostUsecase domain.HostUsecase
40+
taskConns *ws.TaskConn
41+
projectUsecase domain.ProjectUsecase
42+
tokenProvider *gituc.TokenProvider
4343
}
4444

4545
func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
@@ -346,6 +346,9 @@ func (h *InternalHostHandler) VmReady(c *web.Context, req taskflow.VirtualMachin
346346

347347
for _, t := range vm.Edges.Tasks {
348348
h.logger.With("task", t).DebugContext(c.Request().Context(), "vm-ready")
349+
if t.Status == consts.TaskStatusProcessing {
350+
continue
351+
}
349352

350353
if t.Kind == consts.TaskTypeReview && t.SubType == consts.TaskSubTypePrReview {
351354
} else {

backend/biz/task/usecase/task.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -139,7 +139,9 @@ func (a *TaskUsecase) Info(ctx context.Context, user *domain.User, id uuid.UUID)
139139
case types.ConditionTypeFailed:
140140
vm.Status = taskflow.VirtualMachineStatusOffline
141141
case types.ConditionTypeHibernated:
142-
vm.Status = taskflow.VirtualMachineStatusHibernated
142+
if strings.ToLower(strings.TrimSpace(cond.Reason)) == "hibernated" {
143+
vm.Status = taskflow.VirtualMachineStatusHibernated
144+
}
143145
case types.ConditionTypeReady:
144146
if time.Since(time.Unix(vm.CreatedAt, 0)) > 2*time.Minute {
145147
vm.Status = taskflow.VirtualMachineStatusOffline

backend/pkg/lifecycle/taskhook.go

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ func (h *TaskHook) OnStateChange(ctx context.Context, id uuid.UUID, from, to con
5858

5959
func (h *TaskHook) withError(ctx context.Context, id, uid uuid.UUID, fn func() error) {
6060
if err := fn(); err != nil {
61+
h.logger.With("error", err, "task_id", id).ErrorContext(ctx, "failed to handle processing")
6162
if err := h.taskLifecycle.Transition(ctx, id, consts.TaskStatusError, TaskMetadata{
6263
TaskID: id,
6364
UserID: uid,
@@ -78,6 +79,16 @@ func (h *TaskHook) handleError(ctx context.Context, id, uid uuid.UUID) error {
7879

7980
func (h *TaskHook) handleProcessing(ctx context.Context, id uuid.UUID, metadata TaskMetadata) error {
8081
h.withError(ctx, id, metadata.UserID, func() error {
82+
// 从 DB 查询当前任务状态,如果已经是 processing 说明是 Agent 重连触发的重复 vm-ready,跳过
83+
t, err := h.repo.GetByID(ctx, id)
84+
if err != nil {
85+
return fmt.Errorf("failed to get task: %w", err)
86+
}
87+
if t.Status == consts.TaskStatusProcessing {
88+
h.logger.With("task_id", id).InfoContext(ctx, "task already processing, skipping (likely agent reconnect)")
89+
return nil
90+
}
91+
8192
reqKey := fmt.Sprintf("task:create_req:%s", id.String())
8293
val, err := h.redis.Get(ctx, reqKey).Result()
8394
if err != nil {

0 commit comments

Comments (0)