随着大模型(如 GPT、BERT 等)的流行,越来越多的 API 提供商开始提供流式接口,这使得我们能够更高效地处理大规模数据,尤其是在响应体非常大的情况下。本文将探讨如何使用 OkHttpSpring WebClient 两种方式对接大模型的流式接口,实现高效的异步请求处理。

一、流式接口概述

流式接口是一种允许客户端逐步接收服务器响应数据的机制。相比传统的“一次性返回”方式,流式接口通常用于处理大体积数据或者需要长时间处理的请求。客户端通过建立持久连接,持续从服务器获取数据,而无需等待整个响应体加载完成。

大模型 API 的流式接口通常采用 HTTP 2.0WebSocket 协议进行数据流传输,能够以流的方式发送大量的数据块或结果,这对实现低延迟、高吞吐量的请求处理至关重要。

二、使用 OkHttp 对接流式接口

2.1 配置 OkHttp 客户端

OkHttp 是一个强大的 HTTP 客户端,支持异步请求、连接池和流式响应等特性。下面是如何使用 OkHttp 实现对接流式接口的示例。

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.12.0</version>在这里插入代码片
</dependency>
package com.dolphin.bootstrap.agent.streamchat;

import com.alibaba.fastjson.JSONObject;
import com.dolphin.bootstrap.agent.BaseAgent;
import io.micrometer.common.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okio.BufferedSource;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * 利用 OKHttp 库,实现 Stream 流式调用
 */
@Component
@Slf4j
public class OKHttpStream implements BaseAgent {
    private static final Integer TIME_OUT = 5 * 60;

    @Value("ai.agent.streamChatUrl")
    private String streamChatUrl;

    @Override
    public void chatStream(SseEmitter sseEmitter, String question) {
        // 1. 创建 OkHttpClient 对象,并设置超时时间
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(TIME_OUT, TimeUnit.SECONDS)
                .readTimeout(TIME_OUT, TimeUnit.SECONDS)
                .writeTimeout(TIME_OUT, TimeUnit.SECONDS)
                .build();

        //  封装请求体参数
        JSONObject params = new JSONObject();
        params.put("question", question);
        RequestBody requestBody = RequestBody.create(params.toJSONString(), MediaType.parse("application/json; charset=utf-8"));

        // 封装请求头
        Headers headers = new Headers.Builder()
                .set("Content-Type","application/json")
                .set("Accept","text/event-stream")
                .build();


        // 2. 构建 Request 对象
        Request request = new Request.Builder()
                .url(streamChatUrl)
                .headers(headers)
                .post(requestBody)
                .build();

        // 3. 创建 Call 对象
        Call call = client.newCall(request);

        // 4. 监听回调
        call.enqueue(new Callback() {
            @Override
            public void onFailure(@NotNull Call call, @NotNull IOException e) {
                log.error("进入 onFailure方法 {}", e.getMessage(), e);
                sseEmitter.completeWithError(e);
            }

            @Override
            public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
                if (response.isSuccessful()) {
                    String chunkMessage = "";
                    try (ResponseBody responseBody = response.body();) {
                        BufferedSource source = responseBody.source();
                        while (!source.exhausted()) {
                            chunkMessage = source.readUtf8Line();
                            if (StringUtils.isBlank(chunkMessage)) {
                                continue;
                            }
                            JSONObject jsonObject = JSONObject.parseObject(chunkMessage);
                            if (null != jsonObject && null != jsonObject.getJSONObject("data")) {
                                String answer = jsonObject.getJSONObject("data").getString("answer");
                                sseEmitter.send(answer);
                            }
                            
                        }
                    } catch (Exception e) {
                        log.error("解析失败: {}", e.getMessage(), e);
                        sseEmitter.completeWithError(e);
                    }
                } else {
                    log.error("onResponse 方法请求失败 {}", response.message());
                    // TODO 重新发起请求
                }
            }
        });


        BaseAgent.super.chatStream(sseEmitter, question);
    }
}

2.2 代码解释

​ • call.enqueue(new Callback() {…});:异步执行请求,回调函数用于处理响应。

这种方式非常适合大模型 API,因为它能够实时获取并处理每一个数据块,而不需要等待整个响应体加载完成。

三、使用 WebClient 对接流式接口

Spring 5 引入的 WebClient 是基于响应式编程的 HTTP 客户端,特别适用于流式数据处理和高并发场景。WebClient 支持通过 MonoFlux 来处理异步数据流,非常适合与大模型的流式接口对接。

3.1 配置 WebClient

首先,需要在 Spring Boot 项目中配置 WebClient。可以通过 WebClient.Builder 来定制客户端配置。

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>6.1.4</version>
</dependency>
package com.dolphin.bootstrap.agent.streamchat;

import com.alibaba.fastjson.JSONObject;
import com.dolphin.bootstrap.agent.BaseAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.util.Collections;

/**
 * 利用 WebClient 类,实现 Stream 流式调用
 */
public class WebClientStream implements BaseAgent {
    private static final Logger log = LoggerFactory.getLogger(WebClientStream.class);
    @Value("ai.agent.streamChatUrl")
    private String streamChatUrl;

    @Override
    public void chatStream(SseEmitter sseEmitter, String question) {
        // 创建 WebClient 客户端
        WebClient webClient = WebClient.builder().baseUrl(streamChatUrl).build();
        // 封装参数
        JSONObject params = new JSONObject();
        params.put("question", question);

        // 封装请求头
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        // accept 一定要设置为 TEXT_EVENT_STREAM
        headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));

        Flux<String> eventStream = webClient
                .post()
                .uri("/stream")
                .accept(MediaType.valueOf("text/event-stream;charset=UTF-8")) // 一定要设置
                .headers(httpHeaders -> httpHeaders.addAll(headers))
                .bodyValue(params.toJSONString())
                .retrieve()
                .bodyToFlux(String.class);

        eventStream.subscribe(
            data -> { // data 有什么key,value。具体看你对接agent的文档
                try {
                    JSONObject bodyJson = JSONObject.parseObject(data);
                    if (bodyJson.getIntValue("code") == HttpStatus.OK.value()) {
                        // 事件类型
                        String event = bodyJson.getString("event");

                        // agent回答
                        String answer = bodyJson.getJSONObject("data").getString("answer");

                        if ("message".equals(event)) {
                            // 中间一段一段的消息
                            sseEmitter.send(answer);
                        } else if ("end".equals(event)) {
                            // 最后会全部返回
                            sseEmitter.send(answer);
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

            },
            error -> {
                sseEmitter.completeWithError(error);
                log.error("报错了:{}", error.getMessage(), error);
            },
            sseEmitter::complete
        );


    }
}

3.2 代码解释

​ • bodyValue :添加请求体。

​ • retrieve :执行请求。

​ • bodyToFlux :将响应体处理为 Flux,Flux 代表一个可以包含多个元素的数据流。。

​ • subscribe :非阻塞,注册回调函数。第一个回调:正常数据返回。第二个回调:出错。第三个回调:请求完成。

3.3 错误处理

WebClient 还提供了完善的错误处理机制,可以通过 .onStatus() 或 .onErrorResume() 方法捕获 HTTP 错误或其他异常情况。

webClient.get()
    .uri("/")
    .retrieve()
    .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException("客户端错误")))
    .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException("服务器错误")))
    .bodyToFlux(String.class)
    .doOnNext(data -> System.out.println("接收到数据块:" + data))
    .subscribe();

四、OkHttp 与 WebClient 的对比

特性 OkHttp WebClient
编程模型 阻塞或异步操作 响应式编程,支持异步和流式处理
并发处理 支持异步请求,但不具备响应式特性 支持高并发,且更适合流式数据处理
适用场景 传统的 HTTP 请求,适合简单的客户端 高并发、响应式编程,流式接口等
集成生态 单独的 HTTP 客户端库 与 Spring WebFlux 集成,适用于微服务架构

选择建议:

​ • 如果你已经在使用 Spring Boot,且需要处理流式接口,WebClient 是更推荐的选择。它与 Spring WebFlux 配合默契,能处理高并发和复杂的数据流场景。

​ • 如果你只是需要处理简单的流式 HTTP 请求,OkHttp 是一个非常轻量且功能强大的库,适合用于快速集成。

五、总结

无论是 OkHttp 还是 WebClient,它们都提供了高效的流式数据处理能力,能够帮助我们在与大模型 API 对接时减少等待时间,提高响应速度和吞吐量。选择哪个工具取决于你的项目需求和技术栈,如果你在使用 Spring Boot 或 WebFlux,WebClient 无疑是一个理想的选择。

Logo

一站式 AI 云服务平台

更多推荐