技术背景介绍

在与大语言模型进行交互时,流式响应是一个非常有用的功能。它允许你在生成完整响应之前逐步获取数据,这在某些实时应用场景中非常重要。所有的LLM都实现了Runnable接口,这个接口提供了标准的可运行方法的默认实现,例如invokebatchastream等。在这些方法中,streamastream(异步)是用于流式处理的重点。

核心原理解析

默认的流式实现提供了一个Iterator(或用于异步流式的AsyncIterator),它最终返回底层聊天模型提供商的最终输出。需要注意的是,逐个令牌的流式输出能力取决于提供商是否实现了正确的流式支持。

代码实现演示

同步流式处理

以下代码演示了如何使用同步流式处理。为了演示效果,我们使用了|作为令牌之间的分隔符:

from langchain_openai import OpenAI

# 创建一个OpenAI实例
llm = OpenAI(model="gpt-3.5-turbo-instruct", temperature=0, max_tokens=512)

# 流式获取响应
for chunk in llm.stream("Write me a 1 verse song about sparkling water."):
    print(chunk, end="|", flush=True)  # 使用'|'作为分隔符

输出结果展示了模型逐步生成响应的过程。

异步流式处理

使用异步流式处理的方式如下:

from langchain_openai import OpenAI
import asyncio

# 创建一个OpenAI实例
llm = OpenAI(model="gpt-3.5-turbo-instruct", temperature=0, max_tokens=512)

# 异步流式获取响应
async def async_stream():
    async for chunk in llm.astream("Write me a 1 verse song about sparkling water."):
        print(chunk, end="|", flush=True)  # 使用'|'作为分隔符

# 运行异步函数
asyncio.run(async_stream())

异步事件流式处理

在复杂的LLM应用程序中(例如涉及多步骤的代理应用),astream_events非常有用:

from langchain_openai import OpenAI
import asyncio

# 创建一个OpenAI实例
llm = OpenAI(model="gpt-3.5-turbo-instruct", temperature=0, max_tokens=512)

# 异步事件流式获取响应
async def async_event_stream():
    idx = 0
    async for event in llm.astream_events(
        "Write me a 1 verse song about goldfish on the moon", version="v1"
    ):
        idx += 1
        if idx >= 5:  # Truncate the output
            print("...Truncated")
            break
        print(event)

# 运行异步函数
asyncio.run(async_event_stream())

应用场景分析

流式响应特别适用于需要即时反馈的应用场景,如实时聊天、数据流监控、逐步数据处理等。它不仅能提高响应速度,还能降低资源的峰值消耗。

实践建议

  1. 在选择流式处理方法时,请优先考虑同步还是异步方式,依据使用场景和系统架构做出选择。
  2. 确保LLM提供商支持流式处理,特别是逐个令牌的生成能力。
  3. 在实现复杂应用时,通过astream_events方法可以获得更低延迟、更高的灵活性。

如果遇到问题欢迎在评论区交流。

—END—

Logo

一站式 AI 云服务平台

更多推荐