<cite>
**本文档引用的文件**
- [main.go](file://cmd/execgo/main.go)
- [executor.go](file://internal/executor/executor.go)
- [http.go](file://internal/executor/http.go)
- [shell.go](file://internal/executor/shell.go)
- [file.go](file://internal/executor/file.go)
- [task.go](file://internal/models/task.go)
- [scheduler.go](file://internal/scheduler/scheduler.go)
- [state.go](file://internal/state/state.go)
- [handler.go](file://internal/api/handler.go)
- [observability.go](file://internal/observability/observability.go)
- [README.md](file://README.md)
</cite>
目录
简介
本指南面向需要在 ExecGo 引擎中扩展自定义执行器的开发者。ExecGo 采用“插件式执行器”设计,通过统一的 Executor 接口与全局注册表实现可扩展的任务执行能力。本文将系统讲解:
- 执行器接口的实现要求与命名规范
- 从接口实现到注册使用的完整开发流程
- 生命周期管理、资源清理与错误处理最佳实践
- 调试技巧、测试策略与性能优化建议
- 多个自定义执行器的实际开发示例(按场景与复杂度递增)
项目结构
ExecGo 的核心围绕“API 层 → 调度器 → 执行器 → 状态管理”的分层架构展开。执行器模块位于 internal/executor,提供统一接口与内置实现,并通过注册表对外暴露。
graph TB
subgraph "入口与配置"
MAIN["cmd/execgo/main.go<br/>应用入口"]
end
subgraph "API 层"
API["internal/api/handler.go<br/>HTTP 路由与处理器"]
end
subgraph "调度器"
SCHED["internal/scheduler/scheduler.go<br/>DAG 调度与并发控制"]
end
subgraph "执行器"
EXIF["internal/executor/executor.go<br/>接口与注册表"]
EXHTTP["internal/executor/http.go<br/>HTTP 执行器"]
EXSHELL["internal/executor/shell.go<br/>Shell 执行器"]
EXFILE["internal/executor/file.go<br/>文件执行器"]
end
subgraph "状态与可观测"
STATE["internal/state/state.go<br/>内存状态与持久化"]
OBS["internal/observability/observability.go<br/>日志/追踪/指标"]
end
MAIN --> API
API --> SCHED
SCHED --> EXIF
EXIF --> EXHTTP
EXIF --> EXSHELL
EXIF --> EXFILE
SCHED --> STATE
API --> OBS
MAIN --> OBS
图表来源
- main.go:25-104
- handler.go:39-52
- scheduler.go:34-45
- executor.go:14-67
- http.go:22-75
- shell.go:31-79
- file.go:20-113
- state.go:17-53
- observability.go:46-80
章节来源
- README.md:149-177
- main.go:25-104
- handler.go:39-52
- scheduler.go:34-45
- executor.go:14-67
核心组件
- 执行器接口与注册表
- 接口定义包含 Type() 与 Execute() 两个方法,Type() 用于唯一标识执行器类型,Execute() 执行任务并返回 JSON 结果。
- 注册表提供 Register、Get、RegisteredTypes 与 RegisterBuiltins 等能力,支持内置执行器注册与查询。
- 内置执行器
- HTTPExecutor:封装 HTTP 请求,支持方法、头、体等参数,限制响应大小。
- ShellExecutor:白名单命令执行,确保安全;支持工作目录与上下文取消。
- FileExecutor:文件读写、追加、删除、统计,路径清洗防越狱。
- 调度器
- 基于 DAG 的并发调度,支持重试(指数退避)、超时(context 控制)、依赖级联与状态推进。
- 状态管理
- 内存状态 + JSON 文件持久化,支持周期性与最终持久化,崩溃后将 running 状态重置为 pending。
- API 层
- 提交任务图、查询任务、删除任务、健康检查、指标端点;提交前校验任务类型是否存在对应执行器。
章节来源
- executor.go:14-67
- http.go:14-75
- shell.go:24-79
- file.go:13-113
- scheduler.go:69-230
- state.go:25-179
- handler.go:58-99
架构总览
ExecGo 的执行链路如下:客户端通过 API 提交任务图,API 校验任务类型并提交给调度器;调度器根据依赖关系并发执行,按类型从注册表获取执行器;执行器完成任务后返回 JSON 结果,调度器更新状态并级联触发下游任务。
sequenceDiagram
participant Client as "客户端"
participant API as "API 层"
participant Sched as "调度器"
participant Reg as "执行器注册表"
participant Exec as "具体执行器"
participant State as "状态管理"
Client->>API : "POST /tasks 提交任务图"
API->>API : "校验任务图与类型"
API->>Sched : "Submit(graph)"
Sched->>Reg : "Get(task.Type)"
Reg-->>Sched : "Executor 实例"
loop 并发执行
Sched->>Exec : "Execute(ctx, task)"
Exec-->>Sched : "json.RawMessage, error"
Sched->>State : "UpdateStatus(...)"
Sched->>Sched : "级联触发下游"
end
API-->>Client : "202 Accepted + 任务ID列表"
图表来源
- handler.go:58-99
- scheduler.go:127-190
- executor.go:38-48
详细组件分析
执行器接口与注册表
- 接口职责
- Type():返回执行器类型字符串,用于在注册表中唯一标识该执行器。
- Execute(ctx, task):执行任务,返回 JSON 结果与错误。错误会触发调度器的重试与失败状态记录。
- 注册表机制
- Register(e):将执行器实例注册到全局映射。
- Get(taskType):按类型获取执行器,未找到则返回错误。
- RegisteredTypes():返回所有已注册类型,用于 API 校验与提示。
- RegisterBuiltins():注册内置执行器(HTTP、Shell、File)。
classDiagram
class Executor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class HTTPExecutor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class ShellExecutor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class FileExecutor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class Registry {
+Register(e)
+Get(taskType) (Executor, error)
+RegisteredTypes() []string
+RegisterBuiltins()
}
Executor <|.. HTTPExecutor
Executor <|.. ShellExecutor
Executor <|.. FileExecutor
Registry --> Executor : "管理与查询"
图表来源
- executor.go:14-67
- http.go:22-75
- shell.go:31-79
- file.go:20-113
章节来源
- executor.go:14-67
HTTP 执行器
- 参数结构:URL、Method、Headers、Body。
- 行为要点:
- Method 缺省为 GET。
- 限制响应体大小(1MB),避免内存膨胀。
- 即使 HTTP 错误码 ≥ 400 也返回结果,便于上层判断与记录。
- 错误处理:参数解析失败、请求创建失败、网络错误、响应读取失败均返回带上下文的错误。
flowchart TD
Start(["进入 Execute"]) --> Parse["解析参数"]
Parse --> Valid{"参数有效?"}
Valid --> |否| ErrParse["返回解析错误"]
Valid --> |是| BuildReq["构造请求(含上下文)"]
BuildReq --> DoReq["发送请求"]
DoReq --> ReadResp["读取响应(限制大小)"]
ReadResp --> StatusOK{"状态码 < 400 ?"}
StatusOK --> |是| MarshalOK["序列化成功结果"]
StatusOK --> |否| MarshalWarn["序列化警告结果"]
MarshalOK --> Done(["返回"])
MarshalWarn --> Done
ErrParse --> Done
图表来源
- http.go:27-75
章节来源
- http.go:14-75
Shell 执行器(白名单)
- 安全策略:仅允许预定义白名单命令,防止任意命令执行。
- 参数结构:Command、Args、Dir。
- 行为要点:
- 提取基础命令名进行白名单校验。
- 支持工作目录设置与上下文取消。
- 输出 stdout/stderr 与退出码,错误时返回带上下文的错误。
- 错误处理:参数解析失败、命令不在白名单、执行失败均返回带上下文的错误。
flowchart TD
Start(["进入 Execute"]) --> Parse["解析参数"]
Parse --> Valid{"参数有效且有命令?"}
Valid --> |否| ErrParse["返回解析错误"]
Valid --> |是| Base["提取基础命令名"]
Base --> Check{"在白名单?"}
Check --> |否| ErrWL["返回白名单错误"]
Check --> |是| Cmd["构建命令(含上下文/工作目录)"]
Cmd --> Run["执行命令"]
Run --> Result["组装结果(输出+退出码)"]
Result --> Done(["返回"])
ErrParse --> Done
ErrWL --> Done
图表来源
- shell.go:36-79
章节来源
- shell.go:14-79
文件执行器
- 参数结构:Action(read/write/append/delete/stat)、Path、Content。
- 行为要点:
- 路径清洗(filepath.Clean)防止目录穿越。
- write/append:自动创建目录,追加或截断写入。
- stat:返回文件元信息。
- 错误处理:参数解析失败、路径缺失、文件操作失败均返回带上下文的错误。
flowchart TD
Start(["进入 Execute"]) --> Parse["解析参数"]
Parse --> PathEmpty{"Path 是否为空?"}
PathEmpty --> |是| ErrPath["返回路径错误"]
PathEmpty --> |否| Clean["路径清洗"]
Clean --> Action{"Action 类型?"}
Action --> |read| Read["读取文件"]
Action --> |write| Write["写入文件(创建目录)"]
Action --> |append| Append["追加文件(创建目录)"]
Action --> |delete| Delete["删除文件"]
Action --> |stat| Stat["获取文件信息"]
Read --> Done(["返回"])
Write --> Done
Append --> Done
Delete --> Done
Stat --> Done
ErrPath --> Done
图表来源
- file.go:25-113
章节来源
- file.go:13-113
调度器与生命周期
- 生命周期
- Start:启动工作循环,维护就绪队列、并发信号量与依赖计数。
- Stop:优雅关闭,等待所有工作协程结束。
- Submit:提交任务图,构建依赖图并入队无依赖任务。
- 执行流程
- 从就绪队列取出任务,获取并发槽,调用执行器 Execute。
- 支持重试(指数退避,上限 10 秒),支持超时(毫秒)。
- 成功/失败后更新状态并级联触发下游任务。
- 资源清理
- 通过 context.WithCancel/WithTimeout 控制执行器生命周期。
- 任务完成后释放并发槽,避免资源泄漏。
sequenceDiagram
participant S as "调度器"
participant Q as "就绪队列"
participant E as "执行器"
participant M as "状态管理"
S->>Q : "enqueue(task)"
loop 主循环
Q-->>S : "task"
S->>S : "获取并发槽"
S->>E : "Execute(ctx, task)"
alt 成功
E-->>S : "result"
S->>M : "UpdateStatus(success)"
S->>S : "级联触发下游"
else 失败
E-->>S : "error"
S->>M : "UpdateStatus(failed)"
end
S->>S : "释放并发槽"
end
图表来源
- scheduler.go:109-190
章节来源
- scheduler.go:47-67
- scheduler.go:69-97
- scheduler.go:127-190
- scheduler.go:192-230
状态管理与持久化
- 内存状态:map[string]*Task + RWMutex,提供 Put/Get/UpdateStatus/Delete/GetAll。
- 持久化策略:
- 启动时从 data/state.json 加载,若存在 running 任务则重置为 pending。
- 周期性持久化(默认 30 秒),最终持久化在优雅关闭时触发。
- 使用临时文件 + 原子重命名保证一致性。
flowchart TD
Start(["启动"]) --> Load["加载持久化数据"]
Load --> Reset{"发现 running 任务?"}
Reset --> |是| ToPending["重置为 pending"]
Reset --> |否| Ready["准备就绪"]
Ready --> Loop["周期性持久化循环"]
Loop --> Tick["定时器触发"]
Tick --> Persist["写入临时文件并重命名"]
Persist --> Loop
Loop --> Stop["收到停止信号"]
Stop --> FinalPersist["最终持久化"]
FinalPersist --> End(["结束"])
图表来源
- state.go:25-53
- state.go:110-134
- state.go:160-179
章节来源
- state.go:25-179
API 层与可观测性
- API 层
- 提交任务图:校验 JSON 与 TaskGraph,检查任务类型是否存在对应执行器,提交后返回 202 与任务 ID 列表。
- 查询/删除/健康检查/指标端点。
- 可观测性
- 结构化 JSON 日志(slog)。
- TraceMiddleware 注入/透传 trace_id,便于跨组件关联。
- Metrics 提供任务总数、运行中、成功、失败与按类型计数。
sequenceDiagram
participant C as "客户端"
participant H as "API 处理器"
participant V as "校验器"
participant R as "注册表"
participant S as "调度器"
C->>H : "POST /tasks"
H->>V : "解码并校验任务图"
V-->>H : "校验通过/失败"
H->>R : "Get(task.Type)"
R-->>H : "执行器存在/不存在"
alt 存在
H->>S : "Submit(graph)"
H-->>C : "202 Accepted + 任务ID列表"
else 不存在
H-->>C : "400 Bad Request + 可用类型提示"
end
图表来源
- handler.go:58-99
- executor.go:38-48
章节来源
- handler.go:58-146
- observability.go:46-134
依赖分析
- 组件耦合
- API 层依赖调度器与注册表;调度器依赖注册表与状态管理;执行器彼此独立,仅通过接口与注册表交互。
- 外部依赖
- 项目采用纯标准库,无第三方依赖,降低维护成本与部署复杂度。
- 循环依赖
- 未发现循环依赖;分层清晰,接口边界明确。
graph LR
API["API 层"] --> SCHED["调度器"]
SCHED --> REG["执行器注册表"]
REG --> EX["执行器实现"]
SCHED --> STATE["状态管理"]
API --> OBS["可观测性"]
MAIN["入口"] --> API
MAIN --> OBS
图表来源
- handler.go:39-52
- scheduler.go:34-45
- executor.go:26-29
- state.go:17-23
- main.go:25-104
章节来源
- go.mod:1-4
- README.md:253-261
性能考虑
- 并发与背压
- 调度器通过信号量控制最大并发,避免资源争用;就绪队列容量为 1024,满载时采用异步回填策略。
- 超时与重试
- 任务支持毫秒级超时;执行失败按指数退避重试,上限 10 秒,避免雪崩效应。
- I/O 限制
- HTTP 执行器限制响应体大小;文件写入使用 O_TRUNC/O_APPEND 控制;Shell 执行器输出缓冲区受控。
- 持久化
- 周期性持久化减少磁盘压力;最终持久化确保优雅关闭时数据落盘。
- 指标与日志
- 提供任务总量、运行中、成功、失败与按类型计数;结构化日志便于定位问题。
章节来源
- scheduler.go:40-44
- scheduler.go:100-107
- scheduler.go:152-179
- http.go:60-63
- state.go:160-179
故障排查指南
- 常见错误与定位
- 任务类型未知:API 层会返回可用类型列表,检查 Type() 返回值与注册是否一致。
- 参数解析失败:检查 params JSON 结构与字段名称,参考内置执行器参数格式。
- 超时/重试:查看调度器日志中的重试次数与退避间隔;必要时调整任务 timeout 与 retry。
- 文件路径问题:确认路径清洗与权限;避免相对路径与越界访问。
- Shell 命令被拒绝:确认命令在白名单内;注意路径分隔符与工作目录。
- 调试技巧
- 使用 X-Trace-ID 头部贯穿请求链路,结合结构化日志快速定位。
- 查看 /metrics 端点了解整体运行状态与按类型分布。
- 在执行器内部记录关键步骤与耗时,便于性能分析。
- 资源清理
- 确保执行器内部的资源(文件句柄、网络连接、子进程)在 Execute 返回后及时释放。
- 对外部系统调用使用 context.WithTimeout/WithCancel,避免阻塞。
章节来源
- handler.go:76-85
- scheduler.go:132-137
- http.go:27-31
- shell.go:36-44
- file.go:31-33
结论
ExecGo 通过“接口 + 注册表 + 分层调度”的设计,提供了高度可扩展的执行器体系。遵循本文的实现规范与最佳实践,开发者可以快速、安全地扩展自定义执行器,并在生产环境中稳定运行。
附录
开发流程:从接口实现到注册使用
- 步骤一:实现 Executor 接口
- 定义执行器类型常量(Type() 返回值)。
- 解析 Task.Params,执行业务逻辑,返回 JSON 结果与错误。
- 步骤二:注册执行器
- 在 init() 中调用 executor.Register(&YourExecutor{})。
- 或在 main() 中调用 RegisterBuiltins() 前后注册自定义执行器。
- 步骤三:验证与测试
- 使用 API 提交任务图,观察调度器日志与状态变化。
- 通过 /metrics 与 /health 验证系统健康状况。
- 步骤四:上线与监控
- 配置最大并发、优雅关闭超时、数据目录等参数。
- 结合结构化日志与 trace_id 进行线上问题定位。
章节来源
- executor.go:31-36
- executor.go:62-67
- main.go:39-41
- README.md:229-249
实际开发示例(按场景与复杂度递增)
示例一:简单 HTTP 执行器(与内置类似)
- 目标:封装外部服务调用,支持方法、头、体与超时。
- 关键点:
- 参数结构与内置一致,便于统一管理。
- 注意响应体大小限制与错误码处理。
- 参考实现位置
- http.go:14-75
章节来源
- http.go:14-75
示例二:受限 Shell 执行器(与内置类似)
- 目标:在白名单基础上扩展命令集,支持工作目录与上下文。
- 关键点:
- 新增命令需同步更新白名单。
- 注意命令路径解析与工作目录切换。
- 参考实现位置
- shell.go:14-22
- shell.go:36-79
章节来源
- shell.go:14-22
- shell.go:36-79
示例三:文件系统执行器(与内置类似)
- 目标:扩展更多文件操作(如复制、移动、权限变更)。
- 关键点:
- 路径清洗与权限校验。
- 大文件处理与进度反馈。
- 参考实现位置
- file.go:13-19
- file.go:25-113
章节来源
- file.go:13-19
- file.go:25-113
示例四:数据库执行器(高复杂度)
- 目标:执行 SQL 语句或事务,支持连接池、超时与重试。
- 设计要点:
- 参数结构:DSN、SQL、参数绑定、事务标志。
- 连接池管理:初始化与回收。
- 错误分类:语法错误、连接失败、超时、死锁等。
- 结果结构:影响行数、结果集、错误信息。
- 生命周期与资源清理:
- 使用 context.WithTimeout 控制 SQL 执行。
- 事务结束后释放连接。
- 参考实现位置
- scheduler.go:163-173
- state.go:94-108
章节来源
- scheduler.go:163-173
- state.go:94-108
示例五:消息队列执行器(高复杂度)
- 目标:发布/消费消息,支持分区、偏移与幂等。
- 设计要点:
- 参数结构:Broker、Topic、Partition、Offset、Payload。
- 幂等:基于消息 ID 去重。
- 错误处理:网络异常、分区不可用、重复消费。
- 参考实现位置
- http.go:27-31
- scheduler.go:152-179
章节来源
- http.go:27-31
- scheduler.go:152-179
测试策略
- 单元测试
- 针对执行器的 Execute 方法,构造不同参数与错误场景(解析失败、超时、网络错误、文件权限等)。
- 集成测试
- 通过 API 提交任务图,验证调度器的依赖推进、重试与状态更新。
- 性能测试
- 并发压力测试,评估最大并发下的吞吐与延迟。
- 可观测性测试
- 检查 trace_id 传播、日志结构化输出与指标准确性。
章节来源
- handler.go:58-99
- scheduler.go:127-190
错误处理最佳实践
- 明确错误语义:区分参数错误、执行错误、超时错误与系统错误。
- 保持上下文:在错误中保留关键上下文(任务 ID、类型、参数片段)。
- 优雅降级:在网络/IO 不稳定时返回部分结果或重试。
- 限流与熔断:在外部依赖不稳定时主动退避或短路。
章节来源
- http.go:27-31
- shell.go:36-44
- file.go:31-33
- scheduler.go:152-179