章
目
录
最近不少小伙伴都在研究怎么通过API把Deepseek的响应内容以流式的方式输出到前端,今天咱就来详细说说这个事儿。主要以Python后端搭配前端JavaScript为例,给大家讲讲具体的实现方案。
一、两种主流技术方案介绍
(一)Server-Sent Events(SSE)方案
这是一种浏览器原生支持的流式传输方案,用起来比较方便,优先推荐给大家。下面这段是Flask框架下的示例代码:
# Flask 示例
from flask import Response, stream_with_context
@app.route('/stream')
def stream_data():
# 定义一个生成器函数generate,用于生成流式数据
def generate():
# 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
response = client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
# 遍历响应的每一个数据块
for chunk in response:
# 检查数据块的choices字段是否存在
if chunk.choices:
# 获取当前数据块的内容,若不存在则设为空字符串
content = chunk.choices[0].delta.content or ""
# 按照SSE格式要求,添加data:前缀和双换行符,生成符合规范的流式数据
yield f"data: {json.dumps({'content': content})}\n\n"
# 使用Flask的Response函数,将生成器函数的内容作为响应体,设置MIME类型为text/event-stream
return Response(stream_with_context(generate()), mimetype='text/event-stream')
前端JavaScript代码这样写:
// 创建一个EventSource对象,连接到后端的/stream接口
const eventSource = new EventSource('/stream');
// 当接收到后端推送的消息时,执行该回调函数
eventSource.onmessage = (event) => {
// 解析接收到的JSON格式数据
const data = JSON.parse(event.data);
// 将解析后的数据中的content内容追加到id为output的HTML元素中
document.getElementById('output').innerHTML += data.content;
};
// 当EventSource连接出错时,执行该回调函数
eventSource.onerror = (err) => {
// 在控制台打印错误信息
console.error('EventSource failed:', err);
// 关闭EventSource连接
eventSource.close();
};
简单解释一下,SSE就像是后端给前端开了个“小窗口”,后端有新数据就通过这个“窗口”源源不断地推送给前端,前端收到数据后就能马上更新页面显示。
(二)流式HTTP响应(NDJSON)方案
这个方案通用性比较强,不仅适用于浏览器客户端,其他类型的客户端也能用。下面是FastAPI框架下的示例代码:
# FastAPI 示例
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import json
@app.get("/stream")
async def stream_data():
# 定义一个异步生成器函数generate,用于生成流式数据
async def generate():
# 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
response = client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
# 异步遍历响应的每一个数据块
async for chunk in response:
# 检查数据块的choices字段是否存在
if chunk.choices:
# 获取当前数据块的内容,若不存在则设为空字符串
content = chunk.choices[0].delta.content or ""
# 按照NDJSON格式要求,将内容转换为JSON字符串并添加换行符,生成符合规范的流式数据
yield json.dumps({"content": content}) + "\n"
# 使用FastAPI的StreamingResponse函数,将生成器函数的内容作为响应体,设置媒体类型为application/x-ndjson
return StreamingResponse(generate(), media_type='application/x-ndjson')
前端JavaScript可以用Fetch API来接收数据:
async function streamData() {
// 使用Fetch API发送GET请求到/stream接口
const response = await fetch('/stream');
// 获取响应体的读取器
const reader = response.body.getReader();
// 创建一个文本解码器,用于将二进制数据转换为文本
const decoder = new TextDecoder();
// 循环读取数据
while(true) {
// 读取数据块
const { done, value } = await reader.read();
// 如果读取完成,跳出循环
if(done) break;
// 将读取到的二进制数据解码为文本
const chunk = decoder.decode(value);
// 解析JSON格式的文本数据
const data = JSON.parse(chunk);
// 将解析后的数据中的content内容追加到id为output的HTML元素中
document.getElementById('output').innerHTML += data.content;
}
}
NDJSON方案就好比是把数据打成一个个“包裹”,每个“包裹”都是JSON格式的,然后通过HTTP协议一个一个地发给前端。
二、关键配置说明
(一)响应头设置
Flask框架下需要手动设置响应头:
# Flask
headers = {
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
FastAPI框架会自动处理这些配置,不用咱们操心。响应头里的Cache-Control: no-cache
是告诉浏览器不要缓存数据,每次都从服务器获取最新的;Connection: keep-alive
则是让服务器和客户端之间的连接保持活跃,这样可以持续传输数据。
(二)数据格式选择
- SSE(text/event-stream):浏览器原生支持,而且还有自动重连的功能。要是网络出了点小问题,它能自己尝试重新连接,保证数据接收不中断。
- NDJSON(application/x-ndjson):这是一种更通用的流式JSON格式,不管是啥类型的客户端都能接收,兼容性很强。
- 纯文本流:这种格式简单倒是简单,就是结构化能力有点弱,不太适合复杂的数据传输。
(三)前端处理建议
在前端接收数据的时候,可能会遇到分块不完整的情况。为了更健壮地处理这种问题,可以参考下面这段代码:
// 定义一个缓冲区,用于存储不完整的数据块
let buffer = '';
async function processChunk(chunk) {
// 将新接收到的数据块追加到缓冲区
buffer += chunk;
// 循环处理缓冲区中的数据,直到没有完整的行
while(buffer.includes('\n')) {
// 找到缓冲区中第一个换行符的位置
const lineEnd = buffer.indexOf('\n');
// 提取缓冲区中第一个完整的行
const line = buffer.slice(0, lineEnd);
// 更新缓冲区,去掉已经处理的行
buffer = buffer.slice(lineEnd + 1);
try {
// 尝试解析JSON格式的行数据
const data = JSON.parse(line);
// 这里可以对解析后的数据进行具体处理
} catch(e) {
// 如果解析出错,在控制台打印错误信息
console.error('解析错误:', e);
}
}
}
这段代码的作用就是把接收到的数据先存到一个“小仓库”(buffer)里,然后每次从“小仓库”里找完整的“包裹”(以换行符分隔的数据行),找到就处理,没找到就接着等新的数据。
三、完整工作流程示例(FastAPI + React)
(一)后端代码
# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# 添加CORS中间件,允许所有来源的请求访问API
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/chat")
async def chat_stream(prompt: str):
# 定义一个异步生成器函数generate,用于生成流式数据
async def generate():
# 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
response = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": prompt}],
stream=True
)
# 异步遍历响应的每一个数据块
async for chunk in response:
# 检查数据块的content字段是否存在
if content := chunk.choices[0].delta.content:
# 将内容转换为JSON字符串,生成符合规范的流式数据
yield json.dumps({"content": content})
# 使用FastAPI的StreamingResponse函数,将生成器函数的内容作为响应体,设置媒体类型为application/x-ndjson
return StreamingResponse(generate(), media_type="application/x-ndjson")
这段代码里,FastAPI创建了一个简单的API服务,CORSMiddleware
是用来解决跨域问题的,让前端能顺利访问后端接口。chat_stream
函数接收前端传来的prompt
参数,调用Deepseek的API获取流式响应,再把响应数据返回给前端。
(二)前端React组件代码
// ChatComponent.jsx
import { useState } from 'react';
export default function ChatComponent() {
// 定义一个状态变量output,用于存储聊天响应内容,初始值为空字符串
const [output, setOutput] = useState('');
const startStream = async () => {
// 使用Fetch API发送GET请求到http://api/chat接口,并带上prompt参数
const response = await fetch('http://api/chat?prompt=你好');
// 获取响应体的读取器
const reader = response.body.getReader();
// 创建一个文本解码器,用于将二进制数据转换为文本
const decoder = new TextDecoder();
// 定义一个缓冲区,用于存储不完整的数据块
let buffer = '';
// 循环读取数据
while(true) {
// 读取数据块
const { done, value } = await reader.read();
// 如果读取完成,跳出循环
if(done) break;
// 将读取到的二进制数据解码为文本,并追加到缓冲区
buffer += decoder.decode(value);
// 循环处理缓冲区中的数据,直到没有完整的JSON对象
while(buffer.includes('}')) {
// 找到缓冲区中第一个JSON对象结束的位置
const endIndex = buffer.indexOf('}') + 1;
// 提取缓冲区中第一个完整的JSON对象
const chunk = buffer.slice(0, endIndex);
// 更新缓冲区,去掉已经处理的JSON对象
buffer = buffer.slice(endIndex);
try {
// 尝试解析JSON格式的文本数据
const data = JSON.parse(chunk);
// 更新output状态,将解析后的数据中的content内容追加到原有内容后面
setOutput(prev => prev + data.content);
} catch(e) {
// 如果解析出错,在控制台打印错误信息
console.error('解析错误:', e);
}
}
}
};
return (
<div>
{/* 定义一个按钮,点击时调用startStream函数 */}
<button onClick={startStream}>开始对话</button>
{/* 显示聊天响应内容的区域 */}
<div id="output">{output}</div>
</div>
);
}
在这个React组件里,点击“开始对话”按钮就会触发startStream
函数,这个函数从后端获取流式数据,然后在前端解析并显示出来。
四、注意事项
(一)连接管理
- 设置合理的超时时间:一般设置在30 – 60秒比较合适。要是连接长时间没动静,就自动断开,避免资源浪费。
- 处理客户端提前断开连接的情况:在FastAPI里可以这样处理:
# FastAPI 示例
try:
async for chunk in response:
# 处理数据的代码
if await request.is_disconnected():
break
finally:
await client.close() # 清理资源
这段代码在处理数据的过程中,会检查客户端是不是断开连接了,如果断开就停止处理,最后还会清理相关资源。
(二)性能优化
- 使用异步框架:像FastAPI的性能就比Flask要好一些。异步框架能让程序在等待某些操作完成的时候,去做其他事情,提高效率。
- 启用响应压缩:可以在FastAPI里通过中间件来实现:
app = FastAPI()
@app.middleware("http")
async def add_compression(request, call_next):
response = await call_next(request)
response.headers["Content-Encoding"] = "gzip"
return response
这段代码给响应头添加了Content-Encoding: gzip
,这样数据在传输前会被压缩,能减少传输的数据量,提高传输速度。
(三)安全考虑
- 限制最大并发连接数:防止太多客户端同时连接,把服务器挤爆了。
- 实施速率限制:比如限制每个客户端每分钟只能请求一定次数,防止恶意请求。在FastAPI里可以借助
slowapi
库来实现:
from fastapi import Request
from fastapi.middleware import Middleware
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.get("/chat")
@limiter.limit("10/minute")
async def chat_stream(request: Request):
# 处理请求的代码
这段代码通过limiter.limit("10/minute")
限制了每个客户端每分钟最多只能请求10次。
(四)错误处理增强
在生成流式数据的时候,最好加上错误处理,比如这样:
async def generate():
try:
response = client.chat.completions.create(...)
async for chunk in response:
# 处理数据的代码
except Exception as e:
yield json.dumps({"error": str(e)})
finally:
await client.close() # 确保释放资源
这样就算在调用API或者处理数据的过程中出了问题,也能给前端返回错误信息,还能保证资源被正确释放。
总的来说,这些方案可以根据实际需求灵活组合使用。如果只是面向浏览器端的应用,优先选SSE方案;要是需要支持更复杂的场景,WebSocket也是个不错的选择,不过实现起来稍微麻烦点。希望这篇文章能帮到正在研究这个问题的小伙伴们!