折腾侠
技术教程

AI Agent 开发实战:从零构建一个自动化工作流系统

本文详细介绍如何从零开始构建一个实用的 AI Agent 自动化工作流系统。我们将涵盖架构设计、核心模块实现、任务调度机制以及生产环境部署的最佳实践。通过完整的代码示例和实战经验,帮助开发者快速掌握 AI Agent 系统的构建方法。

折腾侠
2026/03/16 发布
51约 9 分钟1401 字 / 1182 词00

AI Agent 开发实战:从零构建一个自动化工作流系统

摘要

本文详细介绍如何从零开始构建一个实用的 AI Agent 自动化工作流系统。我们将涵盖架构设计、核心模块实现、任务调度机制以及生产环境部署的最佳实践。通过完整的代码示例和实战经验,帮助开发者快速掌握 AI Agent 系统的构建方法。


一、引言

随着大语言模型的快速发展,AI Agent 已经从概念验证阶段走向实际应用场景。无论是自动化客服、智能数据分析,还是复杂的工作流编排,AI Agent 都在展现出强大的潜力。

然而,很多开发者在构建 AI Agent 系统时,往往只关注模型调用这一环节,忽略了系统架构、任务调度、状态管理等关键问题。这导致系统在生产环境中难以稳定运行,也无法应对复杂的业务需求。

本文将基于我多年的 AI Agent 开发经验,分享一套经过生产环境验证的架构方案,帮助你构建一个可靠、可扩展的自动化工作流系统。

二、系统架构设计

2.1 整体架构

一个完整的 AI Agent 系统通常包含以下几个核心模块:

┌─────────────────────────────────────────────────────────┐
│                    API Gateway                          │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                   Task Scheduler                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ Cron Jobs   │  │ Event Queue │  │ Web Hooks   │     │
│  └─────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                   Agent Orchestrator                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ Tool Router │  │ Memory Mgr  │  │ State Mgr   │     │
│  └─────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                    Tool Layer                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ Web Browser │  │ File System │  │ External API│     │
│  └─────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘

2.2 核心组件说明

Task Scheduler(任务调度器)

  • 支持定时任务(Cron)、事件驱动、Webhook 触发
  • 任务优先级管理和队列调度
  • 失败重试和超时控制

Agent Orchestrator(代理编排器)

  • 负责任务分解和执行路径规划
  • 管理 Agent 的短期和长期记忆
  • 处理多轮对话和上下文维护

Tool Router(工具路由)

  • 统一的工具调用接口
  • 工具权限和安全控制
  • 工具执行结果标准化

三、核心模块实现

3.1 任务调度器

任务调度器是整个系统的大脑,负责任务的接收、排队和分发。以下是一个简化的实现示例:

Python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Callable
import asyncio
from datetime import datetime

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    name: str
    payload: dict
    status: TaskStatus = TaskStatus.PENDING
    created_at: datetime = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3

class TaskScheduler:
    def __init__(self):
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.running_tasks: dict[str, Task] = {}
        self.completed_tasks: list[Task] = []
        
    async def submit(self, task: Task):
        """提交新任务到队列"""
        task.created_at = datetime.now()
        await self.task_queue.put(task)
        
    async def run_worker(self, worker_id: int, handler: Callable):
        """工作协程,从队列取任务并执行"""
        while True:
            task = await self.task_queue.get()
            task.status = TaskStatus.RUNNING
            task.started_at = datetime.now()
            self.running_tasks[task.id] = task
            
            try:
                await handler(task)
                task.status = TaskStatus.COMPLETED
                task.completed_at = datetime.now()
            except Exception as e:
                task.error = str(e)
                if task.retry_count < task.max_retries:
                    task.retry_count += 1
                    task.status = TaskStatus.PENDING
                    await self.task_queue.put(task)
                else:
                    task.status = TaskStatus.FAILED
                    task.completed_at = datetime.now()
            
            self.running_tasks.pop(task.id)
            self.completed_tasks.append(task)
            self.task_queue.task_done()

3.2 Agent 编排器

Agent 编排器负责任务的分解和执行。关键在于如何根据任务类型选择合适的工具和执行策略:

Python
class AgentOrchestrator:
    def __init__(self, tools: dict[str, Callable], memory: MemoryManager):
        self.tools = tools
        self.memory = memory
        self.llm_client = LLMClient()
        
    async def execute_task(self, task: Task) -> dict:
        """执行单个任务"""
        # 1. 从记忆中获取相关上下文
        context = self.memory.get_relevant_context(task.payload)
        
        # 2. 让 LLM 规划执行步骤
        plan = await self._generate_plan(task.payload, context)
        
        # 3. 按步骤执行
        results = []
        for step in plan.steps:
            tool_result = await self._execute_step(step)
            results.append(tool_result)
            
            # 4. 更新记忆
            self.memory.add(step.action, tool_result)
        
        # 5. 生成最终结果
        final_result = await self._synthesize_result(task.payload, results)
        return final_result
    
    async def _generate_plan(self, payload: dict, context: str) -> Plan:
        """生成执行计划"""
        prompt = f"""
        任务:{payload.get('description')}
        
        相关上下文:
        {context}
        
        可用工具:{list(self.tools.keys())}
        
        请规划执行步骤,每个步骤包含:
        - 工具名称
        - 工具参数
        - 预期输出
        """
        
        response = await self.llm_client.generate(prompt)
        return self._parse_plan(response)
    
    async def _execute_step(self, step: Step) -> dict:
        """执行单个步骤"""
        tool = self.tools.get(step.tool_name)
        if not tool:
            raise ValueError(f"Unknown tool: {step.tool_name}")
        
        return await tool(**step.parameters)

3.3 记忆管理系统

记忆管理是 AI Agent 区别于普通脚本的关键。一个好的记忆系统应该支持:

  • 短期记忆:当前会话的上下文
  • 长期记忆:跨会话的知识积累
  • 语义检索:根据内容相似度检索相关记忆
Python
import chromadb
from chromadb.config import Settings

class MemoryManager:
    def __init__(self, persist_dir: str = "./memory"):
        self.client = chromadb.Client(Settings(
            persist_directory=persist_dir,
            anonymized_telemetry=False
        ))
        self.collection = self.client.get_or_create_collection("agent_memory")
        self.short_term: list[dict] = []
        
    def add(self, action: str, result: dict, metadata: dict = None):
        """添加记忆"""
        content = f"{action}: {result}"
        
        # 添加到向量数据库(长期记忆)
        self.collection.add(
            documents=[content],
            metadatas=[metadata or {"action": action}],
            ids=[f"mem_{datetime.now().timestamp()}"]
        )
        
        # 添加到短期记忆
        self.short_term.append({"action": action, "result": result})
        
        # 保持短期记忆大小
        if len(self.short_term) > 100:
            self.short_term = self.short_term[-50:]
    
    def get_relevant_context(self, query: dict, top_k: int = 5) -> str:
        """检索相关记忆"""
        query_text = str(query)
        
        # 从向量数据库检索
        results = self.collection.query(
            query_texts=[query_text],
            n_results=top_k
        )
        
        # 结合短期记忆
        context_parts = results['documents'][0] if results['documents'] else []
        context_parts.extend([f"{m['action']}: {m['result']}" for m in self.short_term[-10:]])
        
        return "\n".join(context_parts)

四、工具层实现

工具层是 Agent 与外部世界交互的桥梁。以下是几个常用工具的实现示例:

4.1 浏览器自动化工具

Python
from playwright.async_api import async_playwright

class BrowserTool:
    def __init__(self):
        self.browser = None
        self.context = None
        self.page = None
        
    async def initialize(self):
        playwright = await async_playwright().start()
        self.browser = await playwright.chromium.launch(headless=True)
        self.context = await self.browser.new_context()
        self.page = await self.context.new_page()
    
    async def navigate(self, url: str) -> dict:
        """导航到指定页面"""
        await self.page.goto(url, wait_until="networkidle")
        return {
            "status": "success",
            "url": self.page.url,
            "title": await self.page.title()
        }
    
    async def fill_form(self, selector: str, value: str) -> dict:
        """填写表单"""
        await self.page.fill(selector, value)
        return {"status": "success", "selector": selector, "value": value}
    
    async def click(self, selector: str) -> dict:
        """点击元素"""
        await self.page.click(selector)
        return {"status": "success", "selector": selector}
    
    async def screenshot(self, path: str = None) -> dict:
        """截取屏幕"""
        screenshot = await self.page.screenshot(path=path)
        return {"status": "success", "path": path}

4.2 文件系统工具

Python
import os
import json
from pathlib import Path

class FileSystemTool:
    def __init__(self, workspace: str = "./workspace"):
        self.workspace = Path(workspace)
        
    async def read_file(self, path: str) -> dict:
        """读取文件内容"""
        full_path = self.workspace / path
        if not full_path.exists():
            return {"status": "error", "error": "File not found"}
        
        content = full_path.read_text()
        return {"status": "success", "content": content, "path": str(full_path)}
    
    async def write_file(self, path: str, content: str) -> dict:
        """写入文件"""
        full_path = self.workspace / path
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_text(content)
        return {"status": "success", "path": str(full_path)}
    
    async def list_directory(self, path: str = ".") -> dict:
        """列出目录内容"""
        full_path = self.workspace / path
        if not full_path.exists():
            return {"status": "error", "error": "Directory not found"}
        
        items = []
        for item in full_path.iterdir():
            items.append({
                "name": item.name,
                "type": "directory" if item.is_dir() else "file",
                "size": item.stat().st_size if item.is_file() else 0
            })
        
        return {"status": "success", "items": items}

五、生产环境部署

5.1 容器化部署

使用 Docker 容器化部署可以确保环境一致性:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    && rm -rf /var/lib/apt/lists/*

# 安装 Playwright 浏览器
RUN playwright install chromium

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 设置环境变量
ENV PYTHONUNBUFFERED=1
ENV WORKSPACE=/workspace

# 创建 workspace 目录
RUN mkdir -p /workspace

CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

5.2 监控与日志

生产环境必须配备完善的监控和日志系统:

Python
import logging
from prometheus_client import Counter, Histogram, start_http_server

# 定义指标
TASK_COUNTER = Counter('agent_tasks_total', 'Total tasks processed', ['status'])
TASK_DURATION = Histogram('agent_task_duration_seconds', 'Task duration')

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('agent.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('agent')

# 在任务执行时记录指标
@TASK_DURATION.time()
async def execute_with_metrics(task: Task):
    try:
        await execute_task(task)
        TASK_COUNTER.labels(status='success').inc()
        logger.info(f"Task {task.id} completed successfully")
    except Exception as e:
        TASK_COUNTER.labels(status='failed').inc()
        logger.error(f"Task {task.id} failed: {e}", exc_info=True)
        raise

5.3 安全考虑

  • API 密钥管理:使用环境变量或密钥管理服务,不要硬编码
  • 工具权限控制:限制 Agent 可以访问的资源范围
  • 输入验证:对所有外部输入进行严格验证
  • 速率限制:防止滥用和 DDoS 攻击

六、性能优化建议

6.1 并发处理

对于独立的任务,使用异步并发可以显著提升吞吐量:

Python
async def process_batch(tasks: list[Task]) -> list[dict]:
    """批量处理任务"""
    semaphore = asyncio.Semaphore(10)  # 限制并发数
    
    async def with_semaphore(task):
        async with semaphore:
            return await execute_task(task)
    
    results = await asyncio.gather(
        *[with_semaphore(task) for task in tasks],
        return_exceptions=True
    )
    
    return results

6.2 缓存策略

对于重复的查询或计算结果,使用缓存可以减少延迟:

Python
from functools import lru_cache
import redis

class CacheManager:
    def __init__(self, redis_url: str = "redis://localhost"):
        self.redis = redis.from_url(redis_url)
        
    async def get(self, key: str) -> Optional[str]:
        return await self.redis.get(key)
    
    async def set(self, key: str, value: str, ttl: int = 3600):
        await self.redis.setex(key, ttl, value)

七、总结与展望

构建一个生产级的 AI Agent 系统需要考虑多个方面:

  1. 架构设计:清晰的模块划分和职责分离
  2. 任务调度:可靠的队列管理和错误处理
  3. 记忆管理:有效的上下文维护和检索机制
  4. 工具集成:丰富的外部系统交互能力
  5. 部署运维:容器化、监控、日志、安全

随着技术的不断发展,AI Agent 系统也在持续演进。未来的趋势包括:

  • 多 Agent 协作:多个 specialized agent 协同完成复杂任务
  • 自主学习能力:从历史执行中学习优化策略
  • 更好的可解释性:让 Agent 的决策过程更透明
  • 更低的延迟:通过模型优化和缓存提升响应速度

希望本文的实践经验能够帮助你构建出稳定、高效的 AI Agent 系统。如果你有任何问题或建议,欢迎在评论区交流讨论。


关于作者:本文作者是一名专注于 AI Agent 和自动化系统的全栈开发者,有多年的生产环境实战经验。

参考资料

  • LangChain 官方文档
  • Playwright 自动化测试指南
  • ChromaDB 向量数据库文档
  • Prometheus 监控最佳实践
分享到:

如果这篇文章对你有帮助,欢迎请作者喝杯咖啡 ☕

加载评论中...