折腾侠
技术教程

AI Agent 开发实战:从零构建一个自动化工作流系统

本文详细介绍如何从零开始构建一个生产级的 AI Agent 自动化工作流系统。我们将探讨 Agent 架构设计、任务调度机制、状态管理、错误处理等核心模块,并提供完整的代码示例和最佳实践。

折腾侠
2026/03/20 发布
20约 8 分钟1138 字 / 960 词00

AI Agent 开发实战:从零构建一个自动化工作流系统

摘要

本文详细介绍如何从零开始构建一个生产级的 AI Agent 自动化工作流系统。我们将探讨 Agent 架构设计、任务调度机制、状态管理、错误处理等核心模块,并提供完整的代码示例和最佳实践。适合有一定编程基础、希望将 AI 能力集成到实际业务中的开发者阅读。


一、引言:为什么需要 AI Agent 工作流系统

随着大语言模型(LLM)技术的快速发展,越来越多的企业和个人开发者开始探索如何将 AI 能力应用到实际业务场景中。然而,单纯调用 API 往往无法满足复杂业务需求——我们需要的是一个能够自主规划、执行任务、处理异常的完整系统。

AI Agent 工作流系统的核心价值在于:

  1. 任务编排:将复杂任务拆解为可执行的子任务序列
  2. 状态管理:跟踪任务执行进度,支持断点续传
  3. 工具集成:统一封装各类外部 API 和系统接口
  4. 错误恢复:自动重试、降级处理、人工介入机制
  5. 可观测性:完整的日志记录和性能监控

本文将基于实际项目经验,分享构建此类系统的关键设计决策和实现细节。


二、系统架构设计

2.1 核心组件

一个完整的 AI Agent 工作流系统通常包含以下核心组件:

┌─────────────────────────────────────────────────────────┐
│                    API Gateway                          │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                  Task Orchestrator                      │
│  • 任务解析  • 依赖分析  • 执行计划生成                 │
└─────────────────────────────────────────────────────────┘
                            │
            ┌───────────────┼───────────────┐
            ▼               ▼               ▼
    ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
    │   Agent A    │ │   Agent B    │ │   Agent C    │
    │  (规划器)    │ │  (执行器)    │ │  (审核器)    │
    └──────────────┘ └──────────────┘ └──────────────┘
            │               │               │
            └───────────────┼───────────────┘
                            ▼
┌─────────────────────────────────────────────────────────┐
│                   Tool Registry                         │
│  • 文件操作  • 网络请求  • 数据库  • 第三方 API          │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                  State Store                            │
│  • 任务状态  • 执行历史  • 检查点  • 元数据             │
└─────────────────────────────────────────────────────────┘

2.2 数据模型设计

Python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Task:
    id: str
    name: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
@dataclass
class Workflow:
    id: str
    name: str
    tasks: List[Task] = field(default_factory=list)
    dependencies: Dict[str, List[str]] = field(default_factory=dict)
    status: TaskStatus = TaskStatus.PENDING
    context: Dict[str, Any] = field(default_factory=dict)

三、核心模块实现

3.1 任务调度器

任务调度器是系统的"大脑",负责解析用户意图、生成执行计划、协调各 Agent 工作。

Python
import asyncio
from typing import Callable, Awaitable

class TaskScheduler:
    def __init__(self):
        self.agents: Dict[str, Callable] = {}
        self.tools: Dict[str, Callable] = {}
        self.state_store: StateStore = StateStore()
        
    def register_agent(self, name: str, handler: Callable):
        """注册 Agent 处理器"""
        self.agents[name] = handler
        
    def register_tool(self, name: str, tool: Callable):
        """注册工具函数"""
        self.tools[name] = tool
    
    async def execute_workflow(self, workflow: Workflow) -> WorkflowResult:
        """执行完整工作流"""
        # 1. 验证依赖关系
        self._validate_dependencies(workflow)
        
        # 2. 生成执行计划(拓扑排序)
        execution_plan = self._topological_sort(workflow)
        
        # 3. 并行执行独立任务
        results = []
        for task_group in execution_plan:
            group_results = await self._execute_task_group(
                [workflow.tasks[t] for t in task_group],
                workflow.context
            )
            results.extend(group_results)
            
            # 4. 更新上下文
            workflow.context.update(self._merge_results(group_results))
        
        return WorkflowResult(
            workflow_id=workflow.id,
            status=WorkflowStatus.COMPLETED,
            results=results
        )
    
    async def _execute_task_group(
        self, 
        tasks: List[Task], 
        context: Dict[str, Any]
    ) -> List[TaskResult]:
        """并行执行一组独立任务"""
        coroutines = [
            self._execute_single_task(task, context) 
            for task in tasks
        ]
        return await asyncio.gather(*coroutines, return_exceptions=True)

3.2 Agent 实现模式

模式一:ReAct(Reasoning + Acting)

ReAct 模式让 Agent 在每一步都进行"思考 - 行动 - 观察"的循环:

Python
class ReActAgent:
    def __init__(self, llm: LLMClient, tools: List[Tool]):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.max_iterations = 10
    
    async def run(self, task: str, context: Dict) -> AgentResult:
        history = []
        
        for i in range(self.max_iterations):
            # Step 1: 思考(生成下一步计划)
            thought = await self.llm.generate(
                prompt=self._build_prompt(task, context, history)
            )
            
            # Step 2: 解析行动
            action = self._parse_action(thought)
            
            if action.type == "FINAL_ANSWER":
                return AgentResult(
                    success=True,
                    output=action.output,
                    iterations=i + 1
                )
            
            # Step 3: 执行行动
            observation = await self.tools[action.tool_name].execute(
                action.parameters
            )
            
            # Step 4: 记录历史
            history.append({
                "thought": thought,
                "action": action,
                "observation": observation
            })
        
        return AgentResult(
            success=False,
            error="Maximum iterations exceeded",
            iterations=self.max_iterations
        )

模式二:Plan-and-Execute

对于复杂任务,先规划再执行往往更高效:

Python
class PlanExecuteAgent:
    def __init__(self, planner: LLMClient, executor: LLMClient, tools: List[Tool]):
        self.planner = planner
        self.executor = executor
        self.tools = tools
    
    async def run(self, task: str, context: Dict) -> AgentResult:
        # Phase 1: 规划
        plan = await self.planner.generate(
            prompt=f"为以下任务制定详细执行计划:{task}"
        )
        steps = self._parse_plan(plan)
        
        # Phase 2: 执行
        results = []
        for step in steps:
            result = await self._execute_step(step, context, results)
            results.append(result)
            
            # 动态调整计划(可选)
            if result.requires_replan:
                steps = await self._replan(steps, result, context)
        
        return self._synthesize_results(results)

3.3 状态持久化

生产系统必须支持断点续传和状态恢复:

Python
import json
import aiofiles

class StateStore:
    def __init__(self, storage_path: str = "./state"):
        self.storage_path = storage_path
        
    async def save_checkpoint(self, workflow_id: str, state: WorkflowState):
        """保存检查点"""
        filepath = f"{self.storage_path}/{workflow_id}/checkpoint.json"
        async with aiofiles.open(filepath, 'w') as f:
            await f.write(json.dumps({
                "timestamp": datetime.now().isoformat(),
                "state": state.to_dict()
            }, indent=2))
    
    async def load_checkpoint(self, workflow_id: str) -> Optional[WorkflowState]:
        """加载最近的检查点"""
        filepath = f"{self.storage_path}/{workflow_id}/checkpoint.json"
        try:
            async with aiofiles.open(filepath, 'r') as f:
                data = json.loads(await f.read())
                return WorkflowState.from_dict(data["state"])
        except FileNotFoundError:
            return None
    
    async def save_execution_log(self, workflow_id: str, log: ExecutionLog):
        """追加执行日志"""
        filepath = f"{self.storage_path}/{workflow_id}/execution.log"
        async with aiofiles.open(filepath, 'a') as f:
            await f.write(log.to_jsonl() + "\n")

四、错误处理与恢复

4.1 重试机制

Python
from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientExecutor:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    async def execute_with_retry(self, task: Callable, *args, **kwargs):
        """带指数退避的重试执行"""
        return await task(*args, **kwargs)
    
    async def execute_with_fallback(
        self, 
        primary: Callable, 
        fallback: Callable,
        *args, **kwargs
    ):
        """主备切换执行"""
        try:
            return await primary(*args, **kwargs)
        except Exception as e:
            logging.warning(f"Primary failed: {e}, switching to fallback")
            return await fallback(*args, **kwargs)

4.2 超时控制

Python
async def execute_with_timeout(
    coro: Awaitable, 
    timeout_seconds: float,
    fallback: Optional[Awaitable] = None
):
    """带超时的任务执行"""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        if fallback:
            return await fallback
        raise TaskTimeoutError(f"Task exceeded {timeout_seconds}s timeout")

五、可观测性设计

5.1 结构化日志

Python
import logging
from pythonjsonlogger import jsonlogger

def setup_structured_logging():
    logger = logging.getLogger("agent_workflow")
    handler = logging.StreamHandler()
    
    formatter = jsonlogger.JsonFormatter(
        fmt="%(asctime)s %(name)s %(levelname)s %(message)s "
            "%(workflow_id)s %(task_id)s %(duration_ms)s",
        datefmt="%Y-%m-%dT%H:%M:%S"
    )
    
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    
    return logger

5.2 指标监控

关键指标包括:

指标名称类型说明
INLINE_CODE_0Histogram工作流执行时长分布
INLINE_CODE_1Gauge任务成功率
INLINE_CODE_2CounterLLM Token 消耗量
INLINE_CODE_3Counter工具调用次数
INLINE_CODE_4Counter各类错误发生次数

六、最佳实践总结

6.1 设计原则

  1. 单一职责:每个 Agent 只负责一类任务
  2. 幂等性:任务可重复执行而不产生副作用
  3. 松耦合:Agent 之间通过消息队列或事件总线通信
  4. 可测试性:提供 Mock 工具和沙箱环境

6.2 安全考虑

  • 权限隔离:不同 Agent 使用不同的 API Key 和访问权限
  • 输入验证:严格校验用户输入和工具返回
  • 敏感信息:日志中脱敏处理密码、Token 等敏感数据
  • 速率限制:防止 LLM API 调用超出配额

6.3 性能优化

  • 批量处理:合并多个小任务为批量请求
  • 缓存策略:缓存 LLM 响应和工具执行结果
  • 流式输出:长任务支持流式返回中间结果
  • 异步并发:充分利用 asyncio 并发能力

七、结语

构建一个生产级的 AI Agent 工作流系统是一项复杂的工程,需要综合考虑架构设计、状态管理、错误处理、可观测性等多个方面。本文分享的设计模式和实现经验来源于实际项目,希望能帮助读者少走弯路。

未来,随着 Agent 技术的进一步发展,我们期待看到更多标准化的框架和工具出现,让开发者能够更专注于业务逻辑而非基础设施。

参考资料:


本文首发于 RailX Blog,转载请注明出处。

分享到:

如果这篇文章对你有帮助,欢迎请作者喝杯咖啡 ☕

加载评论中...