Spring Webflux 下调用大模型(以 DeepSeek 为例
本文档总结了使用 Spring Webflux 调用大模型(以 DeepSeek API 为例)的核心代码实现
Controller (AiController.java
)
AiController
负责接收前端或客户端的请求,并将请求转发给 AiService
处理。
- 路径映射: 通过
@GetMapping("/chat")
将 HTTP GET 请求映射到/chat
路径。 - 流式响应:
produces = MediaType.TEXT_EVENT_STREAM_VALUE
指定响应类型为 Server-Sent Events (SSE),适用于流式数据传输。 - 请求参数:
@RequestParam String question
接收名为question
的查询参数。 - 服务调用: 调用
aiService.chat(question)
方法,并将返回的Flux
(响应流)直接作为 HTTP 响应体返回。
package com.example.springai.controller;
// ... 导入 ...
@Slf4j
@Controller
public class AiController {
@Autowired
private AiService aiService;
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux chat(@RequestParam String question) {
return aiService.chat(question);
}
// ... 其他方法 ...
}
DTO (Data Transfer Objects)
DTO 用于定义请求和响应的数据结构。
ChatRequest.java
定义了发送给大模型 API 的请求体结构。
model
: 指定使用的模型名称。messages
: 一个列表,包含对话历史和当前用户输入。每个Message
对象有role
(e.g., "user", "assistant") 和content
(消息文本)。stream
: 布尔值,指示是否启用流式响应。- 其他可选参数如
temperature
,top_k
,max_tokens
等用于控制生成行为。
package com.example.springai.dto;
import java.util.List;
import lombok.Data;
@Data
public class ChatRequest {
private String model;
private List messages;
private Boolean stream;
// ... 其他可选字段 ...
@Data
public static class Message {
private String role;
private String content;
}
// ... 其他嵌套类,例如 Tool, Function 等(如果使用) ...
}
StreamChatResponse.java
定义了从大模型 API 流式接收的每个数据块的结构。
choices
: 一个列表,通常包含一个Choice
对象。delta
: 包含模型生成的增量内容。content
: 实际的文本片段。
usage
: 可能包含 token 使用情况统计(在流结束时或单独的块中)。@JsonIgnoreProperties(ignoreUnknown = true)
: 用于忽略 API 响应中未在 DTO 中定义的字段,增加兼容性。
package com.example.springai.dto;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamChatResponse {
private Integer code; // 特定于某些 API(如 DeepSeek)用于表示错误
private String message; // 错误信息
private List choices;
private Usage usage;
// ... 其他字段,例如 id, created, object ...
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Choice {
private Delta delta;
private Integer index;
// ... finish_reason 等 ...
}
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Delta {
private String role;
private String content;
}
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Usage {
// ... 字段,例如 prompt_tokens, completion_tokens, total_tokens ...
}
}
使用 WebClient
AiService
封装了调用大模型 API 的核心逻辑。
chat
方法: 这是实现与大模型交互的关键方法。- 构建请求体: 创建
ChatRequest
对象,设置模型名称 、启用流式传输 (setStream(true)
),并构建包含用户问题 (question
) 的消息列表。 - 发起 POST 请求: 使用
WebClient
向
(占位符) 发起 POST 请求。/chat/completions - 设置
Content-Type
为application/json
。 - 设置
Authorization
头,值为Bearer
(占位符)。 - 将
ChatRequest
对象作为请求体发送。
- 设置
- 处理响应流:
retrieve().bodyToFlux(String.class)
: 将响应体作为字符串流 (Flux
) 读取。- 过滤:
filter(line -> !line.equals("[DONE]"))
: 过滤掉 SSE 流结束的标志。filter(line -> !line.trim().equals(": keep-alive"))
: 过滤掉可能的 keep-alive 注释(DeepSeek官方提到有时会限流)。
- JSON 解析:
map(json -> objectMapper.readValue(json, StreamChatResponse.class))
: 将每个 JSON 字符串块反序列化为StreamChatResponse
对象。- 包含错误处理逻辑,记录 JSON 解析失败的日志。
- 提取内容:
flatMap(chatResponse -> ...)
: 检查响应中是否有错误码,如果没有错误,则提取choices[0].delta.content
中的实际文本内容。
- 错误处理:
doOnError
用于记录流处理过程中发生的任何错误。
- 构建请求体: 创建
package com.example.springai.service;
// ... 导入 ...
@Slf4j
@Service
public class AiService {
private WebClient webClient;
private ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
@PostConstruct
public void init() {
webClient = WebClient.create();
}
public Flux chat(String question) {
ChatRequest request = new ChatRequest();
request.setModel(aiModel.getModel());
request.setStream(true);
Message message = new Message();
message.setRole("user");
message.setContent(question);
request.setMessages(List.of(message));
return webClient.post()
.uri(aiModel.getBaseUrl() + "/chat/completions")
.contentType(MediaType.APPLICATION_JSON)
.header("Authorization", "Bearer " + aiModel.getApiKey())
.bodyValue(request)
.retrieve()
.bodyToFlux(String.class) // 以字符串流方式读取
// 过滤sse结束标签
.filter(line -> !line.equals("[DONE]"))
// 过滤 keep-alive 注释
.filter(line -> !line.trim().equals(": keep-alive"))
.filter(chatResponse -> chatResponse != null)
.map(json -> {
try {
// 反序列化为ChatResponse
return objectMapper.readValue(json, StreamChatResponse.class);
} catch (Exception e) {
log.error("JSON解析失败: {}", json, e);
throw new RuntimeException(e);
}
})
.flatMapSequential(chatResponse -> {
if (chatResponse.getCode() != null && chatResponse.getCode() != 0) {
log.error("error: {}", chatResponse.getMessage());
return Mono.empty();
}
String content = chatResponse.getChoices().get(0).getDelta().getContent();
return Mono.just(content);
})
.doOnError(e -> {
log.error(e.getMessage(), e);
e.printStackTrace();
});
}
}
使用 OpenAI SDK
除了使用 WebClient 直接调用 API,我们还可以使用 OpenAI SDK 来简化开发。以下是使用 SDK 的实现方式。
Maven 依赖
在 pom.xml
中添加以下依赖:
<dependency>
<groupId>com.openaigroupId>
<artifactId>openai-javaartifactId>
<version>1.3.0version>
dependency>
配置类 (AiClientConfig.java
)
配置类负责创建和配置 OpenAIClientAsync Bean:
@Configuration
public class AiClientConfig {
@Bean
OpenAIClientAsync openAIClientAsync() {
return OpenAIOkHttpClientAsync.builder()
.apiKey("<你的API密钥>")
.baseUrl("https://api.deepseek.com/v1")
.build();
}
}
服务类 (AiService.java
)
使用 SDK 实现的服务类更加简洁,无需手动处理 JSON 序列化和请求构建:
@Slf4j
@Service
public class AiService {
@Autowired
private OpenAIClientAsync openAIClientAsync;
public Flux openAiChat(String question) {
ChatCompletionCreateParams createParams = ChatCompletionCreateParams.builder()
.model(aiModel.getModel())
.addUserMessage(question)
.build();
AsyncStreamResponse stream = client.chat().completions().createStreaming(createParams);
return Flux.push(sink -> {
stream.subscribe(completion -> completion.choices().stream()
.flatMap(choice -> choice.delta().content().stream())
.forEach(content -> {
sink.next(content);
}))
.onCompleteFuture()
.thenAccept(nullable -> {
// 关闭Flux流
sink.complete();
});
});
}
}
评论记录:
回复评论: