AI Agent 开发实战:从零构建一个自动化工作流系统
本文详细介绍如何从零开始构建一个实用的 AI Agent 自动化工作流系统。我们将涵盖架构设计、核心模块实现、任务调度机制以及生产环境部署的最佳实践。通过完整的代码示例和实战经验,帮助开发者快速掌握 AI Agent 系统的构建方法。
AI Agent 开发实战:从零构建一个自动化工作流系统
摘要
本文详细介绍如何从零开始构建一个实用的 AI Agent 自动化工作流系统。我们将涵盖架构设计、核心模块实现、任务调度机制以及生产环境部署的最佳实践。通过完整的代码示例和实战经验,帮助开发者快速掌握 AI Agent 系统的构建方法。
一、引言
随着大语言模型的快速发展,AI Agent 已经从概念验证阶段走向实际应用场景。无论是自动化客服、智能数据分析,还是复杂的工作流编排,AI Agent 都在展现出强大的潜力。
然而,很多开发者在构建 AI Agent 系统时,往往只关注模型调用这一环节,忽略了系统架构、任务调度、状态管理等关键问题。这导致系统在生产环境中难以稳定运行,也无法应对复杂的业务需求。
本文将基于我多年的 AI Agent 开发经验,分享一套经过生产环境验证的架构方案,帮助你构建一个可靠、可扩展的自动化工作流系统。
二、系统架构设计
2.1 整体架构
一个完整的 AI Agent 系统通常包含以下几个核心模块:
┌─────────────────────────────────────────────────────────┐
│ API Gateway │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Task Scheduler │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Cron Jobs │ │ Event Queue │ │ Web Hooks │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Agent Orchestrator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Tool Router │ │ Memory Mgr │ │ State Mgr │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Tool Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Web Browser │ │ File System │ │ External API│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
2.2 核心组件说明
Task Scheduler(任务调度器)
- 支持定时任务(Cron)、事件驱动、Webhook 触发
- 任务优先级管理和队列调度
- 失败重试和超时控制
Agent Orchestrator(代理编排器)
- 负责任务分解和执行路径规划
- 管理 Agent 的短期和长期记忆
- 处理多轮对话和上下文维护
Tool Router(工具路由)
- 统一的工具调用接口
- 工具权限和安全控制
- 工具执行结果标准化
三、核心模块实现
3.1 任务调度器
任务调度器是整个系统的大脑,负责任务的接收、排队和分发。以下是一个简化的实现示例:
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Callable
import asyncio
from datetime import datetime
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
id: str
name: str
payload: dict
status: TaskStatus = TaskStatus.PENDING
created_at: datetime = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
error: Optional[str] = None
retry_count: int = 0
max_retries: int = 3
class TaskScheduler:
def __init__(self):
self.task_queue: asyncio.Queue = asyncio.Queue()
self.running_tasks: dict[str, Task] = {}
self.completed_tasks: list[Task] = []
async def submit(self, task: Task):
"""提交新任务到队列"""
task.created_at = datetime.now()
await self.task_queue.put(task)
async def run_worker(self, worker_id: int, handler: Callable):
"""工作协程,从队列取任务并执行"""
while True:
task = await self.task_queue.get()
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
self.running_tasks[task.id] = task
try:
await handler(task)
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
except Exception as e:
task.error = str(e)
if task.retry_count < task.max_retries:
task.retry_count += 1
task.status = TaskStatus.PENDING
await self.task_queue.put(task)
else:
task.status = TaskStatus.FAILED
task.completed_at = datetime.now()
self.running_tasks.pop(task.id)
self.completed_tasks.append(task)
self.task_queue.task_done()
3.2 Agent 编排器
Agent 编排器负责任务的分解和执行。关键在于如何根据任务类型选择合适的工具和执行策略:
class AgentOrchestrator:
def __init__(self, tools: dict[str, Callable], memory: MemoryManager):
self.tools = tools
self.memory = memory
self.llm_client = LLMClient()
async def execute_task(self, task: Task) -> dict:
"""执行单个任务"""
# 1. 从记忆中获取相关上下文
context = self.memory.get_relevant_context(task.payload)
# 2. 让 LLM 规划执行步骤
plan = await self._generate_plan(task.payload, context)
# 3. 按步骤执行
results = []
for step in plan.steps:
tool_result = await self._execute_step(step)
results.append(tool_result)
# 4. 更新记忆
self.memory.add(step.action, tool_result)
# 5. 生成最终结果
final_result = await self._synthesize_result(task.payload, results)
return final_result
async def _generate_plan(self, payload: dict, context: str) -> Plan:
"""生成执行计划"""
prompt = f"""
任务:{payload.get('description')}
相关上下文:
{context}
可用工具:{list(self.tools.keys())}
请规划执行步骤,每个步骤包含:
- 工具名称
- 工具参数
- 预期输出
"""
response = await self.llm_client.generate(prompt)
return self._parse_plan(response)
async def _execute_step(self, step: Step) -> dict:
"""执行单个步骤"""
tool = self.tools.get(step.tool_name)
if not tool:
raise ValueError(f"Unknown tool: {step.tool_name}")
return await tool(**step.parameters)
3.3 记忆管理系统
记忆管理是 AI Agent 区别于普通脚本的关键。一个好的记忆系统应该支持:
- 短期记忆:当前会话的上下文
- 长期记忆:跨会话的知识积累
- 语义检索:根据内容相似度检索相关记忆
import chromadb
from chromadb.config import Settings
class MemoryManager:
def __init__(self, persist_dir: str = "./memory"):
self.client = chromadb.Client(Settings(
persist_directory=persist_dir,
anonymized_telemetry=False
))
self.collection = self.client.get_or_create_collection("agent_memory")
self.short_term: list[dict] = []
def add(self, action: str, result: dict, metadata: dict = None):
"""添加记忆"""
content = f"{action}: {result}"
# 添加到向量数据库(长期记忆)
self.collection.add(
documents=[content],
metadatas=[metadata or {"action": action}],
ids=[f"mem_{datetime.now().timestamp()}"]
)
# 添加到短期记忆
self.short_term.append({"action": action, "result": result})
# 保持短期记忆大小
if len(self.short_term) > 100:
self.short_term = self.short_term[-50:]
def get_relevant_context(self, query: dict, top_k: int = 5) -> str:
"""检索相关记忆"""
query_text = str(query)
# 从向量数据库检索
results = self.collection.query(
query_texts=[query_text],
n_results=top_k
)
# 结合短期记忆
context_parts = results['documents'][0] if results['documents'] else []
context_parts.extend([f"{m['action']}: {m['result']}" for m in self.short_term[-10:]])
return "\n".join(context_parts)
四、工具层实现
工具层是 Agent 与外部世界交互的桥梁。以下是几个常用工具的实现示例:
4.1 浏览器自动化工具
from playwright.async_api import async_playwright
class BrowserTool:
def __init__(self):
self.browser = None
self.context = None
self.page = None
async def initialize(self):
playwright = await async_playwright().start()
self.browser = await playwright.chromium.launch(headless=True)
self.context = await self.browser.new_context()
self.page = await self.context.new_page()
async def navigate(self, url: str) -> dict:
"""导航到指定页面"""
await self.page.goto(url, wait_until="networkidle")
return {
"status": "success",
"url": self.page.url,
"title": await self.page.title()
}
async def fill_form(self, selector: str, value: str) -> dict:
"""填写表单"""
await self.page.fill(selector, value)
return {"status": "success", "selector": selector, "value": value}
async def click(self, selector: str) -> dict:
"""点击元素"""
await self.page.click(selector)
return {"status": "success", "selector": selector}
async def screenshot(self, path: str = None) -> dict:
"""截取屏幕"""
screenshot = await self.page.screenshot(path=path)
return {"status": "success", "path": path}
4.2 文件系统工具
import os
import json
from pathlib import Path
class FileSystemTool:
def __init__(self, workspace: str = "./workspace"):
self.workspace = Path(workspace)
async def read_file(self, path: str) -> dict:
"""读取文件内容"""
full_path = self.workspace / path
if not full_path.exists():
return {"status": "error", "error": "File not found"}
content = full_path.read_text()
return {"status": "success", "content": content, "path": str(full_path)}
async def write_file(self, path: str, content: str) -> dict:
"""写入文件"""
full_path = self.workspace / path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_text(content)
return {"status": "success", "path": str(full_path)}
async def list_directory(self, path: str = ".") -> dict:
"""列出目录内容"""
full_path = self.workspace / path
if not full_path.exists():
return {"status": "error", "error": "Directory not found"}
items = []
for item in full_path.iterdir():
items.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else 0
})
return {"status": "success", "items": items}
五、生产环境部署
5.1 容器化部署
使用 Docker 容器化部署可以确保环境一致性:
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
wget \
gnupg \
&& rm -rf /var/lib/apt/lists/*
# 安装 Playwright 浏览器
RUN playwright install chromium
# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 设置环境变量
ENV PYTHONUNBUFFERED=1
ENV WORKSPACE=/workspace
# 创建 workspace 目录
RUN mkdir -p /workspace
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
5.2 监控与日志
生产环境必须配备完善的监控和日志系统:
import logging
from prometheus_client import Counter, Histogram, start_http_server
# 定义指标
TASK_COUNTER = Counter('agent_tasks_total', 'Total tasks processed', ['status'])
TASK_DURATION = Histogram('agent_task_duration_seconds', 'Task duration')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('agent.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('agent')
# 在任务执行时记录指标
@TASK_DURATION.time()
async def execute_with_metrics(task: Task):
try:
await execute_task(task)
TASK_COUNTER.labels(status='success').inc()
logger.info(f"Task {task.id} completed successfully")
except Exception as e:
TASK_COUNTER.labels(status='failed').inc()
logger.error(f"Task {task.id} failed: {e}", exc_info=True)
raise
5.3 安全考虑
- API 密钥管理:使用环境变量或密钥管理服务,不要硬编码
- 工具权限控制:限制 Agent 可以访问的资源范围
- 输入验证:对所有外部输入进行严格验证
- 速率限制:防止滥用和 DDoS 攻击
六、性能优化建议
6.1 并发处理
对于独立的任务,使用异步并发可以显著提升吞吐量:
async def process_batch(tasks: list[Task]) -> list[dict]:
"""批量处理任务"""
semaphore = asyncio.Semaphore(10) # 限制并发数
async def with_semaphore(task):
async with semaphore:
return await execute_task(task)
results = await asyncio.gather(
*[with_semaphore(task) for task in tasks],
return_exceptions=True
)
return results
6.2 缓存策略
对于重复的查询或计算结果,使用缓存可以减少延迟:
from functools import lru_cache
import redis
class CacheManager:
def __init__(self, redis_url: str = "redis://localhost"):
self.redis = redis.from_url(redis_url)
async def get(self, key: str) -> Optional[str]:
return await self.redis.get(key)
async def set(self, key: str, value: str, ttl: int = 3600):
await self.redis.setex(key, ttl, value)
七、总结与展望
构建一个生产级的 AI Agent 系统需要考虑多个方面:
- 架构设计:清晰的模块划分和职责分离
- 任务调度:可靠的队列管理和错误处理
- 记忆管理:有效的上下文维护和检索机制
- 工具集成:丰富的外部系统交互能力
- 部署运维:容器化、监控、日志、安全
随着技术的不断发展,AI Agent 系统也在持续演进。未来的趋势包括:
- 多 Agent 协作:多个 specialized agent 协同完成复杂任务
- 自主学习能力:从历史执行中学习优化策略
- 更好的可解释性:让 Agent 的决策过程更透明
- 更低的延迟:通过模型优化和缓存提升响应速度
希望本文的实践经验能够帮助你构建出稳定、高效的 AI Agent 系统。如果你有任何问题或建议,欢迎在评论区交流讨论。
关于作者:本文作者是一名专注于 AI Agent 和自动化系统的全栈开发者,有多年的生产环境实战经验。
参考资料:
- LangChain 官方文档
- Playwright 自动化测试指南
- ChromaDB 向量数据库文档
- Prometheus 监控最佳实践