LangGraph

Github 仓库

LangGraph 是 LangChain 的一种扩展实现,它支持更复杂的逻辑编排,从直观理解上认为,LangChain 是一种线性编排结构,LangGraph 为一种图结构

使用 LangGraph 构建应用

在开始前,需要确保我们有办法调用 LLM,而且 LLM 支持工具调用,比如 OpenAI, Anthropic, Google Gemini 等

基本步骤

包安装

pip install -U langgraph langsmith

创建 StateGraph

通过 StateGraph,我们能将程序构建为一个状态机。

其中的节点表示程序能使用的不同的 LLM 或者功能(函数);而其中的边则表示在完成一个任务后,下一步要进行怎样的状态转换

比如如下代码:

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


class State(TypedDict):
    # 消息的类型为“list”。注解中的`add_messages`函数定义了如何更新此状态键
	# 在这种情况下,它将消息附加到列表中,而不是覆盖它们
    messages: Annotated[list, add_messages]


graph_builder = StateGraph(State)

现在这个图能够处理两个关键的任务:

  1. 每个节点都接受当前状态(State 实例)作为输入,并更新后作为输出
  2. 更新后的状态将添加到现有消息列表

添加节点

在开始前,我们必须首先创建一个可用的 LLM,比如 Gemini

pip install -U "langchain[google-genai]"
import os
from langchain.chat_models import init_chat_model

os.environ["GOOGLE_API_KEY"] = "..."

llm = init_chat_model("google_genai:gemini-2.0-flash")

现在我们向途中添加一个节点,这个节点用于将 state 中的消息发送到 LLM

def chatbot(state: State):
	# 重点关注此处,我们如何将 state 作为输出,并返回一个将更新后的 messages 作为 messages 属性的字典
    return {"messages": [llm.invoke(state["messages"])]}


# 第一个参数是节点的唯一标识
# 第二个参数表示要调用的函数,每当该节点被激活,就会调用一次对应的函数
graph_builder.add_node("chatbot", chatbot)

添加出入口节点

我们需要告诉状态图从哪个节点开始运行,以及从哪个节点退出运行:

# 将 chatbot 标识的节点作为入口状态
graph_builder.add_edge(START, "chatbot")
# 将 chatbot 标识的节点作为终止状态
graph_builder.add_edge("chatbot", END)

编译

在运行这个图之前,我们还需要对其进行编译。当我们通过图构建器编译好这个图后,我们会得到一个 CompiledStateGraph 示例,通过它,我们即可运行这个图

graph = graph_builder.compile()

可视化(可选)

在我们得到一个图后,我们可以通过 get_graph 方法以及绘制方法(如 draw_ascii, draw_png)得到一个描述图结构的图像,比如:

from IPython.display import Image, display

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    pass

可视化示例

运行

现在我们终于可以运行它了:

def stream_graph_updates(user_input: str):
	# 将用户输入包装为一个包含 messages 的字典
	# 通过 stream 方法传递给 graph
	# 得到一个包含响应信息的 Stream
    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):
        for value in event.values():
            print("Assistant:", value["messages"][-1].content)


while True:
    try:
        user_input = input("User: ")
		# 退出条件
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        stream_graph_updates(user_input)
    except:
		# ...
        break

添加工具

通过向图添加工具,可以将更多能力和流程整合到 LangGraph 中

定义工具

比如我们使用 tavily 作为 AI 搜索引擎

pip install -U langchain-tavily

我们可以通过下面的方式直接使用它 API Reference

from langchain_tavily import TavilySearch

tool = TavilySearch(max_results=2)
tools = [tool]
tool.invoke("What's a 'node' in LangGraph?")

通过下面的方式,我们可以将其绑定到 LLM 中告知 LLM 可以调用的工具有哪些

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

graph_builder = StateGraph(State)

# 告诉 LLM 可以使用哪些工具
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)

接着将工具添加为节点

import json

from langchain_core.messages import ToolMessage


class BasicToolNode:
    """用于运行 AI 消息中请求的工具的节点"""

    def __init__(self, tools: list) -> None:
        self.tools_by_name = {tool.name: tool for tool in tools}

    def __call__(self, inputs: dict):
		# 获取 messsages 作为搜索的输入
        if messages := inputs.get("messages", []):
            message = messages[-1]
        else:
            raise ValueError("No message found in input")
        outputs = []
		# 获取要调用的工具名、参数,解析后逐个调用
        for tool_call in message.tool_calls:
            tool_result = self.tools_by_name[tool_call["name"]].invoke(
                tool_call["args"]
            )
			# 将工具的返回添加到 outputs 数组中作为返回
            outputs.append(
                ToolMessage(
                    content=json.dumps(tool_result),
                    name=tool_call["name"],
                    tool_call_id=tool_call["id"],
                )
            )
        return {"messages": outputs}


tool_node = BasicToolNode(tools=[tool])
# 将工具添加到图中作为一个节点
graph_builder.add_node("tools", tool_node)

添加一条条件边(conditional_edges

下面的代码演示了我们怎样通过条件边构建一个 Agent 的核心循环

def route_tools(
    state: State,
):
    """
    通过条件边来根据前一条消息是否请求工具决定
	路由到工具节点还是直接退出(END)
    """
    if isinstance(state, list):
        ai_message = state[-1]
    elif messages := state.get("messages", []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tools"
    return END


# 当 chatbot 的返回被条件函数运行后得到 "tools" 后,则路由到 tools 节点
# 当 chatbot 直接返回(即没有请求工具)时,则得到 END,被路由到结束
graph_builder.add_conditional_edges(
    "chatbot",
    route_tools,
	# 用于告诉状态图不符合条件时,进入到哪个节点
    {"tools": "tools", END: END},
)
# 每次 tools 节点运行结束后,都要回到 chatbot 节点来决定下一步做什么
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()

可视化示例

添加记忆

为了让 Agent 记住上下文,使之具备多轮对话能力,LangGraph 通过“持久化检查点(persistent checkpointing)”方式解决

LangGraph 通过 thread_id 进行记忆,这在复杂状态转移、错误恢复、多轮对话中特别有用

MemorySaver 检查点

通过下面的代码我们可以创建一个使用内存的“检查点”,在生产环境,使用 Sqlite 或其他数据库可能是更合适的方案

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

# ... 构建图

# 使用检查点
graph = graph_builder.compile(checkpointer=memory)

现在,我们使用如下的方法与 Agent 交互

# 首先确定一个唯一的 thread_id
config = {"configurable": {"thread_id": "1"}}

# 将 config 作为 stream() 或 Invoke 的第二个参数
events = graph.stream(
    {"messages": [{"role": "user", "content": user_input}]},
    config,
    stream_mode="values",
)
for event in events:
    event["messages"][-1].pretty_print()

通过 graph.get_state(config) 我们可以获取状态的快照,在快照中存储了消息的上下文信息

添加人机协作

为了让 AI 更符合预期的工作,我们需要引入必要的人工介入。

LangGraph 使用一种中断机制,让正在执行的流程暂停下来,等待用户的反馈后恢复执行。

像普通工具一样,我们可以添加 human_assistance 工具,将人工的反馈作为工具嵌入到状态机中

from langgraph.types import Command, interrupt

@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]

# 将 human_assistance 函数作为工具绑定到 llm

自定义状态

我们可以为 State 拓展更多属性,这样其他节点可以很容易访问、修改这些属性,包括持久化

class State(TypedDict):
    messages: Annotated[list, add_messages]
    name: str
    birthday: str

LangGraph 允许工具在任何时候自由修改和使用状态:

# 更新状态的属性
graph.update_state(config, {"name": "LangGraph (library)"})
# 获取状态快照
snapshot = graph.get_state(config)

时间旅行

如果我们允许用户退回到某个状态,尝试不同的操作时,则需要支持回档(Rewind)功能

分步执行

为状态图添加步骤,每一个步骤都会被检查点记录(存档)

回放历史状态

被存档的状态即可被回放,我们现在可以查看整个状态变更的历史

to_replay = None
for state in graph.get_state_history(config):
    print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)
    # demo 根据剩余的状态数来确定目标状态,当然我们可以有更好的办法
    if len(state.values["messages"]) == 6:
        to_replay = state

我们会得到如下的输出

Num Messages:  8 Next:  ()
Num Messages:  7 Next:  ('chatbot',)
Num Messages:  6 Next:  ('tools',)
Num Messages:  5 Next:  ('chatbot',)
Num Messages:  4 Next:  ('__start__',)
Num Messages:  4 Next:  ()
Num Messages:  3 Next:  ('chatbot',)
Num Messages:  2 Next:  ('tools',)
Num Messages:  1 Next:  ('chatbot',)
Num Messages:  0 Next:  ('__start__',)

可见,在检查点(存档)中,完整保存了整个调用历史,这为我们回档提供了可能

按时间加载状态

检查点(存档)的 config 属性中,包含了 checkpoint_id 属性,表示时间。通过这个值,LangGraph 可以知道加载什么时候的状态

# to_reply 为上文代码中要恢复到的状态
for event in graph.stream(None, to_replay.config, stream_mode="values"):
    if "messages" in event:
        event["messages"][-1].pretty_print()

其他补充

LangSmith

一个用于辅助开发的调试工具,在使用 LangGraph 进行开发时,一般要进行安装,用于查看与 LLM 的交互、与工具的交互等

创建本地 LangGraph 服务

需要基于 LangSmith

文档