Examples of using the LCEL expression language, covering basic usage, parallel processing, conditional branching, data passing, error handling, and custom logic.
LCEL Usage Examples

Example 1: Basic Usage
The most classic pattern: prompt | llm | output_parser
A translator chain:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# Prompt template with two input variables
prompt = ChatPromptTemplate.from_template(
    "Translate the following into {language}: {input_text}"
)

llm = ChatOpenAI(
    model="deepseek-chat",
    base_url="your Base URL",
    api_key="your API Key",
)

output_parser = StrOutputParser()

# The pipe operator composes the three components into one chain
chain = prompt | llm | output_parser

response = chain.invoke({
    "language": "English",
    "input_text": "LangChain 是一个用于构建大语言模型应用的框架。"
})
print(response)
```
A simple Q&A chain:
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template(
    "You are a technical expert. Explain in plain language: {topic}"
)
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()

chain = prompt | model | output_parser

# invoke: a single synchronous call
result = chain.invoke({"topic": "What is quantum entanglement?"})
print(result)

# stream: tokens arrive incrementally
print("Streaming: ", end="", flush=True)
for chunk in chain.stream({"topic": "What is recursion?"}):
    print(chunk, end="", flush=True)
print()

# batch: several inputs processed together
results = chain.batch([{"topic": "API"}, {"topic": "microservices"}])
for i, res in enumerate(results):
    print(f"Result {i}: {res}")
```
A simple Q&A chain, with a role specified in the prompt:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a professional {role}."),
    ("human", "{question}")
])
model = ChatOpenAI(model="gpt-4", temperature=0.7)
parser = StrOutputParser()

chain = prompt | model | parser

response = chain.invoke({
    "role": "Python expert",
    "question": "Explain decorators in Python"
})
print(response)
Key takeaways:
prompt | llm | output_parser creates a clear data flow: input dict -> prompt template -> LLM model -> output parser -> final string.
Every component implements the Runnable interface, so it knows how to accept input from upstream and produce output the downstream stage understands.
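The pipe syntax works because Runnable overloads Python's __or__. As a rough mental model, here is a pure-Python sketch of how piped components compose; MiniRunnable and the three toy stages are made-up names for illustration, not the real LangChain classes.

```python
class MiniRunnable:
    """Toy stand-in for LangChain's Runnable: wraps a function
    and supports `|` composition via __or__."""
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, value):
        return self.fn(value)

    def __or__(self, other):
        # Chaining: feed this step's output into the next step's input.
        return MiniRunnable(lambda value: other.invoke(self.invoke(value)))

# Three toy stages mirroring prompt | llm | output_parser
prompt = MiniRunnable(lambda d: f"Translate to {d['language']}: {d['text']}")
llm = MiniRunnable(lambda p: f"[model answer for: {p}]")
parser = MiniRunnable(lambda msg: msg.strip())

chain = prompt | llm | parser
print(chain.invoke({"language": "English", "text": "你好"}))
# -> [model answer for: Translate to English: 你好]
```

The real Runnable interface also provides stream, batch, and async variants, but the composition idea is the same.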
Example 2: Complex Logic
The real power of LCEL is that it handles complex logic such as branching and parallelism with ease, without hand-written if/else blocks or loops.
1. Parallel processing (RunnableParallel)
When several independent tasks need to run at the same time (for example, retrieving from multiple knowledge bases), RunnableParallel executes them concurrently, which can significantly improve efficiency.
Suppose you want to fetch a city's introduction and its best travel season at the same time:
```python
from langchain_core.runnables import RunnableParallel

# prompt_intro, prompt_time, final_prompt and llm are defined elsewhere
chain = RunnableParallel(
    intro=prompt_intro | llm | StrOutputParser(),
    best_time=prompt_time | llm | StrOutputParser()
) | final_prompt | llm | StrOutputParser()
```
Suppose you want to generate a summary and extract keywords at the same time:
```python
from langchain_core.runnables import RunnableParallel

summary_chain = ...   # e.g. summary_prompt | llm | StrOutputParser()
keywords_chain = ...  # e.g. keywords_prompt | llm | StrOutputParser()

parallel_chain = RunnableParallel(
    summary=summary_chain,
    keywords=keywords_chain,
)

result = parallel_chain.invoke({"input": "a long article about AI"})
print(result['summary'])
print(result['keywords'])
```
Multi-step processing (analyze → translate → refine):
```python
from langchain_core.runnables import RunnablePassthrough

analyze_prompt = ChatPromptTemplate.from_template(
    "Classify the sentiment of the following text (positive/negative/neutral); "
    "return only the label:\n\n{text}"
)
# Map the raw string input into the {text} variable the prompt expects
analyzer_chain = (
    {"text": RunnablePassthrough()}
    | analyze_prompt | ChatOpenAI() | StrOutputParser()
)

translate_prompt = ChatPromptTemplate.from_template(
    "Translate the following in a {sentiment} tone:\n\n{text}"
)

full_chain = (
    {
        "text": RunnablePassthrough(),  # forward the raw input unchanged
        "sentiment": analyzer_chain     # run the analyzer on the same input
    }
    | translate_prompt
    | ChatOpenAI()
    | StrOutputParser()
)

result = full_chain.invoke("这个产品太棒了!")
```
Parallel processing (map-reduce pattern):
```python
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    summary=(
        ChatPromptTemplate.from_template("Summarize the following: {text}")
        | ChatOpenAI() | StrOutputParser()
    ),
    keywords=(
        ChatPromptTemplate.from_template("Extract keywords from the following: {text}")
        | ChatOpenAI() | StrOutputParser()
    ),
    sentiment=(
        ChatPromptTemplate.from_template("Analyze the sentiment of the following: {text}")
        | ChatOpenAI() | StrOutputParser()
    )
)

result = parallel_chain.invoke({
    "text": "OpenAI released GPT-5 with a major performance jump — exciting!"
})
```
2. Conditional branching (RunnableBranch)
RunnableBranch selects a processing path dynamically based on the input, implementing "if/else" logic for chains.
Routing on whether the question asks about the weather or the news:
```python
from langchain_core.runnables import RunnableBranch

branch_chain = RunnableBranch(
    (lambda x: "weather" in x["question"], weather_prompt | llm),
    (lambda x: "news" in x["question"], news_prompt | llm),
    default_prompt | llm  # default branch when nothing matches
)
```
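Conceptually, RunnableBranch walks its (condition, runnable) pairs in order and runs the first handler whose condition matches, falling back to the default. A minimal pure-Python sketch of that selection logic (the function and handler names are made up for illustration):

```python
def branch(pairs, default, value):
    """Toy RunnableBranch: run the first handler whose condition matches."""
    for condition, handler in pairs:
        if condition(value):
            return handler(value)
    return default(value)

result = branch(
    [
        (lambda q: "weather" in q, lambda q: f"weather bot answers: {q}"),
        (lambda q: "news" in q, lambda q: f"news bot answers: {q}"),
    ],
    lambda q: f"general bot answers: {q}",
    "what's the weather today?",
)
print(result)
# -> weather bot answers: what's the weather today?
```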
Routing on document length:
```python
from langchain_core.runnables import RunnableBranch

branch_chain = RunnableBranch(
    (lambda x: len(x["text"]) > 100, prompt_long | llm | output_parser),
    (lambda x: len(x["text"]) <= 100, prompt_short | llm | output_parser),
    default_prompt | llm | output_parser
)

# The dict step maps the raw string input into the "text" key
full_chain = {"text": RunnablePassthrough()} | branch_chain
result = full_chain.invoke("This is a very long document...")
```
Routing on a field value:
```python
from langchain_core.runnables import RunnableBranch

code_chain = ...     # handles programming questions
data_chain = ...     # handles data-analysis questions
general_chain = ...  # default handler

routing_chain = RunnableBranch(
    (lambda x: x["intent"] == "code", code_chain),
    (lambda x: x["intent"] == "data", data_chain),
    general_chain,
)
```
Routing with nested chains and a helper function:
```python
from langchain_core.runnables import RunnableBranch

def route_by_topic(info):
    topic = info.get("topic", "").lower()
    if "code" in topic or "programming" in topic:
        return "programming"
    return "general"

programming_chain = (
    ChatPromptTemplate.from_template("As a programming expert, answer: {question}")
    | ChatOpenAI()
    | StrOutputParser()
)
general_chain = (
    ChatPromptTemplate.from_template("As a general assistant, answer: {question}")
    | ChatOpenAI()
    | StrOutputParser()
)

branch = RunnableBranch(
    (lambda x: route_by_topic(x) == "programming", programming_chain),
    general_chain
)

full_chain = {
    "topic": lambda x: x["topic"],
    "question": lambda x: x["question"]
} | branch

result = full_chain.invoke({
    "topic": "programming question",
    "question": "How do Python list comprehensions work?"
})
```
3. Data passing (RunnablePassthrough)
RunnablePassthrough acts like a "data channel": within a complex dict flow, it forwards a value downstream unchanged. It is most often used together with RunnableParallel.
```python
from langchain_core.runnables import RunnablePassthrough

# retriever, format_docs, prompt and llm are defined elsewhere
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```
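To see what the dict step does on its own: every entry receives the same input, and the passthrough entry simply returns it. Here is a pure-Python sketch of that behavior; parallel_step and the two stand-in lambdas are made-up names for illustration, not LangChain APIs.

```python
def parallel_step(mapping, value):
    """Toy model of the dict step in an LCEL chain: every entry
    receives the same input; a passthrough returns it unchanged."""
    return {key: fn(value) for key, fn in mapping.items()}

passthrough = lambda x: x                    # stand-in for RunnablePassthrough()
fake_retrieve = lambda q: f"docs about {q}"  # stand-in for retriever | format_docs

out = parallel_step(
    {"context": fake_retrieve, "question": passthrough},
    "What is LCEL?"
)
print(out)
# -> {'context': 'docs about What is LCEL?', 'question': 'What is LCEL?'}
```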
4. Custom logic (RunnableLambda)
RunnableLambda wraps any Python function as a Runnable object so it can be embedded seamlessly in an LCEL chain, enabling complex data transformations or custom checks.
A RAG chain with retrieval:
```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

vectorstore = FAISS.load_local(
    "faiss_index",
    OpenAIEmbeddings(),
    allow_dangerous_deserialization=True,  # required by recent langchain_community versions
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

template = """Answer the question based on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()

def format_docs(docs):
    # Join the retrieved documents into a single context string
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | prompt
    | model
    | parser
)

result = rag_chain.invoke("What is LCEL?")
print(result)
```
Using RunnableLambda to wrap a plain function as a Runnable object:
```python
from langchain_core.runnables import RunnableLambda

def extract_keywords(text: str) -> list:
    # Naive keyword matching, for demonstration only
    keywords = ["model", "artificial intelligence", "learning"]
    return [kw for kw in keywords if kw in text]

keyword_extractor = RunnableLambda(extract_keywords)

# prompt and model are assumed to be defined with a {topic} input variable
complex_chain = prompt | model | StrOutputParser() | keyword_extractor

result = complex_chain.invoke({"topic": "deep learning"})
print(result)
```
5. Error handling (fallbacks)
An LCEL chain can add retry and fallback logic with very little code. Calling .with_fallbacks() on a Runnable attaches one or more backups (the result is a RunnableWithFallbacks); when the primary component fails, the backups are invoked in order, making the system more robust.
```python
from langchain_core.runnables import RunnableLambda

# Attach one backup model to the primary chain
robust_chain = (chat_prompt | chat_model).with_fallbacks(
    [chat_prompt | fallback_model]
)

# Several fallbacks are tried in order; wrap a plain function in
# RunnableLambda to serve as the last resort
robust_chain = primary_chain.with_fallbacks([
    backup_chain1,
    backup_chain2,
    RunnableLambda(lambda x: "Service temporarily unavailable"),
])
```
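The fallback behavior itself is easy to model: try each runnable in order and return the first success. A pure-Python sketch of the idea (the names are illustrative, not the library implementation):

```python
def with_fallbacks(primary, fallbacks):
    """Toy model of .with_fallbacks(): try each step in order,
    moving on whenever one raises an exception."""
    def run(value):
        for step in [primary] + fallbacks:
            try:
                return step(value)
            except Exception:
                continue
        raise RuntimeError("all runnables failed")
    return run

def flaky(_):
    raise TimeoutError("primary model down")

chain = with_fallbacks(flaky, [lambda q: f"backup answer to: {q}"])
print(chain("How do I request a refund?"))
# -> backup answer to: How do I request a refund?
```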
Scenario: a degradation strategy for a customer-service bot
```python
from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_community.llms import Tongyi

# Primary: GPT-4, detailed answers
gpt4_chain = (
    ChatPromptTemplate.from_template("As a professional support agent, answer in detail: {question}")
    | ChatOpenAI(model="gpt-4", temperature=0.7)
)

# First fallback: GPT-3.5, brief answers
gpt35_chain = (
    ChatPromptTemplate.from_template("Briefly answer the user's question: {question}")
    | ChatOpenAI(model="gpt-3.5-turbo", temperature=0.5)
)

# Second fallback: a domestic model
tongyi_chain = (
    ChatPromptTemplate.from_template("Answer the question: {question}")
    | Tongyi(model_name="qwen-turbo")
)

def final_fallback(input_dict):
    """Last resort when every model fails: return a friendly notice."""
    return {
        "answer": "Sorry, the assistant is temporarily unavailable. Please try again later or call the support hotline: 400-xxx-xxxx",
        "status": "service_unavailable",
        "original_question": input_dict.get("question", "")
    }

customer_service_chain = gpt4_chain.with_fallbacks([
    gpt35_chain,
    tongyi_chain,
    RunnableLambda(final_fallback),
])

try:
    result = customer_service_chain.invoke({"question": "How do I request a refund?"})
    print(result)
except Exception as e:
    print(f"All fallbacks failed: {e}")
```
Example 3: Building an Agent with LCEL
The core application in agent development: a typical ReAct-style agent has LCEL at its heart.
Building a simple math-calculation agent:
```python
import math

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.tools import Tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

def calculate_sqrt(input: str) -> str:
    """Compute a square root. Input should be a number."""
    try:
        return str(math.sqrt(float(input)))
    except ValueError:
        return "Invalid input; please provide a single number."

sqrt_tool = Tool(
    name="sqrt_calculator",
    func=calculate_sqrt,
    description="Computes the square root of a number. Input should be a single number."
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a professional math assistant. Use the appropriate tool to answer questions."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
tools = [sqrt_tool]

agent = create_tool_calling_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

result = agent_executor.invoke({"input": "What is the square root of 144?"})
print(result["output"])
```
Inside this agent:
The create_tool_calling_agent function internally uses LCEL to build a fairly complex chain that can:
Parse the user's input.
Let the LLM decide whether a tool needs to be called (and which one).
If a tool is called, insert the tool call into the agent_scratchpad.
Return the tool's result to the LLM for the next round of reasoning or the final answer.
AgentExecutor is responsible for running this chain in a loop until the LLM decides not to call any more tools and produces the final answer.
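The loop described above can be sketched in plain Python. This is a toy model with made-up names (run_agent_loop, fake_llm); the real AgentExecutor additionally handles streaming, error recovery, and message formats.

```python
def run_agent_loop(llm_step, tools, user_input, max_turns=5):
    """Toy version of the AgentExecutor loop: call the model,
    execute any requested tool, feed the observation back,
    and stop when the model returns a final answer."""
    scratchpad = []  # list of (tool_name, tool_input, observation)
    for _ in range(max_turns):
        decision = llm_step(user_input, scratchpad)
        if decision["type"] == "final":
            return decision["output"]
        observation = tools[decision["tool"]](decision["tool_input"])
        scratchpad.append((decision["tool"], decision["tool_input"], observation))
    return "Stopped: too many turns."

# Scripted stand-in for the LLM: request sqrt once, then finish.
def fake_llm(user_input, scratchpad):
    if not scratchpad:
        return {"type": "tool", "tool": "sqrt", "tool_input": "144"}
    return {"type": "final", "output": f"The answer is {scratchpad[-1][2]}"}

tools = {"sqrt": lambda x: str(float(x) ** 0.5)}
print(run_agent_loop(fake_llm, tools, "square root of 144?"))
# -> The answer is 12.0
```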
Example 4: A Custom LCEL Agent

```python
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

def execute_tools(data):
    """Execute the tool selected by the LLM's output."""
    action: AgentAction = data["action"]
    tool_to_use = {t.name: t for t in tools}[action.tool]
    observation = tool_to_use.invoke(action.tool_input)
    return {
        "observation": observation,
        "intermediate_steps": data["intermediate_steps"] + [(action, observation)]
    }

# prompt, llm, tools and parse_llm_output (which maps the raw LLM
# message to an AgentAction or AgentFinish) are defined elsewhere
agent_chain = (
    RunnablePassthrough.assign(intermediate_steps=lambda x: [])
    | prompt
    | llm
    | parse_llm_output
    | RunnableLambda(execute_tools)
)
```
Streaming is a first-class citizen in LCEL:
```python
for chunk in agent_executor.stream({"input": "your question"}):
    if "actions" in chunk:
        print(f"Calling tool: {chunk['actions'][0].tool}")
    elif "output" in chunk:
        print(chunk["output"])
    else:
        # Intermediate message chunks, if any
        for message in chunk.get("messages", []):
            print(message.content, end="")
```
Example 5: Agent + Tool Calling

```python
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from model.deepseek.config import DeepSeekConfig  # author's local config; assumed to provide model, api_key, base_url

@tool
def search_product(query: str) -> str:
    """Search for product information."""
    return f"Search results: products matching {query}..."

@tool
def calculate_price(quantity: int, unit_price: float) -> str:
    """Calculate a total price."""
    return f"Total: {quantity * unit_price} yuan"

tools = [search_product, calculate_price]

llm = ChatOpenAI(
    model=model,  # model, api_key and base_url come from the local config above
    api_key=api_key,
    base_url=base_url,
    temperature=0
)

agent = create_agent(
    model=llm,
    tools=tools,
    system_prompt="You are an e-commerce assistant"
)

result = agent.invoke({
    "messages": [HumanMessage(content="I want 3 T-shirts at 199 yuan each. What is the total?")]
})
print(result["messages"][-1].content)
```