LangChain:LCEL(表达式语言)使用示例

LCEL 表达式语方使用示例,包括基础使用,并行处理、条件分支、数据传递、错误处理、自定义逻辑。

LCE使用示例

示例1:基础使用

最经典的用法: prompt | llm | output_parser

翻译器链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 1. 导入所需组件
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 2. 初始化组件
# 提示词模板:定义任务,并声明一个变量 {language} 和 {input_text}
prompt = ChatPromptTemplate.from_template(
"请将以下内容翻译成{language}:{input_text}"
)

# 语言模型:此处使用 OpenAI 兼容接口
llm = ChatOpenAI(
model="deepseek-chat", # 替换为你的模型
base_url="你的 Base URL",
api_key="你的 API Key",
)

# 输出解析器:将模型的 AIMessage 输出提取为纯文本字符串
output_parser = StrOutputParser()

# 3. 使用管道符 (|) 构建 LCEL 链
chain = prompt | llm | output_parser

# 4. 调用链(四种方式)
## 4.1 同步调用
# .invoke() 方法传入一个字典,其中的键需要匹配 prompt 中定义的变量
response = chain.invoke({
"language": "英文",
"input_text": "LangChain 是一个用于构建大语言模型应用的框架。"
})

# 4.2 异步调用 (适合Web服务)
# async_result = await chain.ainvoke(...)

# 4.3 流式调用 (适合实时输出)
# for chunk in chain.stream(...):
# print(chunk, end="", flush=True)

# 4.4 批量调用
# results = chain.batch([{"language": "法语", "input_text": "你好"}, {"language": "日语", "input_text": "谢谢"}])

print(response)
# 预期输出: "LangChain is a framework for building large language model applications."

一个简单的问答链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 1. 定义各个组件
# 提示词模板:一个 Runnable,接收 'topic' 变量,输出格式化后的提示词
prompt = ChatPromptTemplate.from_template("你是一位技术专家。请用通俗易懂的语言解释一下:{topic}。")

# 模型:一个 Runnable,接收提示词,输出 AI 响应
# 请确保已设置好环境变量 OPENAI_API_KEY
model = ChatOpenAI(model="gpt-4o")

# 输出解析器:一个 Runnable,将 AI 的原始响应(AIMessage)解析为字符串
output_parser = StrOutputParser()

# 2. 使用管道符 '|' 将它们组合成一个链
# 数据流向:{"topic": "..."} -> prompt -> model -> output_parser -> str
chain = prompt | model | output_parser

# 3. 调用这个链
# 同步调用
result = chain.invoke({"topic": "什么是量子纠缠?"})
print(result)

# 流式调用,提升用户体验
print("流式输出:", end="", flush=True)
for chunk in chain.stream({"topic": "什么是递归?"}):
print(chunk, end="", flush=True)
print() # 换行

# 批量调用,提高效率
results = chain.batch([{"topic": "API"}, {"topic": "微服务"}])
for i, res in enumerate(results):
print(f"结果 {i}: {res}")

简单的问答链,提示词指定角色:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 1. 创建提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一位专业的{role}。"),
("human", "{question}")
])

# 2. 初始化模型
model = ChatOpenAI(model="gpt-4", temperature=0.7)

# 3. 输出解析器
parser = StrOutputParser()

# 4. 使用管道符组合
chain = prompt | model | parser

# 5. 执行
response = chain.invoke({
"role": "Python专家",
"question": "解释Python中的装饰器"
})
print(response)

关键解读

  • prompt | llm | output_parser创建了一个清晰的数据流输入字典-> 提示模板-> LLM模型-> 输出解析器-> 最终字符串
  • 每个组件都实现了 Runnable接口,知道如何接收上游输入,并产生下游能理解的输出。

示例2:复杂逻辑

LCEL 的强大之处在于它能轻松处理分支和并行等复杂逻辑,而无需编写复杂的 if/else 或循环。

1. 并行处理 (RunnableParallel)

当需要同时处理多个独立任务时(例如,从多个知识库检索信息),RunnableParallel 并发执行可以显著提升效率。

假设想同时获取一个城市的简介和最佳旅游时间

1
2
3
4
5
6
7
from langchain_core.runnables import RunnableParallel

# 假设我们想同时获取一个城市的简介和最佳旅游时间
chain = RunnableParallel(
intro=prompt_intro | llm | StrOutputParser(), # 任务1:生成简介
best_time=prompt_time | llm | StrOutputParser() # 任务2:推荐时间
) | final_prompt | llm | StrOutputParser() # 将两个结果合并后,再交给LLM处理

假设想同时生成摘要和提取关键词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_core.runnables import RunnableParallel

# 定义两个独立的子任务
summary_chain = ... # 生成摘要的链
keywords_chain = ... # 提取关键词的链

# 使用 RunnableParallel 并行执行
parallel_chain = RunnableParallel(
summary=summary_chain,
keywords=keywords_chain,
)

# 调用后,会同时得到两个结果
result = parallel_chain.invoke({"input": "一篇关于AI的长文章"})
print(result['summary'])
print(result['keywords'])

多步骤处理(分析 → 翻译 → 优化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain_core.runnables import RunnablePassthrough

# 步骤 1:分析文本情感
analyze_prompt = ChatPromptTemplate.from_template(
"分析以下文本的情感(积极/消极/中性),只返回标签:\n\n{text}"
)
analyzer_chain = analyze_prompt | ChatOpenAI() | StrOutputParser()

# 步骤 2:根据情感调整翻译风格
translate_prompt = ChatPromptTemplate.from_template(
"用{sentiment}的风格翻译以下内容:\n\n{text}"
)

# 组合管道(使用 RunnablePassthrough 保留原始数据)
full_chain = (
{
"text": RunnablePassthrough(), # 原始文本
"sentiment": analyzer_chain # 情感分析结果
}
| translate_prompt
| ChatOpenAI()
| StrOutputParser()
)

result = full_chain.invoke("这个产品太棒了!")

并行处理(Map-Reduce 模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langchain_core.runnables import RunnableParallel

# 创建并行处理链
parallel_chain = RunnableParallel(
summary=(
ChatPromptTemplate.from_template("总结以下内容:{text}")
| ChatOpenAI()
| StrOutputParser()
),
keywords=(
ChatPromptTemplate.from_template("从以下内容提取关键词:{text}")
| ChatOpenAI()
| StrOutputParser()
),
sentiment=(
ChatPromptTemplate.from_template("分析以下内容的情感:{text}")
| ChatOpenAI()
| StrOutputParser()
)
)

# 执行 - 三个任务并行运行
result = parallel_chain.invoke({
"text": "OpenAI发布了GPT-5,性能大幅提升,令人兴奋!"
})
# 返回: {"summary": "...", "keywords": "...", "sentiment": "..."}

2. 条件分支 (RunnableBranch)

RunnableBranch 可以根据输入的条件,动态选择不同的处理路径,实现“if-else”逻辑。

根据问的是天气还是新闻路由

1
2
3
4
5
6
7
8
from langchain_core.runnables import RunnableBranch

# 根据问题的主题,选择不同的提示模板
branch_chain = RunnableBranch(
(lambda x: "天气" in x["question"], weather_prompt | llm),
(lambda x: "新闻" in x["question"], news_prompt | llm),
default_prompt | llm # 默认路径
)

根据文档内容长度路由

1
2
3
4
5
6
7
8
9
10
11
12
from langchain_core.runnables import RunnableBranch

# 定义分支:根据输入长度选择不同处理方式
branch_chain = RunnableBranch(
(lambda x: len(x["text"]) > 100, prompt_long | llm | output_parser), # 长文本总结
(lambda x: len(x["text"]) <= 100, prompt_short | llm | output_parser), # 短文本扩写
default_prompt | llm | output_parser # 默认情况
)

# 也可以与管道符结合
full_chain = {"text": RunnablePassthrough()} | branch_chain
result = full_chain.invoke({"text": "这是一个很长的文档内容..."})

判断值条件路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain_core.runnables import RunnableBranch

# 假设我们有一个分类器链,能判断用户意图是 'code', 'data' 还是 'general'
# classifier_chain = ... (一个输出意图分类的链)

# 定义针对不同意图的子链
code_chain = ... # 处理编程问题的链
data_chain = ... # 处理数据分析问题的链
general_chain = ... # 处理通用问题的链

# 使用 RunnableBranch 创建路由链
routing_chain = RunnableBranch(
(lambda x: x["intent"] == "code", code_chain),
(lambda x: x["intent"] == "data", data_chain),
general_chain, # 默认分支
)

# 最终链:先分类,再路由
# full_chain = classifier_chain | routing_chain

链嵌套使用路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langchain_core.runnables import RunnableBranch, RunnableLambda

# 定义路由函数
def route_by_topic(info):
topic = info.get("topic", "").lower()
if "code" in topic or "编程" in topic:
return "programming"
return "general"

# 创建不同分支的链
programming_chain = (
ChatPromptTemplate.from_template("作为编程专家,回答:{question}")
| ChatOpenAI()
| StrOutputParser()
)

general_chain = (
ChatPromptTemplate.from_template("作为通用助手,回答:{question}")
| ChatOpenAI()
| StrOutputParser()
)

# 使用 RunnableBranch 实现条件路由
branch = RunnableBranch(
(lambda x: route_by_topic(x) == "programming", programming_chain),
general_chain # 默认分支
)

# 完整链
full_chain = {
"topic": lambda x: x["topic"],
"question": lambda x: x["question"]
} | branch

# 执行
result = full_chain.invoke({
"topic": "编程问题",
"question": "Python列表推导式怎么用?"
})

3. 数据传递 (RunnablePassthrough)

RunnablePassthrough 就像一个“数据通道”,在复杂的字典流中,它可以原封不动地传递某个值,常与 RunnableParallel 配合使用。

1
2
3
4
5
6
7
8
9
10
11
12
from langchain_core.runnables import RunnablePassthrough

# 构建 RAG 链的典型用法:
# 1. 检索上下文 (retriever)
# 2. 透传原始问题 (RunnablePassthrough())
# 3. 将两者打包成一个字典,供 Prompt 使用
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)

4. 自定义逻辑 (RunnableLambda)

RunnableLambda 允许你将任意的 Python 函数包装成一个 Runnable 对象,无缝嵌入 LCEL 链中,实现复杂的数据转换或逻辑判断。

带检索的RAG链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 假设已有向量数据库
vectorstore = FAISS.load_local("faiss_index", OpenAIEmbeddings())
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 提示模板
template = """基于以下上下文回答问题:
{context}

问题:{question}
"""
prompt = ChatPromptTemplate.from_template(template)

# 模型和解析器
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()

# 辅助函数:格式化检索结果
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)

# 将其包装为 Runnable 后,就可以像其他组件一样使用
# chain = retriever | RunnableLambda(format_docs) | prompt | llm | parser
# 构建 RAG 链
rag_chain = (
{
"context": retriever | format_docs, # 检索并格式化文档
"question": RunnablePassthrough() # 直接传递用户问题
}
| prompt
| model
| parser
)

# 执行
result = rag_chain.invoke("什么是LCEL?")
print(result)

使用 RunnableLambda 把普通函数包装成 Runnable 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_core.runnables import RunnableLambda

# 定义一个普通函数
def extract_keywords(text: str) -> list:
# 这里是一个简单的模拟
keywords = ["模型", "人工智能", "学习"]
return [kw for kw in keywords if kw in text]

# 使用 RunnableLambda 将其转换为 Runnable
keyword_extractor = RunnableLambda(extract_keywords)

# 集成到链中:先生成答案,再提取关键词
complex_chain = prompt | model | StrOutputParser() | keyword_extractor

result = complex_chain.invoke({"topic": "深度学习"})
print(result) # 输出: ['模型', '学习']

5. 错误处理 (RunnableWithFallback)

LCEL链可以轻松添加重试和回退逻辑。RunnableWithFallback 可以为一个 Runnable 组件配置一个或多个“备胎”。当主组件执行失败时,会自动调用备用组件,提升系统的健壮性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_core.runnables import RunnableWithFallback

# 如果主模型(chat_model)调用失败,就使用备用模型(fallback_model)
robust_chain = RunnableWithFallback(
runnable=chat_prompt | chat_model,
fallbacks=[chat_prompt | fallback_model]
)

robust_chain = RunnableWithFallbacks(
primary_chain, # 第一个参数:主链(必须)
fallback_strategy=[ # 第二个参数:备用策略列表
backup_chain1, # 第1个备用:可能是 cheaper LLM
backup_chain2, # 第2个备用:可能是本地模型
lambda x: "服务暂时不可用" # 最终兜底:硬编码返回
]
)
1
2
from langchain_core.runnables import RunnableWithFallbacks
chain_with_fallback = chain.with_fallbacks([fallback_chain1, fallback_chain2])

场景:智能客服的降级策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from langchain_core.runnables import RunnableWithFallbacks, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_community.llms import Tongyi # 国产备用模型

# ========== 主链:GPT-4(高质量但可能不稳定) ==========
gpt4_chain = (
ChatPromptTemplate.from_template("作为专业客服,详细回答:{question}")
| ChatOpenAI(model="gpt-4", temperature=0.7)
)

# ========== 备用1:GPT-3.5(更快更稳定) ==========
gpt35_chain = (
ChatPromptTemplate.from_template("简要回答用户问题:{question}")
| ChatOpenAI(model="gpt-3.5-turbo", temperature=0.5)
)

# ========== 备用2:通义千问(国产兜底) ==========
tongyi_chain = (
ChatPromptTemplate.from_template("回答问题:{question}")
| Tongyi(model_name="qwen-turbo")
)

# ========== 最终兜底:硬编码回复 ==========
def final_fallback(input_dict):
"""当所有模型都失败时,返回友好提示"""
return {
"answer": "抱歉,智能客服暂时无法连接,请稍后再试或拨打人工热线:400-xxx-xxxx",
"status": "service_unavailable",
"original_question": input_dict.get("question", "")
}

# ========== 组装容错链 ==========
customer_service_chain = RunnableWithFallbacks(
gpt4_chain,
fallback_strategy=[
gpt35_chain,
tongyi_chain,
RunnableLambda(final_fallback) # 包装为 Runnable
]
)

# ========== 执行 ==========
try:
result = customer_service_chain.invoke({"question": "如何申请退款?"})
print(result)
except Exception as e:
# 理论上不会走到这里,因为最终兜底总会返回
print(f"完全失败:{e}")

示例3:LCEL构建Agent

在Agent开发中的核心应用:一个典型的ReAct模式Agent,其核心就是由LCEL构建的。

构建一个简易的数学计算Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
import math

# 1. 定义工具(工具本身也是Runnable)
def calculate_sqrt(input: str) -> str:
"""计算平方根。输入应为数字。"""
try:
return str(math.sqrt(float(input)))
except:
return "输入无效,请输入一个数字。"

sqrt_tool = Tool(
name="sqrt_calculator",
func=calculate_sqrt,
description="计算一个数字的平方根。输入应为单个数字。"
)

# 2. 使用LCEL风格创建Agent(现代方式)
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate

# 定义提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的数学助手。请使用合适的工具来回答问题。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"), # 这是关键!用于存放Agent的思考过程和工具调用记录
])

# 创建Agent
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
tools = [sqrt_tool]
agent = create_tool_calling_agent(llm, tools, prompt)

# 3. 创建执行器并运行
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
result = agent_executor.invoke({"input": "144的平方根是多少?"})
print(result["output"])

在这个Agent内部

  1. create_tool_calling_agent函数内部利用LCEL构建了一个复杂的链,该链能够:
    • 解析用户输入。
    • LLM思考是否需要调用工具(以及调用哪个)。
    • 如果调用工具,则将工具调用插入到 agent_scratchpad中。
    • 将工具结果返回给LLM进行下一步思考或生成最终答案。
  2. AgentExecutor负责循环执行这个链,直到LLM决定不再调用工具,并输出最终答案。

示例4:自定义的LCEL Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.runnables import RunnablePassthrough

# 假设我们已经定义好了:llm, tools, prompt
# 1. 定义一个“工具执行”的Runnable
def execute_tools(data):
"""根据LLM的输出,执行对应的工具。"""
action: AgentAction = data["action"]
tool_to_use = {t.name: t for t in tools}[action.tool]
observation = tool_to_use.invoke(action.tool_input)
return {"observation": observation, "intermediate_steps": data["intermediate_steps"] + [(action, observation)]}

# 2. 使用LCEL组装Agent的核心循环
agent_chain = (
RunnablePassthrough.assign(intermediate_steps=[]) # 初始化步骤记录
| prompt # 生成提示
| llm # LLM思考
| parse_llm_output # 解析LLM输出,判断是调用工具还是结束
| RunnableLambda(execute_tools) # 如果需要,执行工具
# 这里实际上需要一个循环,直到parse_llm_output返回AgentFinish
)
# 注:这是一个简化示意,完整的ReAct循环需要更复杂的控制流(通常用RunnableBranch实现)。

流式输出(Streaming)是LCEL的一等公民

1
2
3
4
5
6
7
8
9
10
for chunk in agent_executor.stream({"input": "你的问题"}):
if "actions" in chunk:
# 正在调用工具
print(f"调用工具: {chunk['actions'][0].tool}")
elif "output" in chunk:
# 最终输出
print(chunk['output'])
else:
# LLM思考的token流
print(chunk.get('messages', [{}])[-1].content, end='')

示例5:Agent+工具调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from model.deepseek.config import DeepSeekConfig

# 1. 定义工具
@tool
def search_product(query: str) -> str:
"""搜索产品信息"""
return f"搜索结果:{query} 的相关产品..."

@tool
def calculate_price(quantity: int, unit_price: float) -> str:
"""计算总价"""
return f"总价:{quantity * unit_price} 元"

tools = [search_product, calculate_price]
llm = ChatOpenAI(
model=model,
api_key=api_key,
base_url=base_url,
temperature=0
)

# 2. 创建 Agent(LangChain 1.x 风格)
agent = create_agent(
model=llm,
tools=tools,
system_prompt="你是电商助手"
)

# 3. 执行(LCEL 风格)
result = agent.invoke({
"messages": [HumanMessage(content="买3件单价199元的T恤,总共多少钱?")]
})
print(result["messages"][-1].content)

LangChain:LCEL(表达式语言)使用示例

http://blog.gxitsky.com/2026/04/12/AI-LangChain-035-Chain-LCEL-Case/

作者

光星

发布于

2026-04-12

更新于

2026-04-12

许可协议

评论