目录
简介
ExecGo 是一个极简的 AI 执行引擎,提供任务提交、DAG 调度、并发执行和可观测性的 HTTP 服务。本文档详细阐述了 ExecGo 的数据持久化策略,重点分析其内存状态管理与磁盘持久化的双重机制,包括状态同步策略、一致性保证、任务状态生命周期管理、状态恢复机制以及性能优化措施。
项目结构概览
ExecGo 采用模块化架构设计,主要包含以下核心模块:
graph TB
subgraph "应用入口层"
Main[main.go<br/>应用入口]
Config[config.go<br/>配置管理]
end
subgraph "业务逻辑层"
API[handler.go<br/>API 服务器]
Scheduler[scheduler.go<br/>任务调度器]
State[state.go<br/>状态管理器]
Models[task.go<br/>数据模型]
end
subgraph "执行器层"
Executor[executor.go<br/>执行器接口]
File[file.go<br/>文件执行器]
Shell[shell.go<br/>Shell 执行器]
HTTP[http.go<br/>HTTP 执行器]
end
subgraph "基础设施层"
Observability[observability.go<br/>可观测性]
Data[(data/state.json<br/>持久化数据)]
end
Main --> Config
Main --> API
Main --> Scheduler
Main --> State
API --> Scheduler
Scheduler --> State
Scheduler --> Executor
State --> Data
Executor --> File
Executor --> Shell
Executor --> HTTP
图表来源
- main.go:25-104
- state.go:17-53
- scheduler.go:18-45
核心组件架构
状态管理器架构
状态管理器是 ExecGo 的核心组件,负责维护任务状态的内存存储和磁盘持久化:
classDiagram
class Manager {
-sync.RWMutex mu
-map~string,*Task~ tasks
-string filePath
-*slog.Logger logger
+Put(task *Task)
+Get(id string) (*Task, bool)
+GetAll() []*Task
+Delete(id string) bool
+UpdateStatus(id string, status TaskStatus, result json.RawMessage, errMsg string) bool
+Persist() error
+StartPeriodicPersist(interval time.Duration, stop <-chan struct{})
-loadFromDisk() error
}
class Task {
+string ID
+string Type
+json.RawMessage Params
+[]string DependsOn
+int Retry
+int64 Timeout
+TaskStatus Status
+json.RawMessage Result
+string Error
+time.Time CreatedAt
+time.Time UpdatedAt
}
class TaskStatus {
<<enumeration>>
PENDING
RUNNING
SUCCESS
FAILED
SKIPPED
}
Manager --> Task : "管理"
Task --> TaskStatus : "使用"
图表来源
- state.go:17-53
- task.go:21-34
调度器与状态管理交互
调度器通过状态管理器协调任务执行和状态更新:
sequenceDiagram
participant Sched as 调度器
participant State as 状态管理器
participant Exec as 执行器
participant Disk as 磁盘
Sched->>State : 提交任务
State->>State : Put(task)
Sched->>State : UpdateStatus(运行中)
State->>State : UpdateStatus(运行中)
Sched->>Exec : 执行任务
Exec-->>Sched : 返回结果
Sched->>State : UpdateStatus(成功/失败)
State->>Disk : 定期持久化
State->>Disk : 最终持久化
图表来源
- scheduler.go:69-97
- scheduler.go:127-190
- state.go:110-134
内存状态管理
状态存储结构
ExecGo 使用内存中的哈希表来存储所有任务状态,提供高效的读写操作:
graph LR
subgraph "内存状态存储"
TasksMap[任务映射表<br/>map[string]*Task]
RWLock[读写锁<br/>sync.RWMutex]
end
subgraph "并发控制"
RLock[读锁]
WLock[写锁]
LockGuard[锁保护]
end
TasksMap --> RWLock
RLock --> LockGuard
WLock --> LockGuard
图表来源
- state.go:18-23
状态访问模式
状态管理器提供了多种访问模式以满足不同场景的需求:
| 方法 | 访问模式 | 主要用途 |
|---|---|---|
| Put | 写入 | 存储新任务或更新现有任务 |
| Get | 读取 | 获取单个任务详情 |
| GetAll | 读取 | 获取所有任务列表 |
| Delete | 写入 | 删除指定任务 |
| UpdateStatus | 原子更新 | 更新任务状态和结果 |
章节来源
- state.go:55-92
磁盘持久化机制
文件存储格式
ExecGo 使用 JSON 格式将内存中的任务状态持久化到磁盘文件 state.json:
flowchart TD
Start([开始持久化]) --> Snapshot["创建内存快照"]
Snapshot --> Marshal["序列化为 JSON"]
Marshal --> WriteTmp["写入临时文件<br/>state.json.tmp"]
WriteTmp --> AtomicRename["原子重命名<br/>覆盖原文件"]
AtomicRename --> End([完成持久化])
WriteTmp --> ErrorCheck{"写入错误?"}
ErrorCheck --> |是| Cleanup["清理临时文件"]
Cleanup --> ErrorEnd([错误结束])
ErrorCheck --> |否| Continue["继续流程"]
Continue --> AtomicRename
图表来源
- state.go:110-134
持久化文件示例
根据示例数据,state.json 文件包含多个任务的状态信息:
| 字段 | 类型 | 描述 | 示例值 |
|---|---|---|---|
| id | string | 任务唯一标识符 | "step1" |
| type | string | 任务类型 | "file" |
| params | object | 任务参数 | 包含具体执行参数 |
| depends_on | array | 依赖任务列表 | ["step1"] |
| status | string | 任务执行状态 | "success" |
| result | object | 执行结果 | 包含具体结果数据 |
| error | string | 错误信息 | 失败时的错误描述 |
| created_at | string | 创建时间 | ISO8601 时间戳 |
| updated_at | string | 最后更新时间 | ISO8601 时间戳 |
章节来源
- state.json:1-76
状态同步策略
实时同步机制
ExecGo 采用"内存优先,定期持久化"的策略,确保数据的实时性和可靠性:
sequenceDiagram
participant App as 应用程序
participant Mem as 内存状态
participant Disk as 磁盘存储
participant Timer as 定时器
App->>Mem : 更新任务状态
Mem-->>App : 确认更新
Note over Mem : 立即反映在内存中
Timer->>Disk : 触发持久化
Mem->>Disk : 写入当前状态
Disk-->>Timer : 持久化完成
Note over Disk : 定期保存到文件
图表来源
- main.go:53-55
- state.go:160-179
同步触发时机
状态同步在以下情况下触发:
- 定时持久化:每 30 秒自动触发一次
- 优雅关闭:应用停止前进行最终持久化
- 异常情况:持久化过程中出现错误时的重试
章节来源
- main.go:53-103
- state.go:160-179
一致性保证
原子性保证
ExecGo 通过原子重命名操作确保持久化的一致性:
flowchart TD
Start([开始写入]) --> CreateTmp["创建临时文件"]
CreateTmp --> WriteData["写入完整数据"]
WriteData --> AtomicOp["原子重命名"]
AtomicOp --> Success([写入成功])
WriteData --> Error{"写入错误?"}
Error --> |是| RemoveTmp["删除临时文件"]
RemoveTmp --> ErrorEnd([写入失败])
Error --> |否| AtomicOp
图表来源
- state.go:124-131
一致性级别
系统提供以下一致性保证:
- 强一致性:内存中的状态变更立即生效
- 最终一致性:磁盘持久化确保数据不会丢失
- 崩溃一致性:系统重启后能恢复到一致状态
章节来源
- state.go:124-131
任务状态生命周期
状态转换图
任务在整个生命周期中会经历以下状态转换:
stateDiagram-v2
[*] --> 待处理
待处理 --> 运行中 : 调度器选择
运行中 --> 成功 : 执行完成
运行中 --> 失败 : 执行错误
运行中 --> 跳过 : 依赖失败
成功 --> [*]
失败 --> 重试 : 配置重试
失败 --> [*]
跳过 --> [*]
note right of 运行中
执行器执行任务
支持超时和重试
end note
note right of 重试
指数退避策略
最多重试配置次数
end note
图表来源
- task.go:10-19
- scheduler.go:127-190
生命周期管理流程
flowchart TD
Submit[提交任务] --> InitStatus[初始化状态为待处理]
InitStatus --> BuildGraph[构建依赖图]
BuildGraph --> EnqueueReady[加入就绪队列]
EnqueueReady --> ExecuteTask[执行任务]
ExecuteTask --> CheckResult{执行结果}
CheckResult --> |成功| MarkSuccess[标记成功]
CheckResult --> |失败| CheckRetry{需要重试?}
CheckResult --> |依赖失败| MarkSkip[标记跳过]
CheckRetry --> |是| RetryTask[重试执行]
CheckRetry --> |否| MarkFailed[标记失败]
RetryTask --> ExecuteTask
MarkSuccess --> CascadeDownstream[级联下游任务]
MarkFailed --> CascadeDownstream
MarkSkip --> CascadeDownstream
CascadeDownstream --> Complete[完成]
图表来源
- scheduler.go:69-97
- scheduler.go:192-230
章节来源
- scheduler.go:69-230
状态恢复机制
启动时恢复流程
系统启动时会自动执行状态恢复过程:
sequenceDiagram
participant App as 应用程序
participant State as 状态管理器
participant Disk as 磁盘文件
participant Mem as 内存状态
App->>State : 初始化状态管理器
State->>Disk : 读取 state.json
Disk-->>State : 返回任务数据
State->>State : 解析 JSON 数据
State->>Mem : 加载到内存
State->>Mem : 重置运行中任务
Mem-->>App : 恢复完成
Note over Mem : 运行中任务重置为待处理
图表来源
- state.go:25-53
- state.go:136-158
恢复策略细节
- 数据加载:从磁盘读取 JSON 文件并解析为任务对象
- 状态重置:将所有运行中的任务重置为待处理状态
- 时间更新:更新最后修改时间为当前时间
- 日志记录:记录恢复过程中的重要信息
章节来源
- state.go:25-53
数据备份与迁移策略
备份策略
为了确保数据安全,建议采用以下备份策略:
- 定期备份:基于持久化间隔进行自动备份
- 增量备份:只备份自上次备份以来发生变化的数据
- 跨位置备份:将备份文件存储在不同的物理位置
- 版本保留:保留多个历史版本的备份文件
迁移策略
当需要进行系统迁移时,可以采用以下步骤:
- 停止写入:确保系统处于只读状态
- 导出数据:从 state.json 文件导出完整数据
- 验证完整性:检查导出数据的完整性和一致性
- 导入新系统:在新环境中导入数据
- 验证功能:确认新系统能够正确识别和处理数据
版本兼容性考虑
由于当前版本使用简单的 JSON 格式,建议:
- 向后兼容:新增字段时保持向后兼容性
- 版本标记:在文件头部添加版本信息
- 迁移工具:提供数据格式升级工具
- 降级支持:确保旧版本能够读取新格式
章节来源
- state.go:110-134
性能优化措施
缓存策略
ExecGo 采用了多层次的缓存策略来提升性能:
graph TB
subgraph "内存缓存层"
MemCache[内存任务缓存]
ReadCache[读取缓存]
WriteBuffer[写入缓冲]
end
subgraph "持久化层"
DiskCache[磁盘文件缓存]
FSBuffer[文件系统缓冲]
end
subgraph "网络层"
NetCache[网络连接池]
HTTPCache[HTTP 缓存]
end
MemCache --> ReadCache
MemCache --> WriteBuffer
WriteBuffer --> DiskCache
DiskCache --> FSBuffer
NetCache --> HTTPCache
批量写入优化
系统通过以下方式优化批量写入性能:
- 批量持久化:定期批量写入而非每次状态变更都写入
- 异步写入:使用 goroutine 异步执行持久化操作
- 写入合并:合并多个写入操作减少磁盘 I/O
- 背压控制:防止写入速度超过磁盘处理能力
并发控制优化
flowchart TD
Request[请求到达] --> AcquireLock[获取读写锁]
AcquireLock --> CheckOperation{操作类型}
CheckOperation --> |读操作| ReadLock[获取读锁]
CheckOperation --> |写操作| WriteLock[获取写锁]
ReadLock --> Process[处理请求]
WriteLock --> Process
Process --> ReleaseLock[释放锁]
ReleaseLock --> Response[返回响应]
图表来源
- state.go:18-23
章节来源
- state.go:160-179
故障处理与排障
常见故障类型
- 磁盘写入失败:文件系统权限问题或磁盘空间不足
- JSON 解析错误:state.json 文件损坏或格式不正确
- 内存溢出:大量任务导致内存占用过高
- 持久化延迟:磁盘性能问题导致持久化延迟
排障步骤
flowchart TD
Start([故障发生]) --> Identify[识别故障类型]
Identify --> CheckLogs[检查日志]
CheckLogs --> AnalyzeError{分析错误信息}
AnalyzeError --> |磁盘错误| CheckDisk[检查磁盘状态]
AnalyzeError --> |内存错误| CheckMemory[检查内存使用]
AnalyzeError --> |配置错误| CheckConfig[检查配置文件]
CheckDisk --> FixDisk[修复磁盘问题]
CheckMemory --> Optimize[优化内存使用]
CheckConfig --> FixConfig[修复配置问题]
FixDisk --> Verify[验证修复效果]
Optimize --> Verify
FixConfig --> Verify
Verify --> End([故障解决])
监控指标
系统提供以下关键监控指标:
| 指标名称 | 描述 | 类型 | 用途 |
|---|---|---|---|
| tasks_total | 总任务数 | 计数器 | 监控任务总量 |
| tasks_running | 运行中任务数 | 计数器 | 监控并发执行 |
| tasks_succeeded | 成功任务数 | 计数器 | 监控执行成功率 |
| tasks_failed | 失败任务数 | 计数器 | 监控执行失败率 |
| by_type | 按类型统计 | 映射 | 分析不同类型任务 |
章节来源
- observability.go:86-133
总结
ExecGo 的数据持久化策略通过内存状态管理和磁盘持久化的双重机制,实现了高性能和高可靠性的平衡。该策略的主要特点包括:
- 实时性与可靠性并重:内存中的即时响应和定期持久化的双重保障
- 原子性持久化:通过临时文件和原子重命名确保数据一致性
- 智能恢复机制:启动时自动恢复状态并重置运行中任务
- 性能优化:批量写入、异步处理和合理的并发控制
- 可扩展性:模块化设计便于功能扩展和维护
这种设计使得 ExecGo 能够在保证数据安全的前提下,提供高效的任务执行和管理能力,适用于各种需要可靠任务调度和执行的场景。