多智能体系统搭建实战:Orchestrator-Worker 模式从设计到部署
2026 年 Q1 数据显示,45% 的新部署 AI Agent 系统采用多智能体架构。但多 Agent 不是「多个 Agent 的简单叠加」——错误的架构设计会让 Token 消耗暴增 5 倍,而错误率不降反升。本文用一个真实的「智能报表分析系统」案例,从零搭建一个可运行的多智能体系统。
为什么选择 Orchestrator-Worker 模式?
多智能体的四种协作模式中,Orchestrator-Worker 是最实用、最稳定、也最容易上手的。它的核心机制是:一个中央编排器负责任务分解和结果汇总,多个 Worker 并行执行子任务。这听起来简单,但 80% 的多 Agent 生产系统采用的就是这个模式。
选它的理由很直接:
- 并行度高:Worker 之间互不依赖,天然适合并行执行——典型场景提速 3-5 倍
- 容错性好:单个 Worker 失败不影响其他 Worker,编排器可以决定重试还是跳过
- 实现简单:不需要复杂的协调逻辑,一个编排器 + N 个 Worker 就可以工作
场景设定:智能报表分析系统
我们的案例是一个企业内部报表分析系统,用户提出「分析上季度销售数据,发现趋势和异常」,系统需要:
- 从数据库查询销售数据
- 分析数据中的趋势
- 识别异常和风险点
- 生成可视化建议
- 汇总成一份完整分析报告
我们将分别用 CrewAI 和 LangGraph 两种方案实现,最后对比选型建议。
方案 A:CrewAI 快速实现
CrewAI 上手最简单,适合快速验证和中小规模场景。Agent 的定义接近自然语言,学习成本极低。
pip install crewai
pip install crewai[tools] # 包含内置工具集
from crewai import Agent, Task, Crew, Process
# 数据分析师 Worker
analyst = Agent(
role="数据分析师",
goal="分析销售数据,识别核心趋势和模式",
backstory="你是一个资深数据分析师,擅长从数字中发现故事",
verbose=True,
allow_delegation=False,
)
# 风险分析师 Worker
risk_analyst = Agent(
role="风险分析师",
goal="识别数据中的异常和风险点",
backstory="你专注于发现隐藏的风险,对异常值极其敏感",
verbose=True,
allow_delegation=False,
)
# 可视化顾问 Worker
viz_consultant = Agent(
role="可视化顾问",
goal="根据分析结果推荐最合适的可视化方式",
backstory="你擅长用图表讲故事,知道什么数据该用什么图",
verbose=True,
allow_delegation=False,
)
# 报告撰写者 — 充当编排器角色
report_writer = Agent(
role="报告主编",
goal="整合所有分析结果,撰写一份完整的分析报告",
backstory="你是总编,负责把各专家的分析组装成一篇高质量的最终报告",
verbose=True,
allow_delegation=True, # 允许编排
)
# 并行执行的分析任务
analyze_task = Task(
description="分析{period}的销售数据:{sales_data},找出销售趋势",
agent=analyst,
expected_output="结构化的趋势分析报告,包含同比增长率、环比变化等",
)
risk_task = Task(
description="分析{period}的销售数据:{sales_data},识别异常和风险",
agent=risk_analyst,
expected_output="风险清单,每个风险包含影响程度和概率评估",
)
viz_task = Task(
description="根据分析结果推荐可视化方案",
agent=viz_consultant,
expected_output="推荐图表类型及其说明",
)
# 汇总任务(依赖前三个任务的结果)
report_task = Task(
description="整合趋势分析、风险评估和可视化建议,撰写完整报告",
agent=report_writer,
expected_output="一份完整的销售分析报告",
)
# 组装 Crew
crew = Crew(
agents=[analyst, risk_analyst, viz_consultant, report_writer],
tasks=[analyze_task, risk_task, viz_task, report_task],
process=Process.sequential, # 按顺序执行(简单模式)
verbose=True,
)
# 在实际的生产场景中,可以使用 Process.hierarchical 来让
# report_writer 自动编排其他三个 Agent 并行工作
result = crew.kickoff(inputs={
"period": "2026 年第一季度",
"sales_data": "...(从数据库获取的实际数据)",
})
print(result)
CrewAI 方案的我的评价
CrewAI 的优点是快——从想法到可运行原型不超过 30 分钟。但有两个问题需要注意:第一,sequential 模式下 Agent 是串行执行,没有发挥多 Agent 并行优势,需要切换到 hierarchical 模式;第二,CrewAI 的错误恢复机制相对基础,Agent 调用失败时不会自动重试,需要自己包装重试逻辑。
方案 B:LangGraph 精细控制
LangGraph 复杂度更高,但对执行流程有完全的精细控制。适合需要高可靠性和精细编排的生产系统。
pip install langgraph langchain langchain-openai
from typing import TypedDict, List, Any, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
# 定义系统状态(LangGraph 的核心概念)
class AnalysisState(TypedDict):
period: str
raw_data: str
trend_analysis: Optional[str]
risk_analysis: Optional[str]
viz_recommendation: Optional[str]
final_report: Optional[str]
errors: List[str]
retry_count: int
model = ChatOpenAI(model="gpt-4.5", temperature=0.2)
# 节点 1:趋势分析
def analyze_trends(state: AnalysisState):
try:
response = model.invoke([
SystemMessage(content="你是一个数据分析专家,分析销售趋势。"),
HumanMessage(content=f"分析 {state['period']} 的销售数据:{state['raw_data']}"),
])
return {"trend_analysis": response.content, "retry_count": 0}
except Exception as e:
return {"errors": [f"趋势分析失败: {str(e)}"], "retry_count": state["retry_count"] + 1}
# 节点 2:风险分析
def analyze_risks(state: AnalysisState):
response = model.invoke([
SystemMessage(content="你是一个风险分析师,识别数据中的异常。"),
HumanMessage(content=f"分析这些数据中的风险:{state['raw_data']}"),
])
return {"risk_analysis": response.content}
# 节点 3:可视化推荐
def recommend_viz(state: AnalysisState):
response = model.invoke([
SystemMessage(content="你是一个数据可视化专家。"),
HumanMessage(content=f"趋势:{state['trend_analysis']}\n风险:{state['risk_analysis']}\n推荐图表"),
])
return {"viz_recommendation": response.content}
# 节点 4:报告生成(编排器聚合)
def generate_report(state: AnalysisState):
response = model.invoke([
SystemMessage(content="你是一个报告总编。"),
HumanMessage(content=f"""整合以下分析结果为一份完整报告:
趋势:{state['trend_analysis']}
风险:{state['risk_analysis']}
可视化建议:{state['viz_recommendation']}"""),
])
return {"final_report": response.content}
# 重试决策函数
def should_retry(state: AnalysisState):
if len(state["errors"]) > 0 and state["retry_count"] < 3:
return "analyze_trends" # 重试失败的节点
return "analyze_risks" # 继续执行
# 构建图
workflow = StateGraph(AnalysisState)
# 添加节点
workflow.add_node("analyze_trends", analyze_trends)
workflow.add_node("analyze_risks", analyze_risks)
workflow.add_node("recommend_viz", recommend_viz)
workflow.add_node("generate_report", generate_report)
# 设置执行边
workflow.set_entry_point("analyze_trends")
# 条件边:趋势分析失败则重试
workflow.add_conditional_edges(
"analyze_trends",
should_retry,
{"analyze_trends": "analyze_trends", "analyze_risks": "analyze_risks"}
)
# 并行执行两条分析路径
workflow.add_edge("analyze_risks", "recommend_viz")
workflow.add_edge("recommend_viz", "generate_report")
workflow.add_edge("generate_report", END)
# 编译和运行
app = workflow.compile()
result = app.invoke({
"period": "2026 Q1",
"raw_data": "...(销售数据)",
"errors": [],
"retry_count": 0
})
print(result["final_report"])
LangGraph 方案的我的看法
LangGraph 的学习曲线确实陡一些,但它的持久化执行(Durable Execution)能力是 CrewAI 目前不具备的。在 LangGraph 中,每个节点的执行结果都会持久化,即使过程中断也能从最后一个成功节点恢复——这对生产系统至关重要。我的经验是:原型阶段用 CrewAI 快速验证,生产系统转向 LangGraph。
成本优化:三个策略
多 Agent 系统的 Token 消耗是单 Agent 的 5-10 倍,不控制成本的 MAS 部署最终会被 API 账单吓到。
| 策略 | 效果 | 实现方式 |
|---|---|---|
| 结果缓存 | Token 消耗降低 35% | 相同输入返回缓存结果。对「每日相同时间段的分析」尤其有效 |
| 模型异质化 | 推理成本降低 60% | 核心编排器用 GPT-4.5,分析 Worker 用 Gemini 3.5 Flash,辅助 Worker 用本地量化模型 |
| 压缩摘要传递 | 上下文减少 10 倍 | Worker 的完整输出不传给下一个 Worker,而是由编排器压缩为摘要后再传递 |
错误处理:不要等到出问题再想
多 Agent 系统最脆弱的时刻就是出错的时刻。单 Agent 失败可能不会导致整个系统崩溃,但错误传播会因为 Agent 之间互相调用而指数级放大。以下是经过生产验证的三层防护:
- 超时熔断:每个 Agent 节点设置最大执行时间(比如 60 秒),超时后触发熔断而不是无限等待
- 重试退避
- 降级输出:当某个 Worker 持续失败时,编排器应该能够在其缺席的情况下生成「降级版」最终输出,而不是抛出异常给用户
我见过的最常见的 MAS 生产事故是「Agent 循环」——Agent A 和 Agent B 互相修正对方的输出,陷入无限循环。在每一层 Agent 设置 max_rounds(最大交互轮次)是必须做的防御性编程。推荐上限是 5 轮,超过则强制熔断。
选型总结
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 快速原型验证(< 5 Agent) | CrewAI | 30 分钟从零到运行 |
| 生产级系统(5+ Agent,需要高可靠性) | LangGraph | 持久化执行、条件边、细粒度错误控制 |
| 需要辩论式/市场竞争模式 | AutoGen | 原生支持多 Agent 辩论和角色扮演 |
| 深度集成 LangChain 生态 | LangGraph | 与 LangChain 共享组件和工具链 |