LangChain:LCEL与Runnable使用示例

LCEL (表达式语言)使用示例,包括基础使用,并行处理、条件分支、数据传递、错误处理、自定义逻辑。

自定义 Runnable 让能够将任何 Python 逻辑封装成符合 LCEL 标准的组件,从而无缝集成到 LangChain 管道中,享受流式、批处理、异步等特性。

LCE使用示例

示例1:基础使用

最经典的用法: prompt | llm | output_parser

翻译器链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 1. 导入所需组件
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 2. 初始化组件
# 提示词模板:定义任务,并声明一个变量 {language} 和 {input_text}
prompt = ChatPromptTemplate.from_template(
"请将以下内容翻译成{language}:{input_text}"
)

# 语言模型:此处使用 OpenAI 兼容接口
llm = ChatOpenAI(
model="deepseek-chat", # 替换为你的模型
base_url="你的 Base URL",
api_key="你的 API Key",
)

# 输出解析器:将模型的 AIMessage 输出提取为纯文本字符串
output_parser = StrOutputParser()

# 3. 使用管道符 (|) 构建 LCEL 链
chain = prompt | llm | output_parser

# 4. 调用链(四种方式)
## 4.1 同步调用
# .invoke() 方法传入一个字典,其中的键需要匹配 prompt 中定义的变量
response = chain.invoke({
"language": "英文",
"input_text": "LangChain 是一个用于构建大语言模型应用的框架。"
})

# 4.2 异步调用 (适合Web服务)
# async_result = await chain.ainvoke(...)

# 4.3 流式调用 (适合实时输出)
# for chunk in chain.stream(...):
# print(chunk, end="", flush=True)

# 4.4 批量调用
# results = chain.batch([{"language": "法语", "input_text": "你好"}, {"language": "日语", "input_text": "谢谢"}])

print(response)
# 预期输出: "LangChain is a framework for building large language model applications."

一个简单的问答链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 1. 定义各个组件
# 提示词模板:一个 Runnable,接收 'topic' 变量,输出格式化后的提示词
prompt = ChatPromptTemplate.from_template("你是一位技术专家。请用通俗易懂的语言解释一下:{topic}。")

# 模型:一个 Runnable,接收提示词,输出 AI 响应
# 请确保已设置好环境变量 OPENAI_API_KEY
model = ChatOpenAI(model="gpt-4o")

# 输出解析器:一个 Runnable,将 AI 的原始响应(AIMessage)解析为字符串
output_parser = StrOutputParser()

# 2. 使用管道符 '|' 将它们组合成一个链
# 数据流向:{"topic": "..."} -> prompt -> model -> output_parser -> str
chain = prompt | model | output_parser

# 3. 调用这个链
# 同步调用
result = chain.invoke({"topic": "什么是量子纠缠?"})
print(result)

# 流式调用,提升用户体验
print("流式输出:", end="", flush=True)
for chunk in chain.stream({"topic": "什么是递归?"}):
print(chunk, end="", flush=True)
print() # 换行

# 批量调用,提高效率
results = chain.batch([{"topic": "API"}, {"topic": "微服务"}])
for i, res in enumerate(results):
print(f"结果 {i}: {res}")

简单的问答链,提示词指定角色:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 1. 创建提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一位专业的{role}。"),
("human", "{question}")
])

# 2. 初始化模型
model = ChatOpenAI(model="gpt-4", temperature=0.7)

# 3. 输出解析器
parser = StrOutputParser()

# 4. 使用管道符组合
chain = prompt | model | parser

# 5. 执行
response = chain.invoke({
"role": "Python专家",
"question": "解释Python中的装饰器"
})
print(response)

关键解读

  • prompt | llm | output_parser创建了一个清晰的数据流输入字典-> 提示模板-> LLM模型-> 输出解析器-> 最终字符串
  • 每个组件都实现了 Runnable接口,知道如何接收上游输入,并产生下游能理解的输出。

示例2:复杂逻辑

LCEL 的强大之处在于它能轻松处理分支和并行等复杂逻辑,而无需编写复杂的 if/else 或循环。

1. 并行处理 (RunnableParallel)

当需要同时处理多个独立任务时(例如,从多个知识库检索信息),RunnableParallel 并发执行可以显著提升效率。

假设想同时获取一个城市的简介和最佳旅游时间

1
2
3
4
5
6
7
from langchain_core.runnables import RunnableParallel

# 假设我们想同时获取一个城市的简介和最佳旅游时间
chain = RunnableParallel(
intro=prompt_intro | llm | StrOutputParser(), # 任务1:生成简介
best_time=prompt_time | llm | StrOutputParser() # 任务2:推荐时间
) | final_prompt | llm | StrOutputParser() # 将两个结果合并后,再交给LLM处理

假设想同时生成摘要和提取关键词

以下示例,LangChain 的 RunnableParallel 并行执行三个不同的任务:

  • **summary**:使用预先定义的 prompt 模板和 LLM 链生成文本摘要。
  • **keywords**:使用另一个提示模板提取文本关键词。
  • **length**:直接通过 lambda 函数计算输入文本的长度。

RunnableParallel并发执行这些任务(而非顺序执行),然后将结果合并为一个字典,键分别为 summarykeywordslength。这样可以显著提高效率,尤其适合需要从同一输入派生多个独立结果的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from langchain_core.runnables import RunnableParallel, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from typing import Dict, Any

# ---------- 1. 定义基础组件 ----------
# 摘要提示模板(假设输入字段为 "text")
summary_prompt = ChatPromptTemplate.from_template("请用一句话总结以下内容:\n{text}")

# 关键词提取提示模板
keyword_prompt = ChatPromptTemplate.from_template("提取以下文本中的关键词(逗号分隔):\n{text}")

# 语言模型(使用 gpt-4o-mini 降低成本)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 输出解析器:将 LLM 的 AIMessage 转为字符串
parser = StrOutputParser()

# ---------- 2. 构建子链 ----------
# 摘要链
summary_chain = summary_prompt | llm | parser

# 关键词链
keyword_chain = keyword_prompt | llm | parser

# 长度计算链(用 RunnableLambda 包装纯函数)
def compute_length(inputs: Dict[str, Any]) -> int:
"""返回输入文本的字符长度"""
return len(inputs["text"])

length_chain = RunnableLambda(compute_length)

# ---------- 3. 并行执行 ----------
parallel_chain = RunnableParallel(
summary=summary_chain,
keywords=keyword_chain,
length=length_chain
)

# ---------- 4. 调用并处理结果 ----------
input_text = "LangChain 是一个开发框架,用于构建基于大语言模型的应用程序。"

try:
result = parallel_chain.invoke({"text": input_text})
print("摘要:", result["summary"])
print("关键词:", result["keywords"])
print("长度:", result["length"])
except Exception as e:
print(f"执行失败: {e}")

代码关键点说明

  • **RunnableParallel**:同时启动多个 Runnable,互不阻塞。
  • **RunnableLambda**:将普通 Python 函数转换为 LangChain 可运行的组件,确保类型一致。
  • 异常捕获:避免因 LLM API 错误导致整个程序崩溃。
  • 明确分离组件定义与组合:便于复用和测试。

多步骤处理(分析 → 翻译 → 优化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain_core.runnables import RunnablePassthrough

# 步骤 1:分析文本情感
analyze_prompt = ChatPromptTemplate.from_template(
"分析以下文本的情感(积极/消极/中性),只返回标签:\n\n{text}"
)
analyzer_chain = analyze_prompt | ChatOpenAI() | StrOutputParser()

# 步骤 2:根据情感调整翻译风格
translate_prompt = ChatPromptTemplate.from_template(
"用{sentiment}的风格翻译以下内容:\n\n{text}"
)

# 组合管道(使用 RunnablePassthrough 保留原始数据)
full_chain = (
{
"text": RunnablePassthrough(), # 原始文本
"sentiment": analyzer_chain # 情感分析结果
}
| translate_prompt
| ChatOpenAI()
| StrOutputParser()
)

result = full_chain.invoke("这个产品太棒了!")

并行处理(Map-Reduce 模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langchain_core.runnables import RunnableParallel

# 创建并行处理链
parallel_chain = RunnableParallel(
summary=(
ChatPromptTemplate.from_template("总结以下内容:{text}")
| ChatOpenAI()
| StrOutputParser()
),
keywords=(
ChatPromptTemplate.from_template("从以下内容提取关键词:{text}")
| ChatOpenAI()
| StrOutputParser()
),
sentiment=(
ChatPromptTemplate.from_template("分析以下内容的情感:{text}")
| ChatOpenAI()
| StrOutputParser()
)
)

# 执行 - 三个任务并行运行
result = parallel_chain.invoke({
"text": "OpenAI发布了GPT-5,性能大幅提升,令人兴奋!"
})
# 返回: {"summary": "...", "keywords": "...", "sentiment": "..."}

2. 条件分支 (RunnableBranch)

RunnableBranch 可以根据输入的条件,动态选择不同的处理路径,实现“if-else”逻辑。

根据问的是天气还是新闻路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langchain_core.runnables import RunnableBranch

# 根据问题的主题,选择不同的提示模板
weather_branch_chain = RunnableBranch(
(lambda x: "天气" in x["question"], weather_prompt | llm),
(lambda x: "新闻" in x["question"], news_prompt | llm),
default_prompt | llm # 默认路径
)

text_branch_chain = RunnableBranch(
(lambda x: len(x["text"]) < 20, lambda x: f"短文本: {x['text']}"),
(lambda x: len(x["text"]) >= 20, lambda x: f"长文本: {x['text']}"),
lambda x: "未知" # 默认分支
)

根据文档内容长度路由

1
2
3
4
5
6
7
8
9
10
11
12
from langchain_core.runnables import RunnableBranch

# 定义分支:根据输入长度选择不同处理方式
branch_chain = RunnableBranch(
(lambda x: len(x["text"]) > 100, prompt_long | llm | output_parser), # 长文本总结
(lambda x: len(x["text"]) <= 100, prompt_short | llm | output_parser), # 短文本扩写
default_prompt | llm | output_parser # 默认情况
)

# 也可以与管道符结合
full_chain = {"text": RunnablePassthrough()} | branch_chain
result = full_chain.invoke({"text": "这是一个很长的文档内容..."})

判断值条件路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain_core.runnables import RunnableBranch

# 假设我们有一个分类器链,能判断用户意图是 'code', 'data' 还是 'general'
# classifier_chain = ... (一个输出意图分类的链)

# 定义针对不同意图的子链
code_chain = ... # 处理编程问题的链
data_chain = ... # 处理数据分析问题的链
general_chain = ... # 处理通用问题的链

# 使用 RunnableBranch 创建路由链
routing_chain = RunnableBranch(
(lambda x: x["intent"] == "code", code_chain),
(lambda x: x["intent"] == "data", data_chain),
general_chain, # 默认分支
)

# 最终链:先分类,再路由
# full_chain = classifier_chain | routing_chain

链嵌套使用路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langchain_core.runnables import RunnableBranch, RunnableLambda

# 定义路由函数
def route_by_topic(info):
topic = info.get("topic", "").lower()
if "code" in topic or "编程" in topic:
return "programming"
return "general"

# 创建不同分支的链
programming_chain = (
ChatPromptTemplate.from_template("作为编程专家,回答:{question}")
| ChatOpenAI()
| StrOutputParser()
)

general_chain = (
ChatPromptTemplate.from_template("作为通用助手,回答:{question}")
| ChatOpenAI()
| StrOutputParser()
)

# 使用 RunnableBranch 实现条件路由
branch = RunnableBranch(
(lambda x: route_by_topic(x) == "programming", programming_chain),
general_chain # 默认分支
)

# 完整链
full_chain = {
"topic": lambda x: x["topic"],
"question": lambda x: x["question"]
} | branch

# 执行
result = full_chain.invoke({
"topic": "编程问题",
"question": "Python列表推导式怎么用?"
})

3. 数据传递 (RunnablePassthrough)

RunnablePassthrough 就像一个“数据通道”,在复杂的字典流中,它可以原封不动地传递某个值,常与 RunnableParallel 配合使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 导入 Runnable 基类和 RunnableConfig 类型,用于定义自定义可运行组件
from langchain_core.runnables import Runnable, RunnableConfig
# 导入类型提示:Dict 表示字典,Any 表示任意类型
from typing import Dict, Any

# 定义自定义验证器类,继承自 Runnable[输入类型, 输出类型]
# 这里指定输入和输出均为 Dict[str, Any] 类型(字典)
class MyValidator(Runnable[Dict, Dict]):

# 实现同步调用方法 invoke,这是 Runnable 接口要求必须实现的方法
# 参数:
# input: 输入字典
# config: 可选的运行时配置(如回调、标签等)
# 返回值:处理后的字典
def invoke(self, input: Dict, config: RunnableConfig = None) -> Dict:
# 检查输入字典中是否包含 "text" 键
if "text" not in input:
# 如果没有,抛出值错误异常,中断链的执行
raise ValueError("缺少 'text' 字段")
# 添加一个验证标志,标记输入已经通过校验
input["validated"] = True
# 返回修改后的字典(注意:直接修改了原字典)
return input

# 实例化验证器对象
validator = MyValidator()

# 构建 LCEL 链:
# 1. RunnablePassthrough() 是一个恒等组件,原样传递输入,常用于占位或并行分支中保留原始数据
# 2. | validator : 将输入传递给 validator,执行字段校验并添加 validated 标志
# 3. | llm : 将校验后的字典传递给语言模型(需提前定义,例如 llm = ChatOpenAI())
# 4. | parser : 将 LLM 的输出(AIMessage)解析为字符串(需提前定义,例如 parser = StrOutputParser())
# 注意:llm 和 parser 需要预先定义,此处仅为示意
chain = RunnablePassthrough() | validator | llm | parser

代码说明

  • MyValidator 是一个自定义的数据校验组件,确保输入字典包含 "text" 字段,并通过添加 "validated": True 标记来表明校验通过。
  • 使用 LCEL 管道将恒等传递校验LLM 调用输出解析串联成一个完整的处理链。
  • 执行时,输入字典会依次流经各个组件,最终返回解析后的字符串。
  • RunnablePassthrough() 在这里的作用是保持链的输入类型一致(Dict),因为 validator 期望接收字典。如果直接 validator | llm,链的输入类型仍然是字典,但 RunnablePassthrough 通常用于需要“旁路”原始数据到后续步骤的场景。
  • 示例代码自定义 Runnable 类没有实现异步版本 ainvoke,因此只能用于同步链。如需异步支持,应同时实现 async def ainvoke(...)

4. 自定义逻辑 (RunnableLambda)

RunnableLambda 允许你将任意的 Python 函数包装成一个 Runnable 对象,无缝嵌入 LCEL 链中,实现复杂的数据转换或逻辑判断。

带检索的RAG链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 假设已有向量数据库
vectorstore = FAISS.load_local("faiss_index", OpenAIEmbeddings())
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 提示模板
template = """基于以下上下文回答问题:
{context}

问题:{question}
"""
prompt = ChatPromptTemplate.from_template(template)

# 模型和解析器
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()

# 辅助函数:格式化检索结果
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)

# 将其包装为 Runnable 后,就可以像其他组件一样使用
# chain = retriever | RunnableLambda(format_docs) | prompt | llm | parser
# 构建 RAG 链
rag_chain = (
{
"context": retriever | format_docs, # 检索并格式化文档
"question": RunnablePassthrough() # 直接传递用户问题
}
| prompt
| model
| parser
)

# 执行
result = rag_chain.invoke("什么是LCEL?")
print(result)

使用 RunnableLambda 把普通函数包装成 Runnable 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_core.runnables import RunnableLambda

# 定义一个普通函数
def extract_keywords(text: str) -> list:
# 这里是一个简单的模拟
keywords = ["模型", "人工智能", "学习"]
return [kw for kw in keywords if kw in text]

# 使用 RunnableLambda 将其转换为 Runnable
keyword_extractor = RunnableLambda(extract_keywords)

# 集成到链中:先生成答案,再提取关键词
complex_chain = prompt | model | StrOutputParser() | keyword_extractor

result = complex_chain.invoke({"topic": "深度学习"})
print(result) # 输出: ['模型', '学习']

5. 错误处理 (RunnableWithFallback)

LCEL链可以轻松添加重试和回退逻辑。RunnableWithFallback 可以为一个 Runnable 组件配置一个或多个“备胎”。当主组件执行失败时,会自动调用备用组件,提升系统的健壮性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_core.runnables import RunnableWithFallback

# 如果主模型(chat_model)调用失败,就使用备用模型(fallback_model)
robust_chain = RunnableWithFallback(
runnable=chat_prompt | chat_model,
fallbacks=[chat_prompt | fallback_model]
)

robust_chain = RunnableWithFallbacks(
primary_chain, # 第一个参数:主链(必须)
fallback_strategy=[ # 第二个参数:备用策略列表
backup_chain1, # 第1个备用:可能是 cheaper LLM
backup_chain2, # 第2个备用:可能是本地模型
lambda x: "服务暂时不可用" # 最终兜底:硬编码返回
]
)
1
2
from langchain_core.runnables import RunnableWithFallbacks
chain_with_fallback = chain.with_fallbacks([fallback_chain1, fallback_chain2])

场景:智能客服的降级策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from langchain_core.runnables import RunnableWithFallbacks, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_community.llms import Tongyi # 国产备用模型

# ========== 主链:GPT-4(高质量但可能不稳定) ==========
gpt4_chain = (
ChatPromptTemplate.from_template("作为专业客服,详细回答:{question}")
| ChatOpenAI(model="gpt-4", temperature=0.7)
)

# ========== 备用1:GPT-3.5(更快更稳定) ==========
gpt35_chain = (
ChatPromptTemplate.from_template("简要回答用户问题:{question}")
| ChatOpenAI(model="gpt-3.5-turbo", temperature=0.5)
)

# ========== 备用2:通义千问(国产兜底) ==========
tongyi_chain = (
ChatPromptTemplate.from_template("回答问题:{question}")
| Tongyi(model_name="qwen-turbo")
)

# ========== 最终兜底:硬编码回复 ==========
def final_fallback(input_dict):
"""当所有模型都失败时,返回友好提示"""
return {
"answer": "抱歉,智能客服暂时无法连接,请稍后再试或拨打人工热线:400-xxx-xxxx",
"status": "service_unavailable",
"original_question": input_dict.get("question", "")
}

# ========== 组装容错链 ==========
customer_service_chain = RunnableWithFallbacks(
gpt4_chain,
fallback_strategy=[
gpt35_chain,
tongyi_chain,
RunnableLambda(final_fallback) # 包装为 Runnable
]
)

# ========== 执行 ==========
try:
result = customer_service_chain.invoke({"question": "如何申请退款?"})
print(result)
except Exception as e:
# 理论上不会走到这里,因为最终兜底总会返回
print(f"完全失败:{e}")

示例3:LCEL构建Agent

在Agent开发中的核心应用:一个典型的ReAct模式Agent,其核心就是由LCEL构建的。

构建一个简易的数学计算Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
import math

# 1. 定义工具(工具本身也是Runnable)
def calculate_sqrt(input: str) -> str:
"""计算平方根。输入应为数字。"""
try:
return str(math.sqrt(float(input)))
except:
return "输入无效,请输入一个数字。"

sqrt_tool = Tool(
name="sqrt_calculator",
func=calculate_sqrt,
description="计算一个数字的平方根。输入应为单个数字。"
)

# 2. 使用LCEL风格创建Agent(现代方式)
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate

# 定义提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的数学助手。请使用合适的工具来回答问题。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"), # 这是关键!用于存放Agent的思考过程和工具调用记录
])

# 创建Agent
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
tools = [sqrt_tool]
agent = create_tool_calling_agent(llm, tools, prompt)

# 3. 创建执行器并运行
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
result = agent_executor.invoke({"input": "144的平方根是多少?"})
print(result["output"])

在这个Agent内部

  1. create_tool_calling_agent函数内部利用LCEL构建了一个复杂的链,该链能够:
    • 解析用户输入。
    • LLM思考是否需要调用工具(以及调用哪个)。
    • 如果调用工具,则将工具调用插入到 agent_scratchpad中。
    • 将工具结果返回给LLM进行下一步思考或生成最终答案。
  2. AgentExecutor负责循环执行这个链,直到LLM决定不再调用工具,并输出最终答案。

示例4:自定义的LCEL Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.runnables import RunnablePassthrough

# 假设我们已经定义好了:llm, tools, prompt
# 1. 定义一个“工具执行”的Runnable
def execute_tools(data):
"""根据LLM的输出,执行对应的工具。"""
action: AgentAction = data["action"]
tool_to_use = {t.name: t for t in tools}[action.tool]
observation = tool_to_use.invoke(action.tool_input)
return {"observation": observation, "intermediate_steps": data["intermediate_steps"] + [(action, observation)]}

# 2. 使用LCEL组装Agent的核心循环
agent_chain = (
RunnablePassthrough.assign(intermediate_steps=[]) # 初始化步骤记录
| prompt # 生成提示
| llm # LLM思考
| parse_llm_output # 解析LLM输出,判断是调用工具还是结束
| RunnableLambda(execute_tools) # 如果需要,执行工具
# 这里实际上需要一个循环,直到parse_llm_output返回AgentFinish
)
# 注:这是一个简化示意,完整的ReAct循环需要更复杂的控制流(通常用RunnableBranch实现)。

流式输出(Streaming)是LCEL的一等公民

1
2
3
4
5
6
7
8
9
10
for chunk in agent_executor.stream({"input": "你的问题"}):
if "actions" in chunk:
# 正在调用工具
print(f"调用工具: {chunk['actions'][0].tool}")
elif "output" in chunk:
# 最终输出
print(chunk['output'])
else:
# LLM思考的token流
print(chunk.get('messages', [{}])[-1].content, end='')

示例5:Agent+工具调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from model.deepseek.config import DeepSeekConfig

# 1. 定义工具
@tool
def search_product(query: str) -> str:
"""搜索产品信息"""
return f"搜索结果:{query} 的相关产品..."

@tool
def calculate_price(quantity: int, unit_price: float) -> str:
"""计算总价"""
return f"总价:{quantity * unit_price} 元"

tools = [search_product, calculate_price]
llm = ChatOpenAI(
model=model,
api_key=api_key,
base_url=base_url,
temperature=0
)

# 2. 创建 Agent(LangChain 1.x 风格)
agent = create_agent(
model=llm,
tools=tools,
system_prompt="你是电商助手"
)

# 3. 执行(LCEL 风格)
result = agent.invoke({
"messages": [HumanMessage(content="买3件单价199元的T恤,总共多少钱?")]
})
print(result["messages"][-1].content)

示例6:RAG(检索增强生成)链

LangChain 中构建 RAG(检索增强生成)链 的核心思路是:

  1. 并行获取数据:同时执行两个任务——检索相关文档并格式化,以及透传用户原始问题。
  2. 合并为 Prompt 输入:将检索到的上下文和用户问题组合成一个字典,传递给提示模板。
  3. 生成回答:通过 LLM 和输出解析器得到最终答案。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# ---------- 1. 假设已有组件(需要预先定义)----------
# 文档检索器:根据用户问题从向量数据库检索相关文档片段
# retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

# 文档格式化函数:将检索到的 Document 对象列表合并成一个字符串
# def format_docs(docs):
# return "\n\n".join(doc.page_content for doc in docs)

# 提示模板:接收 context 和 question 两个变量
# prompt = ChatPromptTemplate.from_template(
# "请基于以下上下文回答问题:\n{context}\n\n问题:{question}"
# )

# 语言模型
# llm = ChatOpenAI(model="gpt-4o-mini")

# 输出解析器:将 LLM 的 AIMessage 转为字符串
# parser = StrOutputParser()

# ---------- 2. 优化后的 RAG 链(拆解为明确步骤)----------
# 步骤1:定义一个并行分支,分别处理上下文检索和问题透传
parallel_branch = {
# 分支1:检索并格式化文档作为上下文
"context": retriever | format_docs,
# 分支2:直接透传用户问题(输入是什么,输出就是什么)
"question": RunnablePassthrough()
}
# 上述字典等价于:RunnableParallel(context=retriever | format_docs, question=RunnablePassthrough())

# 步骤2:将并行分支的输出(字典)传入提示模板
# 提示模板会自动解包字典中的 context 和 question 变量
prompt_filled = parallel_branch | prompt

# 步骤3:依次经过 LLM 和解析器,生成最终答案
rag_chain = prompt_filled | llm | parser

# 更简洁的链式写法(一行流):
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough()
}
| prompt
| llm
| parser
)

# ---------- 3. 使用示例 ----------
# user_question = "什么是LangChain?"
# answer = rag_chain.invoke(user_question)
# print(answer)

代码解析

组件 作用
RunnablePassthrough() 原样返回输入,不做任何修改。
这里用于保留用户原始问题,避免在并行分支中被覆盖。
{"context": ..., "question": ...} RunnableParallel 的简写。同时执行两个子链,输出合并为字典。
`retriever format_docs`
prompt 提示模板,需要 contextquestion 两个变量。
`llm parser`

流程示意图

flowchart TD
    A["用户输入
(例如:什么是LangChain?)"] --> B["RunnableParallel
并行执行两个分支"] B --> C["分支1: context
retriever | format_docs"] B --> D["分支2: question
RunnablePassthrough()"] C --> E["检索并格式化文档
输出:上下文字符串"] D --> F["原样透传问题
输出:用户原始问题"] E --> G["合并为字典
{ context: ..., question: ... }"] F --> G G --> H["提示模板(Prompt)
填充 context 和 question"] H --> I["LLM 生成答案"] I --> J["输出解析器(StrOutputParser)"] J --> K["最终回答"]

自定义Runnable类

自定义 Runnable 让能够将任何 Python 逻辑封装成符合 LCEL 标准的组件,从而无缝集成到 LangChain 管道中,享受流式、批处理、异步等特性。关键在于正确实现 invoke(同步)和可选地实现 ainvokestreambatch,并遵循输入输出类型约定。

一个自定义 Runnable 类的典型示例,实现文本预处理功能:去除多余空格、统一小写、移除特殊字符。该类可以无缝集成到 LCEL 管道中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from typing import Iterator, Optional, Dict, Any
from langchain_core.runnables import Runnable, RunnableConfig
import re

class TextPreprocessor(Runnable[str, str]):
"""
自定义 Runnable:文本预处理
输入:原始字符串
输出:清洗后的字符串(去除多余空格、转小写、移除标点符号)
"""

def __init__(self, to_lower: bool = True, remove_punctuation: bool = True):
"""
初始化预处理参数
:param to_lower: 是否转换为小写
:param remove_punctuation: 是否移除标点符号(保留字母、数字、空格)
"""
self.to_lower = to_lower
self.remove_punctuation = remove_punctuation

def invoke(self, input: str, config: Optional[RunnableConfig] = None) -> str:
"""同步处理方法"""
# 去除首尾空格,并将多个连续空格合并为一个
text = re.sub(r'\s+', ' ', input.strip())

if self.remove_punctuation:
# 移除非字母、非数字、非空格的字符(保留中文?可根据需要调整)
text = re.sub(r'[^\w\s]', '', text)

if self.to_lower:
text = text.lower()

return text

async def ainvoke(self, input: str, config: Optional[RunnableConfig] = None) -> str:
"""异步处理方法(直接复用同步逻辑,实际场景可加异步IO)"""
return self.invoke(input, config)

# 可选:支持流式输出(逐字符或逐词输出)
def stream(self, input: str, config: Optional[RunnableConfig] = None) -> Iterator[str]:
"""逐词输出处理结果,模拟流式效果"""
processed = self.invoke(input, config)
for word in processed.split():
yield word + " "
# 可以添加时间延迟模拟真实流式
# import time; time.sleep(0.05)

# 可选:批量处理优化(父类默认实现为循环调用 invoke,可重写以提高效率)
def batch(self, inputs: list, config: Optional[RunnableConfig] = None) -> list:
"""批量处理多个字符串(无需重写,默认已支持,此处仅为演示)"""
return [self.invoke(inp, config) for inp in inputs]

单独使用

1
2
3
preprocessor = TextPreprocessor(to_lower=True, remove_punctuation=True)
cleaned = preprocessor.invoke(" Hello, World! This is LangChain. ")
print(cleaned) # 输出: "hello world this is langchain"

集成到 LCEL 管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 定义管道:预处理 -> 提示模板 -> LLM -> 解析器
prompt = ChatPromptTemplate.from_template("请用一句话回答:{question}")
llm = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()

chain = TextPreprocessor() | prompt | llm | parser

# 用户输入带有噪声的文本
raw_input = " Tell me, what's the weather like today? "
result = chain.invoke({"question": raw_input}) # 注意:TextPreprocessor 接收字符串,但 chain 输入是字典
# 上述会报错,因为 TextPreprocessor 期望输入为 str,而 chain 输入为 dict

修正集成方式:需要从字典中提取 question 字段传递给 TextPreprocessor。可以使用 RunnablePassthrough 或自定义映射:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.runnables import RunnableLambda, RunnableParallel

# 方法1:在管道中先提取 question 字段
chain = (
RunnableLambda(lambda d: d["question"]) # 提取字符串
| TextPreprocessor()
| (lambda clean: {"question": clean}) # 重新包装为字典,供 prompt 使用
| prompt
| llm
| parser
)

# 方法2:使用 RunnableParallel 保留原始结构
chain = (
{"processed": RunnableLambda(lambda d: d["question"]) | TextPreprocessor()}
| (lambda d: {"question": d["processed"]})
| prompt | llm | parser
)

# 调用
result = chain.invoke({"question": " Hello! How are you? "})
print(result)

自定义 Runnable 的关键点

要素 说明
继承 Runnable[InputType, OutputType] 泛型指定输入输出类型,增强类型安全
实现 invoke 同步调用核心逻辑,必须实现
**实现 ainvoke**(可选) 支持异步,如不实现则默认调用 invoke,但建议至少实现占位
**实现 stream**(可选) 支持流式输出,用于逐词/逐块返回
**实现 batch**(可选) 默认基类会循环调用 invoke,可重写优化(如并行请求)
使用 config 参数 接收 RunnableConfig,可读取回调、标签等运行时配置

场景示例

场景1:敏感信息脱敏

1
2
3
4
class MaskPII(Runnable[str, str]):
def invoke(self, input: str, config=None) -> str:
# 用正则匹配身份证、手机号等并替换为 ***
return re.sub(r'\b\d{11}\b', '***手机号***', input)

场景2:调用外部 API 进行翻译

1
2
3
4
5
6
class Translate(Runnable[str, str]):
def __init__(self, target_lang="zh"):
self.target_lang = target_lang
def invoke(self, input: str, config=None) -> str:
# 调用百度翻译 API 等
return translated_text

场景3:缓存查询结果

1
2
3
4
5
6
7
class Cache(Runnable[str, str]):
def __init__(self):
self.cache = {}
def invoke(self, input: str, config=None) -> str:
if input not in self.cache:
self.cache[input] = expensive_lookup(input)
return self.cache[input]
作者

光星

发布于

2026-04-12

更新于

2026-04-19

许可协议

评论