LangChain:流式输出行为的回退方案

回退通常意味着用一个全新的响应来替换失败的响应,而这在流式传输中是做不到的。

在 LangChain 的流式输出场景中,常见的回退方案(如 with_fallbacks)在并不直接适用,核心难点在于,一旦流式传输开始,就无法简单地“撤回”已经发送给客户端的数据。

在LangChain的流式输出场景中,错误处理的核心规则是:回退只在流创建的“初始阶段”发生,一旦流开始输出数据,中途发生的错误将不会触发回退。

流式输出的回退分析

回退阶段

首先要明确的一点,无论是 RunnableWithFallbacks 还是 ModelFallbackMiddleware,在流式输出时,回退只在流创建的初始阶段生效。一旦第一个 token 已经输出给用户,过程中发生的错误将不会触发回退切换到备选模型。

执行阶段 描述 是否触发回退
阶段一:流创建阶段 (Stream Creation) 调用 model.stream() 方法,框架正在尝试建立与模型的连接,准备开始生成内容。 。如果在此阶段连接失败(如网络错误、鉴权失败),回退机制会被激活,并按预设顺序尝试其他备选模型。
阶段二:流传输阶段 (Streaming) 流已成功创建,模型正在逐词(或逐块)地返回数据。 。如果在传输过程中发生错误(如中断、解析错误),框架会将其视为已发生的“运行时错误”,不会再去尝试其他备选模型。

在LangChain的流式输出中,回退是一个“先到先得”的保护机制,它只在还未看到任何输出时生效。一旦内容开始输出,系统的稳定性就主要依赖主链路的可靠性了。

因此,设计健壮的流式应用时,应前置错误处理(如网络预检),并为可能的中断设计应用层的降级方案

容错策略

LangChain 引入了 ModelAbortError 类来优化中断处理,允许回退链在模型调用被中止时正确继续执行,特别是在流式操作中,可以确保部分输出被正确处理。

由于流式输出的回退触发条件非常严格,在设计系统时,可能需要组合使用多种容错策略。

策略 描述 使用建议
关闭内置重试 LLM包装器默认有重试逻辑。在使用回退前,应将其关闭(如max_retries=0),
否则主模型会不断重试,延迟甚至阻止回退的触发。
必须操作
区分异常类型 with_fallbacks 方法允许通过 exceptions_to_handle 参数
指定触发回退的异常类型(如 RateLimitError),
对其他意外错误直接抛出,避免错误被误处理。
推荐使用
组合使用:回退 + 重试 对偶发的网络波动等可恢复错误,可以先用 with_retry 设置重试;
对无法恢复的错误(如服务下线),再用 with_fallbacks 实现模型切换。
高级实践
应用层处理 对流传输阶段的错误,可以在应用层(如UI)提供缓存机制
或“重试/降级”按钮,进行兜底。
兜底方案

此外,需要特别注意的是,对于 LangChain 1.2+ 引入的 ModelFallbackMiddleware,根据其官方文档定义,它是一个“在模型调用失败时尝试备选模型”的中间件,这个机制本身并不专门针对流式场景提供特殊的、独立于 RunnableWithFallbacks 核心规则的处理方式。因此,对于该中间件,同样适用上述的回退触发规则。

流式创建阶段的降级

这是一种主动防御的策略。核心思想是:在真正开始流式输出之前,先尝试调用流式接口。如果调用失败(例如网络错误、服务不可用),则立即降级为非流式调用,并将完整的响应返回。

这种方法确保了客户端要么收到一个完整的流式响应,要么收到一个完整的非流式响应,避免了流式传输中途失败导致的尴尬局面。

with_fallbacks回退

LangChain中最核心的高可用机制,当主执行路径失败时能自动切换到备用模型或流程。

基本用法:

1
2
3
4
5
6
# 主力模型链
primary_chain = prompt | gpt4_model | parser
# 备用模型链
fallback_chain = prompt | gpt35_model | parser
# 构建带回退的生产级链
robust_chain = primary_chain.with_fallbacks([fallback_chain])

捕获异常降级示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langchain_core.runnables import RunnableLambda

def resilient_stream(llm, input: str, **kwargs):
"""
尝试进行流式调用,如果失败则降级为普通调用。
"""
try:
chunks = []
# 尝试使用 stream() 方法进行流式生成
for chunk in llm.stream(input, **kwargs):
if isinstance(chunk, str):
chunks.append(chunk)
elif hasattr(chunk, "content"):
chunks.append(chunk.content)
# 在实际应用中,这里应该将 chunk 实时发送给客户端
# yield chunk
return "".join(chunks)
# 捕获流式调用可能出现的异常,如网络读取错误、运行时错误等
except (Exception) as e:
# 降级策略:当流式调用失败时,使用 invoke() 方法进行非流式调用
# 必须传入完全相同的 **kwargs,以保证生成参数(如 temperature)一致
return llm.invoke(input, **kwargs)

# 使用 RunnableLambda 将其包装成一个可重用的组件
resilient_chain = RunnableLambda(
lambda x: resilient_stream(llm, x["input"], temperature=0.3, max_tokens=256)
)

# 调用 resilient_chain.invoke(...) 即可

关键点:

  • 异常捕获:需要捕获流式调用中可能出现的特定异常,如 httpx.ReadErrorRuntimeError 等。
  • 参数一致性:降级调用 invoke() 时,必须复用流式调用时的所有参数(**kwargs),否则可能导致生成结果不一致。
  • 客户端体验:客户端需要能够同时处理流式和非流式两种响应格式。

流式输出阶段的重试

流式传输已经开始,在中途发生错误,它不能“回退”到另一个模型,但可以让当前模型尝试自我修正,继续完成输出。

ModelRetryMiddleware重试中间件

对于流式输出甚至是恢复的中间步骤,最有效的方案是在错误发生时进行重试,而非切换模型。ModelRetryMiddleware 正是为此设计的,它会在模型调用失败时,根据配置的退避策略自动重试

方案一:使用 ModelRetryMiddleware 进行智能重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain.agents import create_agent
from langchain.agents.middleware import ModelRetryMiddleware
from langchain_openai import ChatOpenAI

# 配置重试中间件
retry_middleware = ModelRetryMiddleware(
max_retries=3,
backoff_factor=2.0, # 退避因子:1s, 2s, 4s
initial_delay=1.0, # 初始等待1秒
max_delay=60.0, # 最大等待60秒
jitter=True, # 添加随机抖动,避免"惊群效应"
retry_on=(TimeoutError, ConnectionError), # 仅在这些错误时重试
on_failure="continue", # 重试失败后返回包含错误信息的消息并继续
)

agent = create_agent(
model="openai:gpt-4o",
middleware=[retry_middleware],
)

详解:退避因子 backoff_factor 控制着延迟增长速度,例如 initial_delay * (backoff_factor ^ retry_number)jitter 为延迟添加 ±25% 的随机抖动,能有效避免大量请求同时在失败后涌入,适合高并发场景。on_failure 参数可选 continue(让 Agent 带着错误信息继续)或 error(直接抛出异常)两种行为。retry_on 让你可以精准控制需要重试的错误类型,避免在不该重试的场景下浪费时间。

方案二:ModelFallbackMiddleware 配合重试中间件使用

虽然回退在流式开始后不生效,但仍可用于构建“连接的瞬间”的高可用性。官方推荐的最佳实践是组合使用重试和回退中间件,在 Agent 层面形成一套纵深防御策略:

  1. 先用 ModelRetryMiddleware 处理临时性错误(如网络抖动、限流 429)。
  2. 再用 ModelFallbackMiddleware 应对主模型彻底不可用(如服务下线、配额耗尽)的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain.agents import create_agent
from langchain.agents.middleware import ModelRetryMiddleware
from langchain_anthropic import ChatAnthropic
from langchain_ollama import ChatOllama

# ModelFallbackMiddleware 需从官方参考文档中确认其位置
# from langchain.agents.middleware import ModelFallbackMiddleware

# 使用 ModelRetryMiddleware 处理临时故障
retry_middleware = ModelRetryMiddleware(
max_retries=3, retry_on=(Exception,)
)

# 使用 ModelFallbackMiddleware 进行可靠降级
fallback_middleware = ModelFallbackMiddleware(
fallback_models=[ChatAnthropic(model="claude-3-haiku-20240307"), ChatOllama(model="llama3")]
)

agent = create_agent(
model="openai:gpt-4o",
middleware=[retry_middleware, fallback_middleware],
)

方案三:通过回调系统精细化监控与容错

回调和回退属于不同层面的容错机制——回退解决“谁能做”,而回调解决“做得怎么样”。你可以将两者组合使用:回调负责监控、记录和报警,而回退负责实际的故障恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class StreamingErrorMonitorHandler(BaseCallbackHandler):
"""监控流式输出中的错误并上报"""

def __init__(self):
self.token_count = 0

def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
self.token_count += 1

def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
# 监控错误:即使流已经开始,这里也会捕获错误
logger.error(f"流式传输出错 (已输出 {self.token_count} tokens): {error}")
# 可在此处发送告警、写入指标等

结合前面提到的 Runnable|with_fallbacks,以下是一个将 ModelRetryMiddleware 应用于 LCEL 的可视化流程。

flowchart LR
    Start([开始流式调用]) --> CallModel[调用模型 API]
    CallModel --> CheckCall{API 调用
成功?} CheckCall -->|是| StreamStart[开始向用户输出 tokens] StreamStart --> DuringStream[流进行中] DuringStream -->|发生错误| LogError["🔔 回调系统记录错误
但不触发重试/回退"] DuringStream -->|成功| End([结束]) CheckCall -->|否| RetryCheck{剩余重试次数 > 0?} RetryCheck -->|是| WaitBackoff[等待退避延迟] WaitBackoff --> CallModel RetryCheck -->|否| FallbackCheck{有 fallback 模型?} FallbackCheck -->|是| SwitchModel[切换至备选模型] SwitchModel --> StreamStart FallbackCheck -->|否| ThrowFinal[抛出最终异常] ThrowFinal --> End

with_retry重试机制

针对网络抖动等瞬时错误,LangChain提供了多种重试方案。

模型层重试

1
llm = ChatOpenAI(max_retries=3)

Runnable通用重试

1
2
3
# 为流式调用添加重试
for chunk in llm.with_retry().stream("请写一段诗"):
print(chunk, end="")

精细化错误处理策略

按异常类型回退

1
2
3
4
5
# 只在特定异常时触发回退
chain_with_fallback = chain.with_fallbacks(
fallbacks=[fallback_chain],
exceptions_to_handle=(RateLimitError, APIConnectionError)
)

自定义回退逻辑

1
2
3
chain_with_fallback = chain.with_fallbacks([
RunnableLambda(lambda x: "抱歉,服务暂时不可用")
])

异步事件流处理

1
2
3
4
5
6
7
8
try:
async for chunk in chain.astream_events(input):
process(chunk)
except TimeoutError:
handle_timeout()
except Exception as e:
log_error(e)
raise

组合使用:将with_fallbacks()with_retry()结合使用,提供多层容错

粒度控制:回退是按整个输入粒度判断是否失败,不是逐个token

监控日志:使用with_config()添加执行标识,便于链路追踪和调试

性能优化:调整chunk大小、并行处理、缓存机制以提升吞吐量

RetryOutputParser

当模型输出的格式不符合预期时(例如,JSON 缺少一个字段),LangChain 的 RetryOutputParser 可以介入:

  1. 捕获错误OutputParser 尝试解析模型输出的数据块时失败。
  2. 反馈给模型RetryOutputParser 会将解析错误信息(例如“缺少 action_input 字段”)作为新的提示反馈给 LLM。
  3. 模型重试:LLM 接收到错误反馈后,会尝试重新生成一个正确的、符合格式的输出。
  4. 继续流式:新生成的正确输出可以继续通过流式传输给客户端。

方案选型总结

可以根据应用场景的侧重,选择或组合使用不同方案:

方案 适用场景 在流式输出中的作用范围 配置复杂度
ModelRetryMiddleware 处理临时性、可恢复的错误
(如网络抖动、限流)。
作用于调用请求阶段,
在流创建前重试。
中,
需配置退避参数
ModelFallbackMiddleware 处理主模型彻底不可用的情况,
实现跨供应商容灾。
主要作用于连接建立阶段。
Runnable.with_fallbacks 为单个 LCEL 链条添加多级回退方案。 同样作用于连接建立阶段。
回调系统 监控、日志、审计、报警,
以及在流中进行故障处理。
作用于流式全过程
可记录或响应非致命错误。
作者

光星

发布于

2026-04-25

更新于

2026-04-26

许可协议

评论