智能体设计模式

第3章:并行化

同时执行多个子任务以提高AI智能体工作流的效率和性能

第3章:并行化

并行化模式概述

在前面的章节中,我们探索了用于顺序工作流的提示链和用于动态决策和不同路径之间转换的路由。虽然这些模式是必不可少的,但许多复杂的智能体任务涉及可以同时执行而不是一个接一个执行的多个子任务。这就是并行化模式变得至关重要的地方。

并行化涉及同时执行多个组件,如LLM调用、工具使用,甚至整个子智能体(见图1)。不是等待一个步骤完成再开始下一个,并行执行允许独立任务同时运行,显著减少可以分解为独立部分的任务的总执行时间。

考虑一个设计用于研究主题并总结其发现的智能体。顺序方法可能:

  1. 搜索来源A。
  2. 总结来源A。
  3. 搜索来源B。
  4. 总结来源B。
  5. 从摘要A和B综合最终答案。

并行方法可以改为:

  1. 同时搜索来源A搜索来源B。
  2. 一旦两个搜索完成,同时总结来源A总结来源B。
  3. 从摘要A和B综合最终答案(此步骤通常是顺序的,等待并行步骤完成)。

核心思想是识别工作流中不依赖于其他部分输出的部分,并并行执行它们。这在处理具有延迟的外部服务(如API或数据库)时特别有效,因为您可以同时发出多个请求。

实现并行化通常需要支持异步执行或多线程/多处理的框架。现代智能体框架在设计时考虑了异步操作,允许您轻松定义可以并行运行的步骤。

image1

图1. 子智能体并行化示例

LangChain、LangGraph和Google ADK等框架提供并行执行机制。在LangChain Expression Language(LCEL)中,您可以通过使用|(用于顺序)等运算符组合可运行对象,并通过构建链或图以具有可以并发执行的分支来实现并行执行。LangGraph凭借其图结构,允许您定义可以从单个状态转换执行的多个节点,有效地在工作流中启用并行分支。Google ADK提供健壮的原生机制来促进和管理智能体的并行执行,显著增强复杂多智能体系统的效率和可扩展性。ADK框架内的这种固有能力允许开发者设计和实现多个智能体可以并发而不是顺序操作的解决方案。

并行化模式对于提高智能体系统的效率和响应性至关重要,特别是在处理涉及多个独立查找、计算或与外部服务交互的任务时。它是优化复杂智能体工作流性能的关键技术。

实际应用和用例

并行化是跨各种应用程序优化智能体性能的强大模式:

  1. 信息收集和研究:
    同时从多个来源收集信息是经典用例。
  • 用例: 研究公司的智能体。
    • 并行任务: 同时搜索新闻文章、拉取股票数据、检查社交媒体提及并查询公司数据库。
    • 好处: 比顺序查找更快地收集全面视图。
  1. 数据处理和分析:
    同时应用不同的分析技术或处理不同的数据段。
  • 用例: 分析客户反馈的智能体。
    • 并行任务: 在一批反馈条目中同时运行情感分析、提取关键词、分类反馈并识别紧急问题。
    • 好处: 快速提供多方面的分析。
  1. 多API或工具交互:
    调用多个独立的API或工具以收集不同类型的信息或执行不同的行动。
  • 用例: 旅行规划智能体。
    • 并行任务: 同时检查航班价格、搜索酒店可用性、查找本地活动并找到餐厅推荐。
    • 好处: 更快地呈现完整的旅行计划。
  1. 多组件内容生成:
    并行生成复杂内容的不同部分。
  • 用例: 创建营销电子邮件的智能体。
    • 并行任务: 同时生成主题行、起草电子邮件正文、查找相关图像并创建行动号召按钮文本。
    • 好处: 更高效地组装最终电子邮件。
  1. 验证和核实:
    同时执行多个独立的检查或验证。
  • 用例: 验证用户输入的智能体。
    • 并行任务: 同时检查电子邮件格式、验证电话号码、根据数据库验证地址并检查亵渎内容。
    • 好处: 提供更快的输入有效性反馈。
  1. 多模态处理:
    同时处理同一输入的不同模态(文本、图像、音频)。
  • 用例: 分析包含文本和图像的社交媒体帖子的智能体。
    • 并行任务: 同时分析文本的情感和关键词分析图像的对象和场景描述。
    • 好处: 更快地整合来自不同模态的洞察。
  1. A/B测试或多选项生成:
    并行生成响应的多个变体或输出以选择最佳选项。
  • 用例: 生成不同创意文本选项的智能体。
    • 并行任务: 使用略有不同的提示或模型同时生成文章的三个不同标题。
    • 好处: 允许快速比较和选择最佳选项。

并行化是智能体设计中的基本优化技术,允许开发者通过利用独立任务的并发执行构建更高性能和响应性的应用程序。

实践代码示例(LangChain)

LangChain框架内的并行执行由LangChain Expression Language(LCEL)促进。主要方法涉及在字典或列表构造内构建多个可运行组件。当此集合作为输入传递给链中的后续组件时,LCEL运行时并发执行包含的可运行对象。

在LangGraph的上下文中,此原理应用于图的拓扑。并行工作流通过构建图来定义,使得多个节点,缺乏直接顺序依赖,可以从单个公共节点启动。这些并行路径独立执行,然后它们的结果可以在图中的后续汇聚点聚合。

以下实现演示了使用LangChain框架构建的并行处理工作流。此工作流设计为响应单个用户查询并发执行两个独立操作。这些并行过程被实例化为不同的链或函数,它们各自的输出随后聚合为统一结果。

此实现的先决条件包括安装必要的Python包,如langchain、langchain-community和模型特定的提供者库如langchain-openai。此外,必须在本地环境中配置所选语言模型服务的适当身份验证,通常通过API密钥。

import os
import asyncio
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough

# --- 配置 ---
# 确保设置了API密钥环境变量(如OPENAI_API_KEY)
try:
    llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
except Exception as e:
    print(f"初始化语言模型时出错: {e}")
    llm = None

# --- 定义独立链 ---
# 这三个链代表可以并行执行的不同任务。
summarize_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "简洁地总结以下主题:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

questions_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "生成关于以下主题的三个有趣问题:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

terms_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "从以下主题中识别5-10个关键术语,用逗号分隔:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

# --- 构建并行+综合链 ---
# 1. 定义要并行运行的任务块。这些的结果,
#    连同原始主题,将被输入到下一步。
map_chain = RunnableParallel(
    {
        "summary": summarize_chain,
        "questions": questions_chain,
        "key_terms": terms_chain,
        "topic": RunnablePassthrough(),  # 将原始主题传递
    }
)

# 2. 定义将组合并行结果的最终综合提示。
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", """基于以下信息:
    摘要: {summary}
    相关问题: {questions}
    关键术语: {key_terms}
    综合一个全面的答案。"""),
    ("user", "原始主题: {topic}")
])

# 3. 通过将并行结果直接管道到综合提示,
#    然后是LLM和输出解析器来构建完整链。
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()

# --- 运行链 ---
async def run_parallel_example(topic: str) -> None:
    """
    异步调用具有特定主题的并行处理链
    并打印综合结果。
    参数:
        topic: 由LangChain链处理的输入主题。
    """
    if not llm:
        print("LLM未初始化。无法运行示例。")
        return
    
    print(f"\n--- 为主题运行并行LangChain示例: '{topic}' ---")
    try:
        # 对`ainvoke`的输入是单个'topic'字符串,
        # 然后传递给`map_chain`中的每个可运行对象。
        response = await full_parallel_chain.ainvoke(topic)
        print("\n--- 最终响应 ---")
        print(response)
    except Exception as e:
        print(f"\n链执行期间发生错误: {e}")

if __name__ == "__main__":
    test_topic = "太空探索的历史"
    # 在Python 3.7+中,asyncio.run是运行异步函数的标准方式。
    asyncio.run(run_parallel_example(test_topic))

提供的Python代码实现了LangChain应用程序,设计用于通过利用并行执行高效处理给定主题。注意asyncio提供并发性,而不是并行性。它通过使用事件循环在单个线程上实现这一点,该事件循环在任务空闲时(例如,等待网络请求)智能地在任务之间切换。这创建了多个任务同时进行的效果,但代码本身仍然只由一个线程执行,受Python的全局解释器锁(GIL)约束。

代码首先从langchain_openai和langchain_core导入基本模块,包括语言模型、提示、输出解析和可运行结构的组件。代码尝试初始化ChatOpenAI实例,特别是使用"gpt-4o-mini"模型,具有控制创造性的指定温度。在语言模型初始化期间使用try-except块以确保健壮性。然后定义三个独立的LangChain"链",每个都设计为对输入主题执行不同的任务。第一个链用于简洁地总结主题,使用系统消息和包含主题占位符的用户消息。第二个链配置为生成与主题相关的三个有趣问题。第三个链设置为从输入主题中识别5到10个关键术语,要求它们用逗号分隔。这些独立链中的每一个都由针对其特定任务定制的ChatPromptTemplate组成,然后是初始化的语言模型和StrOutputParser以将输出格式化为字符串。

然后构建RunnableParallel块以捆绑这三个链,允许它们同时执行。此并行可运行对象还包括RunnablePassthrough以确保原始输入主题可用于后续步骤。为最终综合步骤定义单独的ChatPromptTemplate,将摘要、问题、关键术语和原始主题作为输入以生成全面答案。完整端到端处理链,命名为full_parallel_chain,通过将map_chain(并行块)序列化到综合提示,然后是语言模型和输出解析器来创建。提供异步函数run_parallel_example来演示如何调用此full_parallel_chain。此函数将主题作为输入并使用invoke运行异步链。最后,标准Python if name == "main":块显示如何使用示例主题执行run_parallel_example,在这种情况下是"太空探索的历史",使用asyncio.run管理异步执行。

本质上,此代码设置了一个工作流,其中多个LLM调用(用于总结、问题和术语)同时发生给定主题,然后它们的结果由最终的LLM调用组合。这展示了使用LangChain的智能体工作流中并行化的核心思想。

实践代码示例(Google ADK)

好的,现在让我们将注意力转向在Google ADK框架内说明这些概念的具体示例。我们将研究如何应用ADK原语,如ParallelAgent和SequentialAgent,来构建利用并发执行提高效率的智能体流。

from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.tools import google_search

GEMINI_MODEL="gemini-2.0-flash"

# --- 1. 定义研究员子智能体(并行运行) ---
# 研究员1:可再生能源
researcher_agent_1 = LlmAgent(
    name="RenewableEnergyResearcher",
    model=GEMINI_MODEL,
    instruction="""你是专门从事能源的AI研究助手。研究'可再生能源'的最新进展。使用提供的Google搜索工具。简洁地总结你的关键发现(1-2句话)。仅输出摘要。 """,
    description="研究可再生能源。",
    tools=[google_search],
    # 为合并智能体在状态中存储结果
    output_key="renewable_energy_result"
)

# 研究员2:电动汽车
researcher_agent_2 = LlmAgent(
    name="EVResearcher",
    model=GEMINI_MODEL,
    instruction="""你是专门从事交通的AI研究助手。研究'电动汽车技术'的最新发展。使用提供的Google搜索工具。简洁地总结你的关键发现(1-2句话)。仅输出摘要。 """,
    description="研究电动汽车技术。",
    tools=[google_search],
    # 为合并智能体在状态中存储结果
    output_key="ev_technology_result"
)

# 研究员3:碳捕获
researcher_agent_3 = LlmAgent(
    name="CarbonCaptureResearcher",
    model=GEMINI_MODEL,
    instruction="""你是专门从事气候解决方案的AI研究助手。研究'碳捕获方法'的当前状态。使用提供的Google搜索工具。简洁地总结你的关键发现(1-2句话)。仅输出摘要。 """,
    description="研究碳捕获方法。",
    tools=[google_search],
    # 为合并智能体在状态中存储结果
    output_key="carbon_capture_result"
)

# --- 2. 创建ParallelAgent(并发运行研究员) ---
# 此智能体编排研究员的并发执行。
# 一旦所有研究员完成并将其结果存储在状态中,它就完成。
parallel_research_agent = ParallelAgent(
    name="ParallelWebResearchAgent",
    sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
    description="并行运行多个研究智能体以收集信息。"
)

# --- 3. 定义合并智能体(在并行智能体*之后*运行) ---
# 此智能体获取并行智能体在会话状态中存储的结果
# 并将它们综合成带有归因的单一、结构化响应。
merger_agent = LlmAgent(
    name="SynthesisAgent",
    model=GEMINI_MODEL,  # 或者如果需要综合,可能使用更强大的模型
    instruction="""你是负责将研究发现组合成结构化报告的AI助手。你的主要任务是将以下研究摘要综合,清楚地归因发现到其来源领域。使用每个主题的标题构建你的响应。确保报告连贯并平滑整合关键点。

**关键:你的整个响应必须*完全*基于下面'输入摘要'中提供的信息。不要添加任何外部知识、事实或这些特定摘要中不存在的细节。**

**输入摘要:**
*   **可再生能源:**
    {renewable_energy_result}
*   **电动汽车:**
    {ev_technology_result}
*   **碳捕获:**
    {carbon_capture_result}

**输出格式:**
## 近期可持续技术进展摘要

### 可再生能源发现(基于RenewableEnergyResearcher的发现)
[仅基于上面提供的可再生能源输入摘要进行综合和阐述。]

### 电动汽车发现(基于EVResearcher的发现)
[仅基于上面提供的EV输入摘要进行综合和阐述。]

### 碳捕获发现(基于CarbonCaptureResearcher的发现)
[仅基于上面提供的碳捕获输入摘要进行综合和阐述。]

### 总体结论
[提供仅连接上述发现的简要(1-2句话)结论陈述。]

仅输出遵循此格式的结构化报告。不要在此结构之外包含介绍性或结论性短语,并严格坚持仅使用提供的输入摘要内容。 """,
    description="将并行智能体的研究发现组合成严格基于提供输入的结构化、引用报告。",
    # 合并不需要工具
    # 这里不需要output_key,因为其直接响应是序列的最终输出
)

# --- 4. 创建SequentialAgent(编排整体流程) ---
# 这是将运行的主智能体。它首先执行ParallelAgent
# 以填充状态,然后执行MergerAgent以产生最终输出。
sequential_pipeline_agent = SequentialAgent(
    name="ResearchAndSynthesisPipeline",
    # 先运行并行研究,然后合并
    sub_agents=[parallel_research_agent, merger_agent],
    description="协调并行研究并综合结果。"
)

root_agent = sequential_pipeline_agent

此代码定义了用于研究和综合可持续技术进展信息的多智能体系统。它设置了三个LlmAgent实例作为专门的研究员。ResearcherAgent_1专注于可再生能源,ResearcherAgent_2研究电动汽车技术,ResearcherAgent_3调查碳捕获方法。每个研究员智能体都配置为使用GEMINI_MODEL和google_search工具。它们被指示简洁地总结其发现(1-2句话)并使用output_key将这些摘要存储在会话状态中。

然后创建名为ParallelWebResearchAgent的ParallelAgent以并发运行这三个研究员智能体。这允许研究并行进行,可能节省时间。ParallelAgent一旦其所有子智能体(研究员)完成并填充状态就完成其执行。

接下来,定义MergerAgent(也是LlmAgent)来综合研究结果。此智能体将并行研究员在会话状态中存储的摘要作为输入。其指令强调输出必须严格基于仅提供的输入摘要,禁止添加外部知识。MergerAgent设计为将组合发现构建成每个主题都有标题和简要总体结论的报告。

最后,创建名为ResearchAndSynthesisPipeline的SequentialAgent来编排整个工作流。作为主控制器,此主智能体首先执行ParallelAgent进行研究。一旦ParallelAgent完成,SequentialAgent然后执行MergerAgent来综合收集的信息。sequential_pipeline_agent被设置为root_agent,代表运行此多智能体系统的入口点。整体过程设计为高效地从多个来源并行收集信息,然后将其组合成单一、结构化的报告。

一览

什么: 许多智能体工作流涉及必须完成以实现最终目标的多个子任务。纯粹的顺序执行,其中每个任务等待前一个完成,通常是低效和缓慢的。当任务依赖于外部I/O操作(如调用不同API或查询多个数据库)时,这种延迟成为重大瓶颈。没有并发执行机制,总处理时间是所有单独任务持续时间的总和,阻碍系统的整体性能和响应性。

为什么: 并行化模式通过启用独立任务的同时执行提供标准化解决方案。它通过识别工作流的组件(如工具使用或LLM调用)来实现,这些组件不依赖于彼此的直接输出。LangChain和Google ADK等智能体框架提供内置构造来定义和管理这些并发操作。例如,主进程可以调用几个并行运行的子任务,并等待它们全部完成再继续下一步。通过同时而不是一个接一个地运行这些独立任务,此模式大大减少了总执行时间。

经验法则: 当工作流包含可以同时运行的多个独立操作时使用此模式,如从多个API获取数据、处理不同的数据块或为后续综合生成多个内容片段。

视觉总结

image2

图2:并行化设计模式

关键要点

以下是关键要点:

  • 并行化是并发执行独立任务以提高效率的模式。
  • 当任务涉及等待外部资源(如API调用)时特别有用。
  • 采用并发或并行架构引入了大量复杂性和成本,影响设计、调试和系统日志记录等关键开发阶段。
  • LangChain和Google ADK等框架提供定义和管理并行执行的内置支持。
  • 在LangChain Expression Language(LCEL)中,RunnableParallel是并行运行多个可运行对象的关键构造。
  • Google ADK可以通过LLM驱动的委托促进并行执行,其中协调器智能体的LLM识别独立的子任务并触发专门子智能体的并发处理。
  • 并行化有助于减少整体延迟并使智能体系统对复杂任务更具响应性。

结论

并行化模式是通过并发执行独立子任务来优化计算工作流的方法。这种方法减少了整体延迟,特别是在涉及多个模型推理或外部服务调用的复杂操作中。

框架为此模式提供不同的实现机制。在LangChain中,像RunnableParallel这样的构造用于显式定义和同时执行多个处理链。相比之下,像Google Agent Developer Kit(ADK)这样的框架可以通过多智能体委托实现并行化,其中主协调器模型将不同的子任务分配给可以并发操作的专门智能体。

通过将并行处理与顺序(链接)和条件(路由)控制流集成,可以构建能够高效管理多样化和复杂任务的复杂、高性能计算系统。

参考文献

以下是关于并行化模式和相关概念的进一步阅读资源:

  1. LangChain Expression Language(LCEL)文档(并行性):https://python.langchain.com/docs/concepts/lcel/
  2. Google Agent Developer Kit(ADK)文档(多智能体系统):https://google.github.io/adk-docs/agents/multi-agents/
  3. Python asyncio文档:https://docs.python.org/3/library/asyncio.html