教程中心多智能体系统搭建实战
进阶

多智能体系统搭建实战:Orchestrator-Worker 模式从设计到部署

2026.06.01· 8 个步骤 · 30 分钟阅读· 🧠 多智能体

2026 年 Q1 数据显示,45% 的新部署 AI Agent 系统采用多智能体架构。但多 Agent 不是「多个 Agent 的简单叠加」——错误的架构设计会让 Token 消耗暴增 5 倍,而错误率不降反升。本文用一个真实的「智能报表分析系统」案例,从零搭建一个可运行的多智能体系统。

为什么选择 Orchestrator-Worker 模式?

多智能体的四种协作模式中,Orchestrator-Worker 是最实用、最稳定、也最容易上手的。它的核心机制是:一个中央编排器负责任务分解和结果汇总,多个 Worker 并行执行子任务。这听起来简单,但 80% 的多 Agent 生产系统采用的就是这个模式。

选它的理由很直接:

  • 并行度高:Worker 之间互不依赖,天然适合并行执行——典型场景提速 3-5 倍
  • 容错性好:单个 Worker 失败不影响其他 Worker,编排器可以决定重试还是跳过
  • 实现简单:不需要复杂的协调逻辑,一个编排器 + N 个 Worker 就可以工作

场景设定:智能报表分析系统

我们的案例是一个企业内部报表分析系统,用户提出「分析上季度销售数据,发现趋势和异常」,系统需要:

  1. 从数据库查询销售数据
  2. 分析数据中的趋势
  3. 识别异常和风险点
  4. 生成可视化建议
  5. 汇总成一份完整分析报告

我们将分别用 CrewAI 和 LangGraph 两种方案实现,最后对比选型建议。

方案 A:CrewAI 快速实现

CrewAI 上手最简单,适合快速验证和中小规模场景。Agent 的定义接近自然语言,学习成本极低。

1 安装 CrewAI 并配置
pip install crewai
pip install crewai[tools]  # 包含内置工具集
2 定义 Agent 角色
from crewai import Agent, Task, Crew, Process

# 数据分析师 Worker
analyst = Agent(
    role="数据分析师",
    goal="分析销售数据,识别核心趋势和模式",
    backstory="你是一个资深数据分析师,擅长从数字中发现故事",
    verbose=True,
    allow_delegation=False,
)

# 风险分析师 Worker
risk_analyst = Agent(
    role="风险分析师",
    goal="识别数据中的异常和风险点",
    backstory="你专注于发现隐藏的风险,对异常值极其敏感",
    verbose=True,
    allow_delegation=False,
)

# 可视化顾问 Worker
viz_consultant = Agent(
    role="可视化顾问",
    goal="根据分析结果推荐最合适的可视化方式",
    backstory="你擅长用图表讲故事,知道什么数据该用什么图",
    verbose=True,
    allow_delegation=False,
)

# 报告撰写者 — 充当编排器角色
report_writer = Agent(
    role="报告主编",
    goal="整合所有分析结果,撰写一份完整的分析报告",
    backstory="你是总编,负责把各专家的分析组装成一篇高质量的最终报告",
    verbose=True,
    allow_delegation=True,  # 允许编排
)
3 定义任务和流程
# 并行执行的分析任务
analyze_task = Task(
    description="分析{period}的销售数据:{sales_data},找出销售趋势",
    agent=analyst,
    expected_output="结构化的趋势分析报告,包含同比增长率、环比变化等",
)

risk_task = Task(
    description="分析{period}的销售数据:{sales_data},识别异常和风险",
    agent=risk_analyst,
    expected_output="风险清单,每个风险包含影响程度和概率评估",
)

viz_task = Task(
    description="根据分析结果推荐可视化方案",
    agent=viz_consultant,
    expected_output="推荐图表类型及其说明",
)

# 汇总任务(依赖前三个任务的结果)
report_task = Task(
    description="整合趋势分析、风险评估和可视化建议,撰写完整报告",
    agent=report_writer,
    expected_output="一份完整的销售分析报告",
)

# 组装 Crew
crew = Crew(
    agents=[analyst, risk_analyst, viz_consultant, report_writer],
    tasks=[analyze_task, risk_task, viz_task, report_task],
    process=Process.sequential,  # 按顺序执行(简单模式)
    verbose=True,
)

# 在实际的生产场景中,可以使用 Process.hierarchical 来让
# report_writer 自动编排其他三个 Agent 并行工作
4 运行
result = crew.kickoff(inputs={
    "period": "2026 年第一季度",
    "sales_data": "...(从数据库获取的实际数据)",
})
print(result)

CrewAI 方案的我的评价

CrewAI 的优点是快——从想法到可运行原型不超过 30 分钟。但有两个问题需要注意:第一,sequential 模式下 Agent 是串行执行,没有发挥多 Agent 并行优势,需要切换到 hierarchical 模式;第二,CrewAI 的错误恢复机制相对基础,Agent 调用失败时不会自动重试,需要自己包装重试逻辑。

方案 B:LangGraph 精细控制

LangGraph 复杂度更高,但对执行流程有完全的精细控制。适合需要高可靠性和精细编排的生产系统。

5 安装并定义状态图
pip install langgraph langchain langchain-openai
from typing import TypedDict, List, Any, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# 定义系统状态(LangGraph 的核心概念)
class AnalysisState(TypedDict):
    period: str
    raw_data: str
    trend_analysis: Optional[str]
    risk_analysis: Optional[str]
    viz_recommendation: Optional[str]
    final_report: Optional[str]
    errors: List[str]
    retry_count: int
6 定义节点(Worker)和执行边
model = ChatOpenAI(model="gpt-4.5", temperature=0.2)

# 节点 1:趋势分析
def analyze_trends(state: AnalysisState):
    try:
        response = model.invoke([
            SystemMessage(content="你是一个数据分析专家,分析销售趋势。"),
            HumanMessage(content=f"分析 {state['period']} 的销售数据:{state['raw_data']}"),
        ])
        return {"trend_analysis": response.content, "retry_count": 0}
    except Exception as e:
        return {"errors": [f"趋势分析失败: {str(e)}"], "retry_count": state["retry_count"] + 1}

# 节点 2:风险分析
def analyze_risks(state: AnalysisState):
    response = model.invoke([
        SystemMessage(content="你是一个风险分析师,识别数据中的异常。"),
        HumanMessage(content=f"分析这些数据中的风险:{state['raw_data']}"),
    ])
    return {"risk_analysis": response.content}

# 节点 3:可视化推荐
def recommend_viz(state: AnalysisState):
    response = model.invoke([
        SystemMessage(content="你是一个数据可视化专家。"),
        HumanMessage(content=f"趋势:{state['trend_analysis']}\n风险:{state['risk_analysis']}\n推荐图表"),
    ])
    return {"viz_recommendation": response.content}

# 节点 4:报告生成(编排器聚合)
def generate_report(state: AnalysisState):
    response = model.invoke([
        SystemMessage(content="你是一个报告总编。"),
        HumanMessage(content=f"""整合以下分析结果为一份完整报告:
趋势:{state['trend_analysis']}
风险:{state['risk_analysis']}
可视化建议:{state['viz_recommendation']}"""),
    ])
    return {"final_report": response.content}

# 重试决策函数
def should_retry(state: AnalysisState):
    if len(state["errors"]) > 0 and state["retry_count"] < 3:
        return "analyze_trends"  # 重试失败的节点
    return "analyze_risks"  # 继续执行
7 构建执行图
# 构建图
workflow = StateGraph(AnalysisState)

# 添加节点
workflow.add_node("analyze_trends", analyze_trends)
workflow.add_node("analyze_risks", analyze_risks)
workflow.add_node("recommend_viz", recommend_viz)
workflow.add_node("generate_report", generate_report)

# 设置执行边
workflow.set_entry_point("analyze_trends")

# 条件边:趋势分析失败则重试
workflow.add_conditional_edges(
    "analyze_trends",
    should_retry,
    {"analyze_trends": "analyze_trends", "analyze_risks": "analyze_risks"}
)

# 并行执行两条分析路径
workflow.add_edge("analyze_risks", "recommend_viz")
workflow.add_edge("recommend_viz", "generate_report")
workflow.add_edge("generate_report", END)

# 编译和运行
app = workflow.compile()
result = app.invoke({
    "period": "2026 Q1",
    "raw_data": "...(销售数据)",
    "errors": [],
    "retry_count": 0
})
print(result["final_report"])

LangGraph 方案的我的看法

LangGraph 的学习曲线确实陡一些,但它的持久化执行(Durable Execution)能力是 CrewAI 目前不具备的。在 LangGraph 中,每个节点的执行结果都会持久化,即使过程中断也能从最后一个成功节点恢复——这对生产系统至关重要。我的经验是:原型阶段用 CrewAI 快速验证,生产系统转向 LangGraph。

成本优化:三个策略

多 Agent 系统的 Token 消耗是单 Agent 的 5-10 倍,不控制成本的 MAS 部署最终会被 API 账单吓到。

策略效果实现方式
结果缓存 Token 消耗降低 35% 相同输入返回缓存结果。对「每日相同时间段的分析」尤其有效
模型异质化 推理成本降低 60% 核心编排器用 GPT-4.5,分析 Worker 用 Gemini 3.5 Flash,辅助 Worker 用本地量化模型
压缩摘要传递 上下文减少 10 倍 Worker 的完整输出不传给下一个 Worker,而是由编排器压缩为摘要后再传递

错误处理:不要等到出问题再想

多 Agent 系统最脆弱的时刻就是出错的时刻。单 Agent 失败可能不会导致整个系统崩溃,但错误传播会因为 Agent 之间互相调用而指数级放大。以下是经过生产验证的三层防护:

  • 超时熔断:每个 Agent 节点设置最大执行时间(比如 60 秒),超时后触发熔断而不是无限等待
  • 重试退避
  • 降级输出:当某个 Worker 持续失败时,编排器应该能够在其缺席的情况下生成「降级版」最终输出,而不是抛出异常给用户

我见过的最常见的 MAS 生产事故是「Agent 循环」——Agent A 和 Agent B 互相修正对方的输出,陷入无限循环。在每一层 Agent 设置 max_rounds(最大交互轮次)是必须做的防御性编程。推荐上限是 5 轮,超过则强制熔断。

选型总结

场景推荐方案理由
快速原型验证(< 5 Agent)CrewAI30 分钟从零到运行
生产级系统(5+ Agent,需要高可靠性)LangGraph持久化执行、条件边、细粒度错误控制
需要辩论式/市场竞争模式AutoGen原生支持多 Agent 辩论和角色扮演
深度集成 LangChain 生态LangGraph与 LangChain 共享组件和工具链