ExecGo 的执行是异步的:提交后不会立刻得到最终结果。因此上层编排层通常需要实现“提交 -> 轮询 -> 读取结果/失败原因”的闭环,并尽量做到幂等/可恢复。
1. 提交后的异步行为
上层通过 POST /tasks 提交一个 TaskGraph 后:
- 接口立即返回
202 Accepted和task_ids - 调度器后台开始执行(按依赖关系并发执行)
- 你必须通过
GET /tasks/{id}轮询每个任务,直到出现最终状态:success:成功完成,result可读failed:最终失败,error可读skipped:通常表示上游依赖失败导致未执行
2. 上层“幂等”要点:task.id 决定是否会被覆盖
ExecGo 内部状态存储在 store.Store(默认是 pkg/store/jsonfile)。默认实现中:
Put(task)是“存储或更新”:如果task.ID已存在,会被新的 task 覆盖- 这意味着:如果你因为网络超时等原因再次
POST /tasks,并且复用了相同task.id,那么这些任务可能会被重新置为执行状态并再次跑起来
因此,幂等策略通常是二选一:
- 方案 A:复用 task.id(用于“同一 workflow run 的重复请求安全”)
- 你需要先用
GET /tasks/{id}判断任务是否已经存在并处于你可接受的状态 - 如果已在
success/failed/skipped,直接读取结果;不要重复提交 - 如果已存在但仍是
pending/running,不要重复提交,继续轮询到结束
- 你需要先用
- 方案 B:为每次“重新执行”生成新 task.id(用于“允许重复跑,但区分执行轮次”)
- 常见做法是把 workflowRunId/attemptId 也拼到
task.id中
- 常见做法是把 workflowRunId/attemptId 也拼到
3. 推荐轮询算法(每个 task 一条轮询)
一个简单且可靠的轮询流程:
- 提交
POST /tasks,拿到task_ids - 对每个
task_id:- 以指数退避轮询
GET /tasks/{id} - 当状态为
success/failed/skipped时停止轮询
- 以指数退避轮询
- 结合你的业务 DAG 逻辑做后续决策
退避建议(仅示例,可按业务调参):
- 初始间隔:
500ms - 最大间隔:
5s - 每次乘以 1.5~2,直到达到最大间隔
4. 删除(DELETE)与运行中任务的关系
ExecGo 的 DELETE /tasks/{id} 只影响状态存储中的记录,并不等价于“取消正在执行的任务”。
对上层来说,这意味着:
- 你可以把 DELETE 当作“清理状态/回收资源”的工具
- 但不要把它当作“取消执行”的强保证
5. 动态 params:当下游依赖上游产物时怎么做?
因为 ExecGo 不做“自动把上游 result 注入到下游 params”的变量替换,你需要:
- 先提交上游并轮询拿到
result/error - 然后二次提交下游,并在编排层把上游产物手动注入到下游
params
这会把你的整体流程变成“多次提交(分阶段)”,而不是一次性提交动态 DAG。