AI Agent 开发实战:从零构建一个自动化工作流系统
本文详细介绍如何从零开始构建一个生产级的 AI Agent 自动化工作流系统。我们将探讨 Agent 架构设计、任务调度机制、状态管理、错误处理等核心模块,并提供完整的代码示例和最佳实践。
AI Agent 开发实战:从零构建一个自动化工作流系统
摘要
本文详细介绍如何从零开始构建一个生产级的 AI Agent 自动化工作流系统。我们将探讨 Agent 架构设计、任务调度机制、状态管理、错误处理等核心模块,并提供完整的代码示例和最佳实践。适合有一定编程基础、希望将 AI 能力集成到实际业务中的开发者阅读。
一、引言:为什么需要 AI Agent 工作流系统
随着大语言模型(LLM)技术的快速发展,越来越多的企业和个人开发者开始探索如何将 AI 能力应用到实际业务场景中。然而,单纯调用 API 往往无法满足复杂业务需求——我们需要的是一个能够自主规划、执行任务、处理异常的完整系统。
AI Agent 工作流系统的核心价值在于:
- 任务编排:将复杂任务拆解为可执行的子任务序列
- 状态管理:跟踪任务执行进度,支持断点续传
- 工具集成:统一封装各类外部 API 和系统接口
- 错误恢复:自动重试、降级处理、人工介入机制
- 可观测性:完整的日志记录和性能监控
本文将基于实际项目经验,分享构建此类系统的关键设计决策和实现细节。
二、系统架构设计
2.1 核心组件
一个完整的 AI Agent 工作流系统通常包含以下核心组件:
┌─────────────────────────────────────────────────────────┐
│ API Gateway │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Task Orchestrator │
│ • 任务解析 • 依赖分析 • 执行计划生成 │
└─────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ (规划器) │ │ (执行器) │ │ (审核器) │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└───────────────┼───────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ Tool Registry │
│ • 文件操作 • 网络请求 • 数据库 • 第三方 API │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ State Store │
│ • 任务状态 • 执行历史 • 检查点 • 元数据 │
└─────────────────────────────────────────────────────────┘
2.2 数据模型设计
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class Task:
id: str
name: str
description: str
status: TaskStatus = TaskStatus.PENDING
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Workflow:
id: str
name: str
tasks: List[Task] = field(default_factory=list)
dependencies: Dict[str, List[str]] = field(default_factory=dict)
status: TaskStatus = TaskStatus.PENDING
context: Dict[str, Any] = field(default_factory=dict)
三、核心模块实现
3.1 任务调度器
任务调度器是系统的"大脑",负责解析用户意图、生成执行计划、协调各 Agent 工作。
import asyncio
from typing import Callable, Awaitable
class TaskScheduler:
def __init__(self):
self.agents: Dict[str, Callable] = {}
self.tools: Dict[str, Callable] = {}
self.state_store: StateStore = StateStore()
def register_agent(self, name: str, handler: Callable):
"""注册 Agent 处理器"""
self.agents[name] = handler
def register_tool(self, name: str, tool: Callable):
"""注册工具函数"""
self.tools[name] = tool
async def execute_workflow(self, workflow: Workflow) -> WorkflowResult:
"""执行完整工作流"""
# 1. 验证依赖关系
self._validate_dependencies(workflow)
# 2. 生成执行计划(拓扑排序)
execution_plan = self._topological_sort(workflow)
# 3. 并行执行独立任务
results = []
for task_group in execution_plan:
group_results = await self._execute_task_group(
[workflow.tasks[t] for t in task_group],
workflow.context
)
results.extend(group_results)
# 4. 更新上下文
workflow.context.update(self._merge_results(group_results))
return WorkflowResult(
workflow_id=workflow.id,
status=WorkflowStatus.COMPLETED,
results=results
)
async def _execute_task_group(
self,
tasks: List[Task],
context: Dict[str, Any]
) -> List[TaskResult]:
"""并行执行一组独立任务"""
coroutines = [
self._execute_single_task(task, context)
for task in tasks
]
return await asyncio.gather(*coroutines, return_exceptions=True)
3.2 Agent 实现模式
模式一:ReAct(Reasoning + Acting)
ReAct 模式让 Agent 在每一步都进行"思考 - 行动 - 观察"的循环:
class ReActAgent:
def __init__(self, llm: LLMClient, tools: List[Tool]):
self.llm = llm
self.tools = {t.name: t for t in tools}
self.max_iterations = 10
async def run(self, task: str, context: Dict) -> AgentResult:
history = []
for i in range(self.max_iterations):
# Step 1: 思考(生成下一步计划)
thought = await self.llm.generate(
prompt=self._build_prompt(task, context, history)
)
# Step 2: 解析行动
action = self._parse_action(thought)
if action.type == "FINAL_ANSWER":
return AgentResult(
success=True,
output=action.output,
iterations=i + 1
)
# Step 3: 执行行动
observation = await self.tools[action.tool_name].execute(
action.parameters
)
# Step 4: 记录历史
history.append({
"thought": thought,
"action": action,
"observation": observation
})
return AgentResult(
success=False,
error="Maximum iterations exceeded",
iterations=self.max_iterations
)
模式二:Plan-and-Execute
对于复杂任务,先规划再执行往往更高效:
class PlanExecuteAgent:
def __init__(self, planner: LLMClient, executor: LLMClient, tools: List[Tool]):
self.planner = planner
self.executor = executor
self.tools = tools
async def run(self, task: str, context: Dict) -> AgentResult:
# Phase 1: 规划
plan = await self.planner.generate(
prompt=f"为以下任务制定详细执行计划:{task}"
)
steps = self._parse_plan(plan)
# Phase 2: 执行
results = []
for step in steps:
result = await self._execute_step(step, context, results)
results.append(result)
# 动态调整计划(可选)
if result.requires_replan:
steps = await self._replan(steps, result, context)
return self._synthesize_results(results)
3.3 状态持久化
生产系统必须支持断点续传和状态恢复:
import json
import aiofiles
class StateStore:
def __init__(self, storage_path: str = "./state"):
self.storage_path = storage_path
async def save_checkpoint(self, workflow_id: str, state: WorkflowState):
"""保存检查点"""
filepath = f"{self.storage_path}/{workflow_id}/checkpoint.json"
async with aiofiles.open(filepath, 'w') as f:
await f.write(json.dumps({
"timestamp": datetime.now().isoformat(),
"state": state.to_dict()
}, indent=2))
async def load_checkpoint(self, workflow_id: str) -> Optional[WorkflowState]:
"""加载最近的检查点"""
filepath = f"{self.storage_path}/{workflow_id}/checkpoint.json"
try:
async with aiofiles.open(filepath, 'r') as f:
data = json.loads(await f.read())
return WorkflowState.from_dict(data["state"])
except FileNotFoundError:
return None
async def save_execution_log(self, workflow_id: str, log: ExecutionLog):
"""追加执行日志"""
filepath = f"{self.storage_path}/{workflow_id}/execution.log"
async with aiofiles.open(filepath, 'a') as f:
await f.write(log.to_jsonl() + "\n")
四、错误处理与恢复
4.1 重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
class ResilientExecutor:
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True
)
async def execute_with_retry(self, task: Callable, *args, **kwargs):
"""带指数退避的重试执行"""
return await task(*args, **kwargs)
async def execute_with_fallback(
self,
primary: Callable,
fallback: Callable,
*args, **kwargs
):
"""主备切换执行"""
try:
return await primary(*args, **kwargs)
except Exception as e:
logging.warning(f"Primary failed: {e}, switching to fallback")
return await fallback(*args, **kwargs)
4.2 超时控制
async def execute_with_timeout(
coro: Awaitable,
timeout_seconds: float,
fallback: Optional[Awaitable] = None
):
"""带超时的任务执行"""
try:
return await asyncio.wait_for(coro, timeout=timeout_seconds)
except asyncio.TimeoutError:
if fallback:
return await fallback
raise TaskTimeoutError(f"Task exceeded {timeout_seconds}s timeout")
五、可观测性设计
5.1 结构化日志
import logging
from pythonjsonlogger import jsonlogger
def setup_structured_logging():
logger = logging.getLogger("agent_workflow")
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
fmt="%(asctime)s %(name)s %(levelname)s %(message)s "
"%(workflow_id)s %(task_id)s %(duration_ms)s",
datefmt="%Y-%m-%dT%H:%M:%S"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
5.2 指标监控
关键指标包括:
| 指标名称 | 类型 | 说明 |
|---|---|---|
| INLINE_CODE_0 | Histogram | 工作流执行时长分布 |
| INLINE_CODE_1 | Gauge | 任务成功率 |
| INLINE_CODE_2 | Counter | LLM Token 消耗量 |
| INLINE_CODE_3 | Counter | 工具调用次数 |
| INLINE_CODE_4 | Counter | 各类错误发生次数 |
六、最佳实践总结
6.1 设计原则
- 单一职责:每个 Agent 只负责一类任务
- 幂等性:任务可重复执行而不产生副作用
- 松耦合:Agent 之间通过消息队列或事件总线通信
- 可测试性:提供 Mock 工具和沙箱环境
6.2 安全考虑
- 权限隔离:不同 Agent 使用不同的 API Key 和访问权限
- 输入验证:严格校验用户输入和工具返回
- 敏感信息:日志中脱敏处理密码、Token 等敏感数据
- 速率限制:防止 LLM API 调用超出配额
6.3 性能优化
- 批量处理:合并多个小任务为批量请求
- 缓存策略:缓存 LLM 响应和工具执行结果
- 流式输出:长任务支持流式返回中间结果
- 异步并发:充分利用 asyncio 并发能力
七、结语
构建一个生产级的 AI Agent 工作流系统是一项复杂的工程,需要综合考虑架构设计、状态管理、错误处理、可观测性等多个方面。本文分享的设计模式和实现经验来源于实际项目,希望能帮助读者少走弯路。
未来,随着 Agent 技术的进一步发展,我们期待看到更多标准化的框架和工具出现,让开发者能够更专注于业务逻辑而非基础设施。
参考资料:
- ReAct Paper: https://arxiv.org/abs/2210.03629
- LangChain: https://python.langchain.com/
- AutoGen: https://microsoft.github.io/autogen/
本文首发于 RailX Blog,转载请注明出处。