LCEL (表达式语言)使用示例,包括基础使用,并行处理、条件分支、数据传递、错误处理、自定义逻辑。
自定义 Runnable 让能够将任何 Python 逻辑封装成符合 LCEL 标准的组件,从而无缝集成到 LangChain 管道中,享受流式、批处理、异步等特性。
LCE使用示例 示例1:基础使用 最经典的用法: prompt | llm | output_parser
翻译器链 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain_openai import ChatOpenAIprompt = ChatPromptTemplate.from_template( "请将以下内容翻译成{language}:{input_text}" ) llm = ChatOpenAI( model="deepseek-chat" , base_url="你的 Base URL" , api_key="你的 API Key" , ) output_parser = StrOutputParser() chain = prompt | llm | output_parser response = chain.invoke({ "language" : "英文" , "input_text" : "LangChain 是一个用于构建大语言模型应用的框架。" }) print (response)
一个简单的问答链 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from langchain_openai import ChatOpenAIfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserprompt = ChatPromptTemplate.from_template("你是一位技术专家。请用通俗易懂的语言解释一下:{topic}。" ) model = ChatOpenAI(model="gpt-4o" ) output_parser = StrOutputParser() chain = prompt | model | output_parser result = chain.invoke({"topic" : "什么是量子纠缠?" }) print (result)print ("流式输出:" , end="" , flush=True )for chunk in chain.stream({"topic" : "什么是递归?" }): print (chunk, end="" , flush=True ) print () results = chain.batch([{"topic" : "API" }, {"topic" : "微服务" }]) for i, res in enumerate (results): print (f"结果 {i} : {res} " )
简单的问答链 ,提示词指定角色:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom langchain_core.output_parsers import StrOutputParserprompt = ChatPromptTemplate.from_messages([ ("system" , "你是一位专业的{role}。" ), ("human" , "{question}" ) ]) model = ChatOpenAI(model="gpt-4" , temperature=0.7 ) parser = StrOutputParser() chain = prompt | model | parser response = chain.invoke({ "role" : "Python专家" , "question" : "解释Python中的装饰器" }) print (response)
关键解读 :
prompt | llm | output_parser创建了一个清晰的数据流 :输入字典-> 提示模板-> LLM模型-> 输出解析器-> 最终字符串。
每个组件都实现了 Runnable接口,知道如何接收上游输入,并产生下游能理解的输出。
示例2:复杂逻辑 LCEL 的强大之处在于它能轻松处理分支和并行等复杂逻辑,而无需编写复杂的 if/else 或循环。
1. 并行处理 (RunnableParallel) 当需要同时处理多个独立任务时(例如,从多个知识库检索信息),RunnableParallel 并发执行可以显著提升效率。
假设想同时获取一个城市的简介和最佳旅游时间 :
1 2 3 4 5 6 7 from langchain_core.runnables import RunnableParallelchain = RunnableParallel( intro=prompt_intro | llm | StrOutputParser(), best_time=prompt_time | llm | StrOutputParser() ) | final_prompt | llm | StrOutputParser()
假设想同时生成摘要和提取关键词 :
以下示例,LangChain 的 RunnableParallel 并行执行三个不同的任务:
**summary**:使用预先定义的 prompt 模板和 LLM 链生成文本摘要。
**keywords**:使用另一个提示模板提取文本关键词。
**length**:直接通过 lambda 函数计算输入文本的长度。
RunnableParallel 会并发 执行这些任务(而非顺序执行),然后将结果合并为一个字典,键分别为 summary、keywords、length。这样可以显著提高效率,尤其适合需要从同一输入派生多个独立结果的场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from langchain_core.runnables import RunnableParallel, RunnableLambdafrom langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom langchain_core.output_parsers import StrOutputParserfrom typing import Dict , Any summary_prompt = ChatPromptTemplate.from_template("请用一句话总结以下内容:\n{text}" ) keyword_prompt = ChatPromptTemplate.from_template("提取以下文本中的关键词(逗号分隔):\n{text}" ) llm = ChatOpenAI(model="gpt-4o-mini" , temperature=0 ) parser = StrOutputParser() summary_chain = summary_prompt | llm | parser keyword_chain = keyword_prompt | llm | parser def compute_length (inputs: Dict [str , Any ] ) -> int : """返回输入文本的字符长度""" return len (inputs["text" ]) length_chain = RunnableLambda(compute_length) parallel_chain = RunnableParallel( summary=summary_chain, keywords=keyword_chain, length=length_chain ) input_text = "LangChain 是一个开发框架,用于构建基于大语言模型的应用程序。" try : result = parallel_chain.invoke({"text" : input_text}) print ("摘要:" , result["summary" ]) print ("关键词:" , result["keywords" ]) print ("长度:" , result["length" ]) except Exception as e: print (f"执行失败: {e} " )
代码关键点说明
**RunnableParallel**:同时启动多个 Runnable,互不阻塞。
**RunnableLambda**:将普通 Python 函数转换为 LangChain 可运行的组件,确保类型一致。
异常捕获 :避免因 LLM API 错误导致整个程序崩溃。
明确分离组件定义与组合 :便于复用和测试。
多步骤处理(分析 → 翻译 → 优化)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from langchain_core.runnables import RunnablePassthroughanalyze_prompt = ChatPromptTemplate.from_template( "分析以下文本的情感(积极/消极/中性),只返回标签:\n\n{text}" ) analyzer_chain = analyze_prompt | ChatOpenAI() | StrOutputParser() translate_prompt = ChatPromptTemplate.from_template( "用{sentiment}的风格翻译以下内容:\n\n{text}" ) full_chain = ( { "text" : RunnablePassthrough(), "sentiment" : analyzer_chain } | translate_prompt | ChatOpenAI() | StrOutputParser() ) result = full_chain.invoke("这个产品太棒了!" )
并行处理(Map-Reduce 模式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from langchain_core.runnables import RunnableParallelparallel_chain = RunnableParallel( summary=( ChatPromptTemplate.from_template("总结以下内容:{text}" ) | ChatOpenAI() | StrOutputParser() ), keywords=( ChatPromptTemplate.from_template("从以下内容提取关键词:{text}" ) | ChatOpenAI() | StrOutputParser() ), sentiment=( ChatPromptTemplate.from_template("分析以下内容的情感:{text}" ) | ChatOpenAI() | StrOutputParser() ) ) result = parallel_chain.invoke({ "text" : "OpenAI发布了GPT-5,性能大幅提升,令人兴奋!" })
2. 条件分支 (RunnableBranch) RunnableBranch 可以根据输入的条件,动态选择不同的处理路径,实现“if-else”逻辑。
根据问的是天气还是新闻路由 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from langchain_core.runnables import RunnableBranchweather_branch_chain = RunnableBranch( (lambda x: "天气" in x["question" ], weather_prompt | llm), (lambda x: "新闻" in x["question" ], news_prompt | llm), default_prompt | llm ) text_branch_chain = RunnableBranch( (lambda x: len (x["text" ]) < 20 , lambda x: f"短文本: {x['text' ]} " ), (lambda x: len (x["text" ]) >= 20 , lambda x: f"长文本: {x['text' ]} " ), lambda x: "未知" )
根据文档内容长度路由 :
1 2 3 4 5 6 7 8 9 10 11 12 from langchain_core.runnables import RunnableBranchbranch_chain = RunnableBranch( (lambda x: len (x["text" ]) > 100 , prompt_long | llm | output_parser), (lambda x: len (x["text" ]) <= 100 , prompt_short | llm | output_parser), default_prompt | llm | output_parser ) full_chain = {"text" : RunnablePassthrough()} | branch_chain result = full_chain.invoke({"text" : "这是一个很长的文档内容..." })
判断值条件路由 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from langchain_core.runnables import RunnableBranchcode_chain = ... data_chain = ... general_chain = ... routing_chain = RunnableBranch( (lambda x: x["intent" ] == "code" , code_chain), (lambda x: x["intent" ] == "data" , data_chain), general_chain, )
链嵌套使用路由 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from langchain_core.runnables import RunnableBranch, RunnableLambdadef route_by_topic (info ): topic = info.get("topic" , "" ).lower() if "code" in topic or "编程" in topic: return "programming" return "general" programming_chain = ( ChatPromptTemplate.from_template("作为编程专家,回答:{question}" ) | ChatOpenAI() | StrOutputParser() ) general_chain = ( ChatPromptTemplate.from_template("作为通用助手,回答:{question}" ) | ChatOpenAI() | StrOutputParser() ) branch = RunnableBranch( (lambda x: route_by_topic(x) == "programming" , programming_chain), general_chain ) full_chain = { "topic" : lambda x: x["topic" ], "question" : lambda x: x["question" ] } | branch result = full_chain.invoke({ "topic" : "编程问题" , "question" : "Python列表推导式怎么用?" })
3. 数据传递 (RunnablePassthrough) RunnablePassthrough 就像一个“数据通道”,在复杂的字典流中,它可以原封不动地传递某个值,常与 RunnableParallel 配合使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from langchain_core.runnables import Runnable, RunnableConfigfrom typing import Dict , Any class MyValidator (Runnable[Dict , Dict ] ): def invoke (self, input : Dict , config: RunnableConfig = None ) -> Dict : if "text" not in input : raise ValueError("缺少 'text' 字段" ) input ["validated" ] = True return input validator = MyValidator() chain = RunnablePassthrough() | validator | llm | parser
代码说明 :
MyValidator 是一个自定义的数据校验组件 ,确保输入字典包含 "text" 字段,并通过添加 "validated": True 标记来表明校验通过。
使用 LCEL 管道将恒等传递 、校验 、LLM 调用 和输出解析 串联成一个完整的处理链。
执行时,输入字典会依次流经各个组件,最终返回解析后的字符串。
RunnablePassthrough() 在这里的作用是保持链的输入类型一致(Dict),因为 validator 期望接收字典。如果直接 validator | llm,链的输入类型仍然是字典,但 RunnablePassthrough 通常用于需要“旁路”原始数据到后续步骤的场景。
示例代码自定义 Runnable 类没有实现异步版本 ainvoke,因此只能用于同步链。如需异步支持,应同时实现 async def ainvoke(...)。
4. 自定义逻辑 (RunnableLambda) RunnableLambda 允许你将任意的 Python 函数包装成一个 Runnable 对象,无缝嵌入 LCEL 链中,实现复杂的数据转换或逻辑判断。
带检索的RAG链 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from langchain_community.vectorstores import FAISSfrom langchain_openai import OpenAIEmbeddings, ChatOpenAIfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain_core.runnables import RunnablePassthroughvectorstore = FAISS.load_local("faiss_index" , OpenAIEmbeddings()) retriever = vectorstore.as_retriever(search_kwargs={"k" : 3 }) template = """基于以下上下文回答问题: {context} 问题:{question} """ prompt = ChatPromptTemplate.from_template(template) model = ChatOpenAI(model="gpt-4" ) parser = StrOutputParser() def format_docs (docs ): return "\n\n" .join(doc.page_content for doc in docs) rag_chain = ( { "context" : retriever | format_docs, "question" : RunnablePassthrough() } | prompt | model | parser ) result = rag_chain.invoke("什么是LCEL?" ) print (result)
使用 RunnableLambda 把普通函数包装成 Runnable 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from langchain_core.runnables import RunnableLambdadef extract_keywords (text: str ) -> list : keywords = ["模型" , "人工智能" , "学习" ] return [kw for kw in keywords if kw in text] keyword_extractor = RunnableLambda(extract_keywords) complex_chain = prompt | model | StrOutputParser() | keyword_extractor result = complex_chain.invoke({"topic" : "深度学习" }) print (result)
5. 错误处理 (RunnableWithFallback) LCEL链可以轻松添加重试和回退逻辑。RunnableWithFallback 可以为一个 Runnable 组件配置一个或多个“备胎”。当主组件执行失败时,会自动调用备用组件,提升系统的健壮性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from langchain_core.runnables import RunnableWithFallbackrobust_chain = RunnableWithFallback( runnable=chat_prompt | chat_model, fallbacks=[chat_prompt | fallback_model] ) robust_chain = RunnableWithFallbacks( primary_chain, fallback_strategy=[ backup_chain1, backup_chain2, lambda x: "服务暂时不可用" ] )
1 2 from langchain_core.runnables import RunnableWithFallbackschain_with_fallback = chain.with_fallbacks([fallback_chain1, fallback_chain2])
场景:智能客服的降级策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from langchain_core.runnables import RunnableWithFallbacks, RunnableLambdafrom langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom langchain_community.llms import Tongyi gpt4_chain = ( ChatPromptTemplate.from_template("作为专业客服,详细回答:{question}" ) | ChatOpenAI(model="gpt-4" , temperature=0.7 ) ) gpt35_chain = ( ChatPromptTemplate.from_template("简要回答用户问题:{question}" ) | ChatOpenAI(model="gpt-3.5-turbo" , temperature=0.5 ) ) tongyi_chain = ( ChatPromptTemplate.from_template("回答问题:{question}" ) | Tongyi(model_name="qwen-turbo" ) ) def final_fallback (input_dict ): """当所有模型都失败时,返回友好提示""" return { "answer" : "抱歉,智能客服暂时无法连接,请稍后再试或拨打人工热线:400-xxx-xxxx" , "status" : "service_unavailable" , "original_question" : input_dict.get("question" , "" ) } customer_service_chain = RunnableWithFallbacks( gpt4_chain, fallback_strategy=[ gpt35_chain, tongyi_chain, RunnableLambda(final_fallback) ] ) try : result = customer_service_chain.invoke({"question" : "如何申请退款?" }) print (result) except Exception as e: print (f"完全失败:{e} " )
示例3:LCEL构建Agent 在Agent开发中的核心应用:一个典型的ReAct模式Agent ,其核心就是由LCEL构建的。
构建一个简易的数学计算Agent :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from langchain.agents import create_react_agent, AgentExecutorfrom langchain.tools import Toolfrom langchain_core.prompts import PromptTemplatefrom langchain_openai import ChatOpenAIimport mathdef calculate_sqrt (input : str ) -> str : """计算平方根。输入应为数字。""" try : return str (math.sqrt(float (input ))) except : return "输入无效,请输入一个数字。" sqrt_tool = Tool( name="sqrt_calculator" , func=calculate_sqrt, description="计算一个数字的平方根。输入应为单个数字。" ) from langchain.agents import AgentExecutor, create_tool_calling_agentfrom langchain_core.prompts import ChatPromptTemplateprompt = ChatPromptTemplate.from_messages([ ("system" , "你是一个专业的数学助手。请使用合适的工具来回答问题。" ), ("human" , "{input}" ), ("placeholder" , "{agent_scratchpad}" ), ]) llm = ChatOpenAI(model="gpt-3.5-turbo" , temperature=0 ) tools = [sqrt_tool] agent = create_tool_calling_agent(llm, tools, prompt) agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True ) result = agent_executor.invoke({"input" : "144的平方根是多少?" }) print (result["output" ])
在这个Agent内部 :
create_tool_calling_agent函数内部利用LCEL构建了一个复杂的链,该链能够:
解析用户输入。
LLM思考是否需要调用工具(以及调用哪个)。
如果调用工具,则将工具调用插入到 agent_scratchpad中。
将工具结果返回给LLM进行下一步思考或生成最终答案。
AgentExecutor负责循环执行这个链,直到LLM决定不再调用工具,并输出最终答案。
示例4:自定义的LCEL Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langchain_core.agents import AgentAction, AgentFinishfrom langchain_core.runnables import RunnablePassthroughdef execute_tools (data ): """根据LLM的输出,执行对应的工具。""" action: AgentAction = data["action" ] tool_to_use = {t.name: t for t in tools}[action.tool] observation = tool_to_use.invoke(action.tool_input) return {"observation" : observation, "intermediate_steps" : data["intermediate_steps" ] + [(action, observation)]} agent_chain = ( RunnablePassthrough.assign(intermediate_steps=[]) | prompt | llm | parse_llm_output | RunnableLambda(execute_tools) )
流式输出(Streaming)是LCEL的一等公民 :
1 2 3 4 5 6 7 8 9 10 for chunk in agent_executor.stream({"input" : "你的问题" }): if "actions" in chunk: print (f"调用工具: {chunk['actions' ][0 ].tool} " ) elif "output" in chunk: print (chunk['output' ]) else : print (chunk.get('messages' , [{}])[-1 ].content, end='' )
示例5:Agent+工具调用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from langchain.agents import create_agentfrom langchain_core.messages import HumanMessagefrom langchain_core.tools import toolfrom langchain_openai import ChatOpenAIfrom model.deepseek.config import DeepSeekConfig@tool def search_product (query: str ) -> str : """搜索产品信息""" return f"搜索结果:{query} 的相关产品..." @tool def calculate_price (quantity: int , unit_price: float ) -> str : """计算总价""" return f"总价:{quantity * unit_price} 元" tools = [search_product, calculate_price] llm = ChatOpenAI( model=model, api_key=api_key, base_url=base_url, temperature=0 ) agent = create_agent( model=llm, tools=tools, system_prompt="你是电商助手" ) result = agent.invoke({ "messages" : [HumanMessage(content="买3件单价199元的T恤,总共多少钱?" )] }) print (result["messages" ][-1 ].content)
示例6:RAG(检索增强生成)链 LangChain 中构建 RAG(检索增强生成)链 的核心思路是:
并行获取数据 :同时执行两个任务——检索相关文档并格式化,以及透传用户原始问题。
合并为 Prompt 输入 :将检索到的上下文和用户问题组合成一个字典,传递给提示模板。
生成回答 :通过 LLM 和输出解析器得到最终答案。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from langchain_core.runnables import RunnablePassthrough, RunnableParallelfrom langchain_core.output_parsers import StrOutputParserfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIparallel_branch = { "context" : retriever | format_docs, "question" : RunnablePassthrough() } prompt_filled = parallel_branch | prompt rag_chain = prompt_filled | llm | parser rag_chain = ( { "context" : retriever | format_docs, "question" : RunnablePassthrough() } | prompt | llm | parser )
代码解析 :
组件
作用
RunnablePassthrough()
原样返回输入,不做任何修改。 这里用于保留用户原始问题,避免在并行分支中被覆盖。
{"context": ..., "question": ...}
RunnableParallel 的简写。同时执行两个子链,输出合并为字典。
`retriever
format_docs`
prompt
提示模板,需要 context 和 question 两个变量。
`llm
parser`
流程示意图 :
flowchart TD
A["用户输入 (例如:什么是LangChain?)"] --> B["RunnableParallel 并行执行两个分支"]
B --> C["分支1: context retriever | format_docs"]
B --> D["分支2: question RunnablePassthrough()"]
C --> E["检索并格式化文档 输出:上下文字符串"]
D --> F["原样透传问题 输出:用户原始问题"]
E --> G["合并为字典 { context: ..., question: ... }"]
F --> G
G --> H["提示模板(Prompt) 填充 context 和 question"]
H --> I["LLM 生成答案"]
I --> J["输出解析器(StrOutputParser)"]
J --> K["最终回答"]
自定义Runnable类 自定义 Runnable 让能够将任何 Python 逻辑封装成符合 LCEL 标准的组件,从而无缝集成到 LangChain 管道中,享受流式、批处理、异步等特性。关键在于正确实现 invoke(同步)和可选地实现 ainvoke、stream、batch,并遵循输入输出类型约定。
一个自定义 Runnable 类的典型示例,实现文本预处理 功能:去除多余空格、统一小写、移除特殊字符。该类可以无缝集成到 LCEL 管道中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 from typing import Iterator, Optional , Dict , Any from langchain_core.runnables import Runnable, RunnableConfigimport reclass TextPreprocessor (Runnable[str , str ] ): """ 自定义 Runnable:文本预处理 输入:原始字符串 输出:清洗后的字符串(去除多余空格、转小写、移除标点符号) """ def __init__ (self, to_lower: bool = True , remove_punctuation: bool = True ): """ 初始化预处理参数 :param to_lower: 是否转换为小写 :param remove_punctuation: 是否移除标点符号(保留字母、数字、空格) """ self.to_lower = to_lower self.remove_punctuation = remove_punctuation def invoke (self, input : str , config: Optional [RunnableConfig] = None ) -> str : """同步处理方法""" text = re.sub(r'\s+' , ' ' , input .strip()) if self.remove_punctuation: text = re.sub(r'[^\w\s]' , '' , text) if self.to_lower: text = text.lower() return text async def ainvoke (self, input : str , config: Optional [RunnableConfig] = None ) -> str : """异步处理方法(直接复用同步逻辑,实际场景可加异步IO)""" return self.invoke(input , config) def stream (self, input : str , config: Optional [RunnableConfig] = None ) -> Iterator[str ]: """逐词输出处理结果,模拟流式效果""" processed = self.invoke(input , config) for word in processed.split(): yield word + " " def batch (self, inputs: list , config: Optional [RunnableConfig] = None ) -> list : """批量处理多个字符串(无需重写,默认已支持,此处仅为演示)""" return [self.invoke(inp, config) for inp in inputs]
单独使用 1 2 3 preprocessor = TextPreprocessor(to_lower=True , remove_punctuation=True ) cleaned = preprocessor.invoke(" Hello, World! This is LangChain. " ) print (cleaned)
集成到 LCEL 管道 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom langchain_core.output_parsers import StrOutputParserprompt = ChatPromptTemplate.from_template("请用一句话回答:{question}" ) llm = ChatOpenAI(model="gpt-3.5-turbo" ) parser = StrOutputParser() chain = TextPreprocessor() | prompt | llm | parser raw_input = " Tell me, what's the weather like today? " result = chain.invoke({"question" : raw_input})
修正集成方式 :需要从字典中提取 question 字段传递给 TextPreprocessor。可以使用 RunnablePassthrough 或自定义映射:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langchain_core.runnables import RunnableLambda, RunnableParallelchain = ( RunnableLambda(lambda d: d["question" ]) | TextPreprocessor() | (lambda clean: {"question" : clean}) | prompt | llm | parser ) chain = ( {"processed" : RunnableLambda(lambda d: d["question" ]) | TextPreprocessor()} | (lambda d: {"question" : d["processed" ]}) | prompt | llm | parser ) result = chain.invoke({"question" : " Hello! How are you? " }) print (result)
自定义 Runnable 的关键点
要素
说明
继承 Runnable[InputType, OutputType]
泛型指定输入输出类型,增强类型安全
实现 invoke
同步调用核心逻辑,必须实现
**实现 ainvoke**(可选)
支持异步,如不实现则默认调用 invoke,但建议至少实现占位
**实现 stream**(可选)
支持流式输出,用于逐词/逐块返回
**实现 batch**(可选)
默认基类会循环调用 invoke,可重写优化(如并行请求)
使用 config 参数
接收 RunnableConfig,可读取回调、标签等运行时配置
场景示例 场景1:敏感信息脱敏 1 2 3 4 class MaskPII (Runnable[str , str ] ): def invoke (self, input : str , config=None ) -> str : return re.sub(r'\b\d{11}\b' , '***手机号***' , input )
场景2:调用外部 API 进行翻译 1 2 3 4 5 6 class Translate (Runnable[str , str ] ): def __init__ (self, target_lang="zh" ): self.target_lang = target_lang def invoke (self, input : str , config=None ) -> str : return translated_text
场景3:缓存查询结果 1 2 3 4 5 6 7 class Cache (Runnable[str , str ] ): def __init__ (self ): self.cache = {} def invoke (self, input : str , config=None ) -> str : if input not in self.cache: self.cache[input ] = expensive_lookup(input ) return self.cache[input ]