LangGraph: Orchestrating Complex Business Processes with Stateful, Multi-Actor LLM Workflows

LangChain is a framework for building large language model (LLM) applications; LangGraph is a library built on top of LangChain that supports cyclic workflows and agent creation.

LangGraph simplifies building AI applications with "memory". Imagine a conversation that remembers its earlier questions and answers — LangGraph lets an LLM application do exactly that. Borrowing ideas from data-processing tools, it connects application components (plain functions, such as Python code) into graphs that may contain cycles. This "memory" enables richer LLM interactions, and it also makes it practical to automate long-running business processes (LRBPs), with long pauses, resumable execution, and multi-agent collaboration.

This article focuses on how LangGraph implements state persistence and shared state. We demonstrate with a sequence of questions in which each answer depends on the previous ones:

Question 1: Who won the 2022 FIFA World Cup?

Answer: Argentina beat France in the final.

Question 2: What is the capital of the other finalist?

Answer: The capital of France is Paris.

Question 3: Whom did that team beat the last time it won, and what is that country's population?

Answer: France beat Croatia to win in 2018; Croatia's population is about 4 million.

Follow-up: France's population is about 67 million.

Implementation

Install the dependencies (the Tavily tool used below also requires the langchain_community package):

pip install -U langgraph
pip install langchain_openai
pip install langchain_community

Set the environment variables (the Tavily search tool used below additionally requires TAVILY_API_KEY):

export OPENAI_API_KEY=sk-...

Core code:

Imports and setup

# Import the LangGraph building blocks and LangChain message types
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.sqlite import SqliteSaver

# Configure the Tavily search tool to return at most 2 results
tool = TavilySearchResults(max_results=2)

# Define the agent state: a message list whose updates are accumulated with operator.add
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

# Use SqliteSaver as the checkpointer (":memory:" selects an in-memory SQLite database)
memory = SqliteSaver.from_conn_string(":memory:")
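The Annotated[list[AnyMessage], operator.add] annotation tells LangGraph how to merge each node's partial return value into the shared state: new message lists are concatenated onto the existing list rather than replacing it. The merge rule itself is plain Python, so it can be checked directly (a minimal sketch with strings standing in for message objects):

```python
import operator

# Each graph node returns a partial update like {'messages': [new_message]}.
# LangGraph applies the reducer attached to the field -- here operator.add,
# i.e. list concatenation -- instead of overwriting the previous value.
state = {"messages": ["Q1", "A1"]}
update = {"messages": ["Q2"]}

state["messages"] = operator.add(state["messages"], update["messages"])
print(state["messages"])  # ['Q1', 'A1', 'Q2']
```

This is why every node below can return only the messages it produced, while the full conversation history keeps growing.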

Agent class implementation

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        # Build the graph: the llm node decides; if it requested tool calls,
        # route to the action node, otherwise end.
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}  # look up tools by name
        self.model = model.bind_tools(tools)     # let the model emit tool calls

    def call_openai(self, state: AgentState):
        # Call the model with the accumulated history (plus the system prompt, if any)
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        # True if the latest model message requested any tool calls
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        # Execute every requested tool call and return the results as
        # ToolMessages, which the reducer appends to the state
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
 
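The compiled graph alternates between the llm and action nodes until the model stops requesting tools. That control flow can be sketched without LangGraph, using scripted stand-ins for the model and the search tool (FakeMessage, fake_model, and fake_search are illustrative names, not part of the library):

```python
# A minimal sketch of the llm -> action -> llm loop, with a scripted fake
# model and tool standing in for ChatOpenAI and Tavily (illustrative only).
class FakeMessage:
    def __init__(self, content="", tool_calls=None):
        self.content = content
        self.tool_calls = tool_calls or []

def fake_model(messages):
    # First turn: request a search; once a tool result is present, answer.
    if any(getattr(m, "name", "") == "search" for m in messages):
        return FakeMessage(content="Argentina won the 2022 FIFA World Cup.")
    return FakeMessage(tool_calls=[{"name": "search", "args": {"q": "2022 world cup"}, "id": "1"}])

def fake_search(args):
    return "Argentina beat France in the 2022 final."

tools = {"search": fake_search}
state = {"messages": [FakeMessage(content="Who won the 2022 FIFA World Cup?")]}

while True:
    # "llm" node
    msg = fake_model(state["messages"])
    state["messages"].append(msg)
    if not msg.tool_calls:          # conditional edge: no tool calls -> END
        break
    for t in msg.tool_calls:        # "action" node: run each requested tool
        result = tools[t["name"]](t["args"])
        tool_msg = FakeMessage(content=result)
        tool_msg.name = t["name"]
        state["messages"].append(tool_msg)

print(state["messages"][-1].content)  # Argentina won the 2022 FIFA World Cup.
```

In the real Agent class, LangGraph drives this loop itself: exists_action plays the role of the conditional edge, and the checkpointer snapshots the state after each step.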

Prompt setup

prompt = "You are a smart research assistant. Use the search engine to look up information. Only call the search tool when you are sure you need it. You may make multiple calls or issue follow-up queries."

Initializing the agent

model = ChatOpenAI(model="gpt-4o")

abot = Agent(model, [tool], system=prompt, checkpointer=memory)

Sequential Q&A demo

First question (thread_id=1):

messages = [HumanMessage(content="Who won the 2022 FIFA World Cup?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

Output: Argentina beat France in the final to win.

Second question (same thread_id):

messages = [HumanMessage(content="What is the capital of the other finalist?")]
thread = {"configurable": {"thread_id": "1"}}  # continue the same conversation
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Output: The capital of France is Paris.

Third question (still thread_id=1):

messages = [HumanMessage(content="Whom did that team beat the last time it won? What is that country's population?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Output: France beat Croatia in 2018; Croatia's population is about 4 million.

Comparison test (new thread_id)

messages = [HumanMessage(content="Whom did that team beat the last time it won? What is that country's population?")]
thread = {"configurable": {"thread_id": "2"}}  # a fresh conversation
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Output: with no shared history, the agent misreads the question (it incorrectly refers to Argentina beating France) and answers that France's population is about 67 million.
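Why does thread 2 lose the context? The checkpointer stores one state per thread_id, so each thread accumulates its own history and never sees another thread's messages. The isolation can be sketched with a dictionary-backed stand-in (InMemoryCheckpointer and ask are illustrative names, not LangGraph APIs):

```python
# A toy checkpointer: one message list per thread_id, mirroring how
# LangGraph's SqliteSaver scopes persisted state to a thread.
class InMemoryCheckpointer:
    def __init__(self):
        self.store = {}

    def load(self, thread_id):
        return self.store.get(thread_id, [])

    def save(self, thread_id, messages):
        self.store[thread_id] = messages

def ask(ckpt, thread_id, question):
    # Load the thread's history, append the new question, and persist it.
    history = ckpt.load(thread_id) + [question]
    ckpt.save(thread_id, history)
    return history

ckpt = InMemoryCheckpointer()
ask(ckpt, "1", "Who won the 2022 FIFA World Cup?")
ask(ckpt, "1", "What is the capital of the other finalist?")
print(len(ckpt.load("1")))  # 2 -- thread 1 holds both questions
print(len(ckpt.load("2")))  # 0 -- thread 2 starts empty
```

This is exactly what the comparison test shows: thread "2" begins with an empty history, so the pronouns in the question have nothing to resolve against.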

Conclusion

LangGraph's persistence support changes how LLM workflows are built. By saving state between invocations, it enables dynamic agents that handle cyclic tasks and interactions across multiple systems, automating complex business processes end to end.

Try LangGraph for yourself and share what you learn.

Tags: artificial intelligence, machine learning, deep learning, software agents, agents, conversational AI, chatbots, LangChain, LangGraph, LLM workflows

Reference: LangGraph examples and documentation, https://github.com/langchain-ai/langgraph/tree/main/examples

Last modified: Wednesday, August 20, 2025, 16:57