execgo
<cite> **本文档引用的文件** - [main.go](file://cmd/execgo/main.go) - [executor.go](file://internal/executor/executor.go) - [http.go](file://internal/executor/http.go) - [shell.go](file://internal/executor/shell.go) - [file.go](file://internal/executor/file.go) - [task.go](file://internal/models/task.go) - [scheduler.go](file://internal/scheduler/scheduler.go) - [state.go](file://internal/state/state.go) - [handler.go](file://internal/api/handler.go) - [observability.go](file://internal/observability/observability.go) - [README.md](file://README.md) </cite>

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖分析
  7. 性能考虑
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本指南面向需要在 ExecGo 引擎中扩展自定义执行器的开发者。ExecGo 采用“插件式执行器”设计,通过统一的 Executor 接口与全局注册表实现可扩展的任务执行能力。本文将系统讲解:

  • 执行器接口的实现要求与命名规范
  • 从接口实现到注册使用的完整开发流程
  • 生命周期管理、资源清理与错误处理最佳实践
  • 调试技巧、测试策略与性能优化建议
  • 多个自定义执行器的实际开发示例(按场景与复杂度递增)

项目结构

ExecGo 的核心围绕“API 层 → 调度器 → 执行器 → 状态管理”的分层架构展开。执行器模块位于 internal/executor,提供统一接口与内置实现,并通过注册表对外暴露。

graph TB
subgraph "入口与配置"
MAIN["cmd/execgo/main.go<br/>应用入口"]
end
subgraph "API 层"
API["internal/api/handler.go<br/>HTTP 路由与处理器"]
end
subgraph "调度器"
SCHED["internal/scheduler/scheduler.go<br/>DAG 调度与并发控制"]
end
subgraph "执行器"
EXIF["internal/executor/executor.go<br/>接口与注册表"]
EXHTTP["internal/executor/http.go<br/>HTTP 执行器"]
EXSHELL["internal/executor/shell.go<br/>Shell 执行器"]
EXFILE["internal/executor/file.go<br/>文件执行器"]
end
subgraph "状态与可观测"
STATE["internal/state/state.go<br/>内存状态与持久化"]
OBS["internal/observability/observability.go<br/>日志/追踪/指标"]
end
MAIN --> API
API --> SCHED
SCHED --> EXIF
EXIF --> EXHTTP
EXIF --> EXSHELL
EXIF --> EXFILE
SCHED --> STATE
API --> OBS
MAIN --> OBS

图表来源

  • main.go:25-104
  • handler.go:39-52
  • scheduler.go:34-45
  • executor.go:14-67
  • http.go:22-75
  • shell.go:31-79
  • file.go:20-113
  • state.go:17-53
  • observability.go:46-80

章节来源

  • README.md:149-177
  • main.go:25-104
  • handler.go:39-52
  • scheduler.go:34-45
  • executor.go:14-67

核心组件

  • 执行器接口与注册表
    • 接口定义包含 Type() 与 Execute() 两个方法,Type() 用于唯一标识执行器类型,Execute() 执行任务并返回 JSON 结果。
    • 注册表提供 Register、Get、RegisteredTypes 与 RegisterBuiltins 等能力,支持内置执行器注册与查询。
  • 内置执行器
    • HTTPExecutor:封装 HTTP 请求,支持方法、头、体等参数,限制响应大小。
    • ShellExecutor:白名单命令执行,确保安全;支持工作目录与上下文取消。
    • FileExecutor:文件读写、追加、删除、统计,路径清洗防越狱。
  • 调度器
    • 基于 DAG 的并发调度,支持重试(指数退避)、超时(context 控制)、依赖级联与状态推进。
  • 状态管理
    • 内存状态 + JSON 文件持久化,支持周期性与最终持久化,崩溃后将 running 状态重置为 pending。
  • API 层
    • 提交任务图、查询任务、删除任务、健康检查、指标端点;提交前校验任务类型是否存在对应执行器。

章节来源

  • executor.go:14-67
  • http.go:14-75
  • shell.go:24-79
  • file.go:13-113
  • scheduler.go:69-230
  • state.go:25-179
  • handler.go:58-99

架构总览

ExecGo 的执行链路如下:客户端通过 API 提交任务图,API 校验任务类型并提交给调度器;调度器根据依赖关系并发执行,按类型从注册表获取执行器;执行器完成任务后返回 JSON 结果,调度器更新状态并级联触发下游任务。

sequenceDiagram
participant Client as "客户端"
participant API as "API 层"
participant Sched as "调度器"
participant Reg as "执行器注册表"
participant Exec as "具体执行器"
participant State as "状态管理"
Client->>API : "POST /tasks 提交任务图"
API->>API : "校验任务图与类型"
API->>Sched : "Submit(graph)"
Sched->>Reg : "Get(task.Type)"
Reg-->>Sched : "Executor 实例"
loop 并发执行
Sched->>Exec : "Execute(ctx, task)"
Exec-->>Sched : "json.RawMessage, error"
Sched->>State : "UpdateStatus(...)"
Sched->>Sched : "级联触发下游"
end
API-->>Client : "202 Accepted + 任务ID列表"

图表来源

  • handler.go:58-99
  • scheduler.go:127-190
  • executor.go:38-48

详细组件分析

执行器接口与注册表

  • 接口职责
    • Type():返回执行器类型字符串,用于在注册表中唯一标识该执行器。
    • Execute(ctx, task):执行任务,返回 JSON 结果与错误。错误会触发调度器的重试与失败状态记录。
  • 注册表机制
    • Register(e):将执行器实例注册到全局映射。
    • Get(taskType):按类型获取执行器,未找到则返回错误。
    • RegisteredTypes():返回所有已注册类型,用于 API 校验与提示。
    • RegisterBuiltins():注册内置执行器(HTTP、Shell、File)。
classDiagram
class Executor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class HTTPExecutor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class ShellExecutor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class FileExecutor {
+Type() string
+Execute(ctx, task) (json.RawMessage, error)
}
class Registry {
+Register(e)
+Get(taskType) (Executor, error)
+RegisteredTypes() []string
+RegisterBuiltins()
}
Executor <|.. HTTPExecutor
Executor <|.. ShellExecutor
Executor <|.. FileExecutor
Registry --> Executor : "管理与查询"

图表来源

  • executor.go:14-67
  • http.go:22-75
  • shell.go:31-79
  • file.go:20-113

章节来源

  • executor.go:14-67

HTTP 执行器

  • 参数结构:URL、Method、Headers、Body。
  • 行为要点:
    • Method 缺省为 GET。
    • 限制响应体大小(1MB),避免内存膨胀。
    • 即使 HTTP 错误码 ≥ 400 也返回结果,便于上层判断与记录。
  • 错误处理:参数解析失败、请求创建失败、网络错误、响应读取失败均返回带上下文的错误。
flowchart TD
Start(["进入 Execute"]) --> Parse["解析参数"]
Parse --> Valid{"参数有效?"}
Valid --> |否| ErrParse["返回解析错误"]
Valid --> |是| BuildReq["构造请求(含上下文)"]
BuildReq --> DoReq["发送请求"]
DoReq --> ReadResp["读取响应(限制大小)"]
ReadResp --> StatusOK{"状态码 < 400 ?"}
StatusOK --> |是| MarshalOK["序列化成功结果"]
StatusOK --> |否| MarshalWarn["序列化警告结果"]
MarshalOK --> Done(["返回"])
MarshalWarn --> Done
ErrParse --> Done

图表来源

  • http.go:27-75

章节来源

  • http.go:14-75

Shell 执行器(白名单)

  • 安全策略:仅允许预定义白名单命令,防止任意命令执行。
  • 参数结构:Command、Args、Dir。
  • 行为要点:
    • 提取基础命令名进行白名单校验。
    • 支持工作目录设置与上下文取消。
    • 输出 stdout/stderr 与退出码,错误时返回带上下文的错误。
  • 错误处理:参数解析失败、命令不在白名单、执行失败均返回带上下文的错误。
flowchart TD
Start(["进入 Execute"]) --> Parse["解析参数"]
Parse --> Valid{"参数有效且有命令?"}
Valid --> |否| ErrParse["返回解析错误"]
Valid --> |是| Base["提取基础命令名"]
Base --> Check{"在白名单?"}
Check --> |否| ErrWL["返回白名单错误"]
Check --> |是| Cmd["构建命令(含上下文/工作目录)"]
Cmd --> Run["执行命令"]
Run --> Result["组装结果(输出+退出码)"]
Result --> Done(["返回"])
ErrParse --> Done
ErrWL --> Done

图表来源

  • shell.go:36-79

章节来源

  • shell.go:14-79

文件执行器

  • 参数结构:Action(read/write/append/delete/stat)、Path、Content。
  • 行为要点:
    • 路径清洗(filepath.Clean)防止目录穿越。
    • write/append:自动创建目录,追加或截断写入。
    • stat:返回文件元信息。
  • 错误处理:参数解析失败、路径缺失、文件操作失败均返回带上下文的错误。
flowchart TD
Start(["进入 Execute"]) --> Parse["解析参数"]
Parse --> PathEmpty{"Path 是否为空?"}
PathEmpty --> |是| ErrPath["返回路径错误"]
PathEmpty --> |否| Clean["路径清洗"]
Clean --> Action{"Action 类型?"}
Action --> |read| Read["读取文件"]
Action --> |write| Write["写入文件(创建目录)"]
Action --> |append| Append["追加文件(创建目录)"]
Action --> |delete| Delete["删除文件"]
Action --> |stat| Stat["获取文件信息"]
Read --> Done(["返回"])
Write --> Done
Append --> Done
Delete --> Done
Stat --> Done
ErrPath --> Done

图表来源

  • file.go:25-113

章节来源

  • file.go:13-113

调度器与生命周期

  • 生命周期
    • Start:启动工作循环,维护就绪队列、并发信号量与依赖计数。
    • Stop:优雅关闭,等待所有工作协程结束。
    • Submit:提交任务图,构建依赖图并入队无依赖任务。
  • 执行流程
    • 从就绪队列取出任务,获取并发槽,调用执行器 Execute。
    • 支持重试(指数退避,上限 10 秒),支持超时(毫秒)。
    • 成功/失败后更新状态并级联触发下游任务。
  • 资源清理
    • 通过 context.WithCancel/WithTimeout 控制执行器生命周期。
    • 任务完成后释放并发槽,避免资源泄漏。
sequenceDiagram
participant S as "调度器"
participant Q as "就绪队列"
participant E as "执行器"
participant M as "状态管理"
S->>Q : "enqueue(task)"
loop 主循环
Q-->>S : "task"
S->>S : "获取并发槽"
S->>E : "Execute(ctx, task)"
alt 成功
E-->>S : "result"
S->>M : "UpdateStatus(success)"
S->>S : "级联触发下游"
else 失败
E-->>S : "error"
S->>M : "UpdateStatus(failed)"
end
S->>S : "释放并发槽"
end

图表来源

  • scheduler.go:109-190

章节来源

  • scheduler.go:47-67
  • scheduler.go:69-97
  • scheduler.go:127-190
  • scheduler.go:192-230

状态管理与持久化

  • 内存状态:map[string]*Task + RWMutex,提供 Put/Get/UpdateStatus/Delete/GetAll。
  • 持久化策略:
    • 启动时从 data/state.json 加载,若存在 running 任务则重置为 pending。
    • 周期性持久化(默认 30 秒),最终持久化在优雅关闭时触发。
    • 使用临时文件 + 原子重命名保证一致性。
flowchart TD
Start(["启动"]) --> Load["加载持久化数据"]
Load --> Reset{"发现 running 任务?"}
Reset --> |是| ToPending["重置为 pending"]
Reset --> |否| Ready["准备就绪"]
Ready --> Loop["周期性持久化循环"]
Loop --> Tick["定时器触发"]
Tick --> Persist["写入临时文件并重命名"]
Persist --> Loop
Loop --> Stop["收到停止信号"]
Stop --> FinalPersist["最终持久化"]
FinalPersist --> End(["结束"])

图表来源

  • state.go:25-53
  • state.go:110-134
  • state.go:160-179

章节来源

  • state.go:25-179

API 层与可观测性

  • API 层
    • 提交任务图:校验 JSON 与 TaskGraph,检查任务类型是否存在对应执行器,提交后返回 202 与任务 ID 列表。
    • 查询/删除/健康检查/指标端点。
  • 可观测性
    • 结构化 JSON 日志(slog)。
    • TraceMiddleware 注入/透传 trace_id,便于跨组件关联。
    • Metrics 提供任务总数、运行中、成功、失败与按类型计数。
sequenceDiagram
participant C as "客户端"
participant H as "API 处理器"
participant V as "校验器"
participant R as "注册表"
participant S as "调度器"
C->>H : "POST /tasks"
H->>V : "解码并校验任务图"
V-->>H : "校验通过/失败"
H->>R : "Get(task.Type)"
R-->>H : "执行器存在/不存在"
alt 存在
H->>S : "Submit(graph)"
H-->>C : "202 Accepted + 任务ID列表"
else 不存在
H-->>C : "400 Bad Request + 可用类型提示"
end

图表来源

  • handler.go:58-99
  • executor.go:38-48

章节来源

  • handler.go:58-146
  • observability.go:46-134

依赖分析

  • 组件耦合
    • API 层依赖调度器与注册表;调度器依赖注册表与状态管理;执行器彼此独立,仅通过接口与注册表交互。
  • 外部依赖
    • 项目采用纯标准库,无第三方依赖,降低维护成本与部署复杂度。
  • 循环依赖
    • 未发现循环依赖;分层清晰,接口边界明确。
graph LR
API["API 层"] --> SCHED["调度器"]
SCHED --> REG["执行器注册表"]
REG --> EX["执行器实现"]
SCHED --> STATE["状态管理"]
API --> OBS["可观测性"]
MAIN["入口"] --> API
MAIN --> OBS

图表来源

  • handler.go:39-52
  • scheduler.go:34-45
  • executor.go:26-29
  • state.go:17-23
  • main.go:25-104

章节来源

  • go.mod:1-4
  • README.md:253-261

性能考虑

  • 并发与背压
    • 调度器通过信号量控制最大并发,避免资源争用;就绪队列容量为 1024,满载时采用异步回填策略。
  • 超时与重试
    • 任务支持毫秒级超时;执行失败按指数退避重试,上限 10 秒,避免雪崩效应。
  • I/O 限制
    • HTTP 执行器限制响应体大小;文件写入使用 O_TRUNC/O_APPEND 控制;Shell 执行器输出缓冲区受控。
  • 持久化
    • 周期性持久化减少磁盘压力;最终持久化确保优雅关闭时数据落盘。
  • 指标与日志
    • 提供任务总量、运行中、成功、失败与按类型计数;结构化日志便于定位问题。

章节来源

  • scheduler.go:40-44
  • scheduler.go:100-107
  • scheduler.go:152-179
  • http.go:60-63
  • state.go:160-179

故障排查指南

  • 常见错误与定位
    • 任务类型未知:API 层会返回可用类型列表,检查 Type() 返回值与注册是否一致。
    • 参数解析失败:检查 params JSON 结构与字段名称,参考内置执行器参数格式。
    • 超时/重试:查看调度器日志中的重试次数与退避间隔;必要时调整任务 timeout 与 retry。
    • 文件路径问题:确认路径清洗与权限;避免相对路径与越界访问。
    • Shell 命令被拒绝:确认命令在白名单内;注意路径分隔符与工作目录。
  • 调试技巧
    • 使用 X-Trace-ID 头部贯穿请求链路,结合结构化日志快速定位。
    • 查看 /metrics 端点了解整体运行状态与按类型分布。
    • 在执行器内部记录关键步骤与耗时,便于性能分析。
  • 资源清理
    • 确保执行器内部的资源(文件句柄、网络连接、子进程)在 Execute 返回后及时释放。
    • 对外部系统调用使用 context.WithTimeout/WithCancel,避免阻塞。

章节来源

  • handler.go:76-85
  • scheduler.go:132-137
  • http.go:27-31
  • shell.go:36-44
  • file.go:31-33

结论

ExecGo 通过“接口 + 注册表 + 分层调度”的设计,提供了高度可扩展的执行器体系。遵循本文的实现规范与最佳实践,开发者可以快速、安全地扩展自定义执行器,并在生产环境中稳定运行。

附录

开发流程:从接口实现到注册使用

  • 步骤一:实现 Executor 接口
    • 定义执行器类型常量(Type() 返回值)。
    • 解析 Task.Params,执行业务逻辑,返回 JSON 结果与错误。
  • 步骤二:注册执行器
    • 在 init() 中调用 executor.Register(&YourExecutor{})。
    • 或在 main() 中调用 RegisterBuiltins() 前后注册自定义执行器。
  • 步骤三:验证与测试
    • 使用 API 提交任务图,观察调度器日志与状态变化。
    • 通过 /metrics 与 /health 验证系统健康状况。
  • 步骤四:上线与监控
    • 配置最大并发、优雅关闭超时、数据目录等参数。
    • 结合结构化日志与 trace_id 进行线上问题定位。

章节来源

  • executor.go:31-36
  • executor.go:62-67
  • main.go:39-41
  • README.md:229-249

实际开发示例(按场景与复杂度递增)

示例一:简单 HTTP 执行器(与内置类似)

  • 目标:封装外部服务调用,支持方法、头、体与超时。
  • 关键点:
    • 参数结构与内置一致,便于统一管理。
    • 注意响应体大小限制与错误码处理。
  • 参考实现位置
    • http.go:14-75

章节来源

  • http.go:14-75

示例二:受限 Shell 执行器(与内置类似)

  • 目标:在白名单基础上扩展命令集,支持工作目录与上下文。
  • 关键点:
    • 新增命令需同步更新白名单。
    • 注意命令路径解析与工作目录切换。
  • 参考实现位置
    • shell.go:14-22
    • shell.go:36-79

章节来源

  • shell.go:14-22
  • shell.go:36-79

示例三:文件系统执行器(与内置类似)

  • 目标:扩展更多文件操作(如复制、移动、权限变更)。
  • 关键点:
    • 路径清洗与权限校验。
    • 大文件处理与进度反馈。
  • 参考实现位置
    • file.go:13-19
    • file.go:25-113

章节来源

  • file.go:13-19
  • file.go:25-113

示例四:数据库执行器(高复杂度)

  • 目标:执行 SQL 语句或事务,支持连接池、超时与重试。
  • 设计要点:
    • 参数结构:DSN、SQL、参数绑定、事务标志。
    • 连接池管理:初始化与回收。
    • 错误分类:语法错误、连接失败、超时、死锁等。
    • 结果结构:影响行数、结果集、错误信息。
  • 生命周期与资源清理:
    • 使用 context.WithTimeout 控制 SQL 执行。
    • 事务结束后释放连接。
  • 参考实现位置
    • scheduler.go:163-173
    • state.go:94-108

章节来源

  • scheduler.go:163-173
  • state.go:94-108

示例五:消息队列执行器(高复杂度)

  • 目标:发布/消费消息,支持分区、偏移与幂等。
  • 设计要点:
    • 参数结构:Broker、Topic、Partition、Offset、Payload。
    • 幂等:基于消息 ID 去重。
    • 错误处理:网络异常、分区不可用、重复消费。
  • 参考实现位置
    • http.go:27-31
    • scheduler.go:152-179

章节来源

  • http.go:27-31
  • scheduler.go:152-179

测试策略

  • 单元测试
    • 针对执行器的 Execute 方法,构造不同参数与错误场景(解析失败、超时、网络错误、文件权限等)。
  • 集成测试
    • 通过 API 提交任务图,验证调度器的依赖推进、重试与状态更新。
  • 性能测试
    • 并发压力测试,评估最大并发下的吞吐与延迟。
  • 可观测性测试
    • 检查 trace_id 传播、日志结构化输出与指标准确性。

章节来源

  • handler.go:58-99
  • scheduler.go:127-190

错误处理最佳实践

  • 明确错误语义:区分参数错误、执行错误、超时错误与系统错误。
  • 保持上下文:在错误中保留关键上下文(任务 ID、类型、参数片段)。
  • 优雅降级:在网络/IO 不稳定时返回部分结果或重试。
  • 限流与熔断:在外部依赖不稳定时主动退避或短路。

章节来源

  • http.go:27-31
  • shell.go:36-44
  • file.go:31-33
  • scheduler.go:152-179