execgo
<cite> **本文档中引用的文件** - [task.go](file://internal/models/task.go) - [handler.go](file://internal/api/handler.go) - [scheduler.go](file://internal/scheduler/scheduler.go) - [executor.go](file://internal/executor/executor.go) - [main.go](file://cmd/execgo/main.go) - [README.md](file://README.md) </cite>

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构概览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能考虑
  8. 故障排除指南
  9. 结论

简介

ExecGo 是一个使用纯 Go 标准库构建的极简 AI 执行引擎,专为 AI Agent 提供任务提交、DAG 调度、并发执行和可观测性的 HTTP 服务。本文档专注于任务图验证机制,详细说明 TaskGraph.Validate() 方法的实现原理,包括空任务图检查、任务 ID 唯一性验证、必需字段完整性检查,以及依赖关系验证机制(包括未知依赖引用检测、自依赖检测、循环依赖检测算法)。

项目结构

ExecGo 采用清晰的分层架构设计,主要包含以下核心模块:

graph TB
subgraph "应用入口"
MAIN[cmd/execgo/main.go]
end
subgraph "API 层"
API[internal/api/handler.go]
end
subgraph "业务逻辑层"
MODELS[internal/models/task.go]
SCHEDULER[internal/scheduler/scheduler.go]
EXECUTOR[internal/executor/executor.go]
end
subgraph "基础设施层"
STATE[internal/state/state.go]
OBS[internal/observability/observability.go]
CONFIG[internal/config/config.go]
end
MAIN --> API
API --> MODELS
API --> SCHEDULER
API --> EXECUTOR
SCHEDULER --> STATE
SCHEDULER --> EXECUTOR
API --> OBS
API --> CONFIG

图表来源

  • main.go:1-105
  • handler.go:1-157
  • task.go:1-149
  • scheduler.go:1-231

章节来源

  • main.go:1-105
  • README.md:149-177

核心组件

TaskGraph 数据结构

TaskGraph 是一次提交的任务 DAG,包含任务列表和验证方法:

classDiagram
class Task {
+string ID
+string Type
+json.RawMessage Params
+[]string DependsOn
+int Retry
+int64 Timeout
+TaskStatus Status
+json.RawMessage Result
+string Error
+time.Time CreatedAt
+time.Time UpdatedAt
}
class TaskGraph {
+[]*Task Tasks
+Validate() error
}
class TaskStatus {
<<enumeration>>
pending
running
success
failed
skipped
}
TaskGraph --> Task : "contains"
Task --> TaskStatus : "uses"

图表来源

  • task.go:21-39

验证流程

任务图验证采用多阶段检查机制,确保任务图的完整性和有效性:

flowchart TD
Start([开始验证]) --> CheckEmpty["检查任务图是否为空"]
CheckEmpty --> Empty{"是否为空?"}
Empty --> |是| ReturnEmpty["返回空任务图错误"]
Empty --> |否| CheckFields["检查必需字段"]
CheckFields --> FieldValid{"字段是否有效?"}
FieldValid --> |否| ReturnFieldError["返回字段错误"]
FieldValid --> |是| CheckUnique["检查任务 ID 唯一性"]
CheckUnique --> Unique{"ID 是否唯一?"}
Unique --> |否| ReturnDuplicate["返回重复 ID 错误"]
Unique --> |是| CheckDeps["检查依赖关系"]
CheckDeps --> DepsValid{"依赖是否有效?"}
DepsValid --> |否| ReturnDepError["返回依赖错误"]
DepsValid --> |是| CheckCycle["检查循环依赖"]
CheckCycle --> Cycle{"是否存在循环?"}
Cycle --> |是| ReturnCycle["返回循环依赖错误"]
Cycle --> |否| Success["验证成功"]
ReturnEmpty --> End([结束])
ReturnFieldError --> End
ReturnDuplicate --> End
ReturnDepError --> End
ReturnCycle --> End
Success --> End

图表来源

  • task.go:41-79

章节来源

  • task.go:21-79

架构概览

ExecGo 的验证机制在整个系统架构中的位置如下:

sequenceDiagram
participant Client as "客户端"
participant API as "API 处理器"
participant Validator as "TaskGraph.Validate()"
participant Scheduler as "调度器"
participant Executor as "执行器"
Client->>API : POST /tasks (JSON)
API->>API : 解析请求体
API->>Validator : graph.Validate()
Validator->>Validator : 检查空任务图
Validator->>Validator : 检查必需字段
Validator->>Validator : 检查 ID 唯一性
Validator->>Validator : 检查依赖关系
Validator->>Validator : 检查循环依赖
alt 验证失败
Validator-->>API : 返回错误
API-->>Client : 400 Bad Request
else 验证成功
Validator-->>API : 返回 nil
API->>Scheduler : Submit(graph)
Scheduler->>Scheduler : 构建依赖图
Scheduler-->>Client : 202 Accepted
end

图表来源

  • handler.go:58-99
  • task.go:41-79

详细组件分析

TaskGraph.Validate() 方法实现

TaskGraph.Validate() 方法实现了完整的任务图验证逻辑,采用分阶段验证策略:

1. 空任务图检查

验证过程首先检查任务图是否为空,这是最基本的验证步骤:

flowchart TD
Start([Validate 方法调用]) --> CheckLen["len(g.Tasks) == 0"]
CheckLen --> IsEmpty{"长度为 0 ?"}
IsEmpty --> |是| ReturnEmpty["返回 'task graph is empty' 错误"]
IsEmpty --> |否| InitMap["初始化 ID 映射"]
ReturnEmpty --> End([结束])
InitMap --> CheckRequired["遍历每个任务检查必需字段"]

图表来源

  • task.go:42-59

2. 必需字段完整性检查

对于每个任务,验证以下必需字段:

  • ID: 任务唯一标识符
  • Type: 任务类型,必须是非空字符串
flowchart TD
CheckRequired --> LoopTasks["遍历 g.Tasks"]
LoopTasks --> CheckID["t.ID == ''"]
CheckID --> HasID{"ID 为空?"}
HasID --> |是| ReturnIDReq["返回 'task id is required' 错误"]
HasID --> |否| CheckType["t.Type == ''"]
CheckType --> HasType{"Type 为空?"}
HasType --> |是| ReturnTypeReq["返回 'task %q: type is required' 错误"]
HasType --> |否| NextTask["检查下一个任务"]
ReturnIDReq --> End([结束])
ReturnTypeReq --> End
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| CheckUnique

图表来源

  • task.go:48-58

3. 任务 ID 唯一性验证

使用哈希映射确保每个任务 ID 在任务图中唯一:

flowchart TD
CheckUnique --> InitMap["ids := make(map[string]bool, len(g.Tasks))"]
InitMap --> LoopTasks["for _, t := range g.Tasks"]
LoopTasks --> CheckDup["ids[t.ID]"]
CheckDup --> Duplicate{"ID 已存在?"}
Duplicate --> |是| ReturnDup["返回 'duplicate task id: %q' 错误"]
Duplicate --> |否| AddID["ids[t.ID] = true"]
AddID --> NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| CheckDeps
ReturnDup --> End([结束])

图表来源

  • task.go:47-58

4. 依赖关系验证

依赖关系验证包含两个关键检查:未知依赖引用检测和自依赖检测。

4.1 未知依赖引用检测

检查每个任务的 DependsOn 数组,确保所有依赖任务 ID 都存在于任务图中:

flowchart TD
CheckDeps --> LoopTasks["遍历 g.Tasks"]
LoopTasks --> LoopDeps["遍历 t.DependsOn"]
LoopDeps --> CheckUnknown["!ids[dep]"]
CheckUnknown --> Unknown{"依赖 ID 不存在?"}
Unknown --> |是| ReturnUnknown["返回 'task %q depends on unknown task %q' 错误"]
Unknown --> |否| CheckSelf["dep == t.ID"]
CheckSelf --> SelfDep{"依赖等于自身?"}
SelfDep --> |是| ReturnSelf["返回 'task %q cannot depend on itself' 错误"]
SelfDep --> |否| NextDep["下一个依赖"]
NextDep --> MoreDeps{"还有依赖?"}
MoreDeps --> |是| LoopDeps
MoreDeps --> |否| NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| CheckCycle
ReturnUnknown --> End([结束])
ReturnSelf --> End

图表来源

  • task.go:62-71
4.2 自依赖检测

防止任务直接或间接依赖自身,这会导致循环依赖问题。

5. 循环依赖检测算法

使用 Kahn 算法进行拓扑排序检测,确保任务图没有循环依赖:

flowchart TD
CheckCycle --> InitArrays["初始化 inDegree 和 dependents 映射"]
InitArrays --> BuildGraph["构建入度图和反向依赖图"]
BuildGraph --> CountDeps["for _, t := range tasks"]
CountDeps --> IncDeg["inDegree[t.ID]++"]
IncDeg --> AddDep["dependents[dep] = append(dependents[dep], t.ID)"]
AddDep --> NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| CountDeps
MoreTasks --> |否| InitQueue["初始化队列,放入入度为 0 的节点"]
InitQueue --> BFS["BFS 拓扑排序"]
BFS --> VisitNode["visited++"]
VisitNode --> DecDeg["for _, child := range dependents[node]"]
DecDeg --> Decrement["inDegree[child]--"]
Decrement --> CheckZero{"inDegree[child] == 0?"}
CheckZero --> |是| Enqueue["queue = append(queue, child)"]
CheckZero --> |否| Continue["继续"]
Enqueue --> Continue
Continue --> MoreQueue{"队列是否为空?"}
MoreQueue --> |否| BFS
MoreQueue --> |是| CheckVisited["if visited != len(tasks)"]
CheckVisited --> HasCycle{"访问数量不等于任务数量?"}
HasCycle --> |是| ReturnCycle["返回 'cycle detected in task graph' 错误"]
HasCycle --> |否| NoCycle["无循环依赖"]
ReturnCycle --> End([结束])
NoCycle --> End

图表来源

  • task.go:81-121

章节来源

  • task.go:41-121

API 层集成

API 处理器在接收到任务提交请求后,会调用 TaskGraph.Validate() 方法进行验证:

sequenceDiagram
participant Client as "客户端"
participant Handler as "handleSubmitTasks"
participant Graph as "TaskGraph"
participant Validator as "Validate()"
participant Executor as "执行器验证"
participant Scheduler as "调度器"
Client->>Handler : POST /tasks
Handler->>Handler : 解析 JSON 请求体
Handler->>Graph : 创建 TaskGraph 实例
Handler->>Validator : graph.Validate()
alt 验证失败
Validator-->>Handler : error
Handler->>Handler : 记录警告日志
Handler-->>Client : 400 Bad Request + 错误信息
else 验证成功
Validator-->>Handler : nil
Handler->>Executor : 验证任务类型
alt 执行器不存在
Executor-->>Handler : error
Handler-->>Client : 400 Bad Request + 可用类型列表
else 执行器存在
Executor-->>Handler : 成功
Handler->>Scheduler : Submit(graph)
Handler-->>Client : 202 Accepted + 任务 ID 列表
end
end

图表来源

  • handler.go:58-99

章节来源

  • handler.go:58-99

调度器集成

调度器在接收验证通过的任务图后,会构建内部的数据结构来支持并发执行:

flowchart TD
Submit([Submit 方法调用]) --> Lock["s.mu.Lock()"]
Lock --> LoopTasks["遍历 graph.Tasks"]
LoopTasks --> SetStatus["task.Status = StatusPending"]
SetStatus --> PutState["s.state.Put(task)"]
PutState --> IncMetrics["s.metrics.TasksTotal.Add(1)"]
IncMetrics --> IncType["s.metrics.IncType(task.Type)"]
IncType --> BuildDep["构建依赖计数和反向依赖图"]
BuildDep --> DepCount["s.depCount[task.ID] = len(task.DependsOn)"]
DepCount --> LoopDeps["for _, dep := range task.DependsOn"]
LoopDeps --> AddDep["s.dependents[dep] = append(s.dependents[dep], task.ID)"]
AddDep --> NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| EnqueueReady["将入度为 0 的任务加入就绪队列"]
EnqueueReady --> Unlock["s.mu.Unlock()"]
Unlock --> Done([完成])

图表来源

  • scheduler.go:69-97

章节来源

  • scheduler.go:69-97

依赖关系分析

组件耦合度分析

ExecGo 的验证机制展现了良好的模块化设计:

graph TB
subgraph "验证相关组件"
MODELS[models/task.go<br/>TaskGraph.Validate]
API[api/handler.go<br/>handleSubmitTasks]
SCHEDULER[scheduler/scheduler.go<br/>Submit]
EXECUTOR[executor/executor.go<br/>Get/RegisteredTypes]
end
subgraph "外部依赖"
NET[net/http<br/>HTTP 处理]
LOG[slog<br/>日志记录]
TIME[time<br/>时间处理]
end
API --> MODELS
API --> EXECUTOR
API --> SCHEDULER
MODELS --> TIME
SCHEDULER --> EXECUTOR
SCHEDULER --> LOG
API --> NET
API --> LOG

图表来源

  • task.go:1-149
  • handler.go:1-157
  • scheduler.go:1-231
  • executor.go:1-68

错误处理策略

验证机制采用了统一的错误处理策略:

验证阶段错误类型HTTP 状态码错误消息格式
空任务图检查EmptyGraphError400"task graph is empty"
必需字段检查RequiredFieldError400"task id is required" 或 "task %q: type is required"
ID 唯一性检查DuplicateIDError400"duplicate task id: %q"
依赖引用检查UnknownDepError400"task %q depends on unknown task %q"
自依赖检查SelfDepError400"task %q cannot depend on itself"
循环依赖检查CycleError400"cycle detected in task graph"
执行器类型检查UnknownTypeError400"unknown task type: %q (available: %s)"

章节来源

  • task.go:42-79
  • handler.go:70-85

性能考虑

时间复杂度分析

  • 空任务图检查: O(1) - 单次数组长度检查
  • 必需字段检查: O(n) - n 为任务数量
  • ID 唯一性检查: O(n) - 哈希映射插入和查找
  • 依赖关系检查: O(n + d) - n 为任务数量,d 为依赖关系总数
  • 循环依赖检测: O(n + d) - Kahn 算法的标准复杂度

总时间复杂度为 O(n + d),空间复杂度为 O(n + d)。

优化建议

  1. 早期短路: 验证过程采用早期短路策略,一旦发现错误立即返回
  2. 内存复用: 使用预分配的哈希映射减少内存分配开销
  3. 单次遍历: 循环依赖检测与依赖关系检查结合,避免重复遍历

故障排除指南

常见验证失败场景

1. 空任务图错误

症状: 提交空的任务数组 解决方案: 确保任务数组至少包含一个任务

2. 缺少必需字段

症状: 任务缺少 idtype 字段 解决方案: 为每个任务提供唯一且非空的 id 和有效的 type

3. 重复任务 ID

症状: 多个任务使用相同的 id 解决方案: 确保每个任务的 id 在整个任务图中唯一

4. 未知依赖引用

症状: 任务依赖不存在的任务 ID 解决方案: 确保所有依赖的任务 ID 都存在于任务图中

5. 自依赖问题

症状: 任务直接依赖自身 解决方案: 移除自依赖关系

6. 循环依赖

症状: 任务之间形成循环依赖链 解决方案: 重新设计任务依赖关系,打破循环

调试技巧

  1. 启用详细日志: 查看 API 层的日志输出了解验证失败的具体原因
  2. 逐步验证: 先验证基本字段,再检查依赖关系
  3. 使用健康检查: 通过 /health 端点确认服务正常运行

章节来源

  • handler.go:69-74
  • task.go:42-79

结论

ExecGo 的任务图验证机制通过多层次、渐进式的验证策略,确保了任务图的完整性和有效性。该机制具有以下特点:

  1. 全面性: 覆盖了从基础字段验证到高级依赖关系分析的所有必要检查
  2. 高效性: 采用优化的算法和数据结构,确保验证过程快速执行
  3. 可维护性: 清晰的模块化设计和统一的错误处理策略
  4. 用户友好: 提供详细的错误信息,帮助开发者快速定位和解决问题

通过 TaskGraph.Validate() 方法,ExecGo 为 AI Agent 提供了一个可靠的任务执行基础,确保复杂的工作流能够在保证正确性的前提下高效执行。