如何通过API把Deepseek的流式响应输出到前端

前端 潘老师 1个月前 (03-17) 1748 ℃ (0) 扫码查看

最近不少小伙伴都在研究怎么通过API把Deepseek的响应内容以流式的方式输出到前端,今天咱就来详细说说这个事儿。主要以Python后端搭配前端JavaScript为例,给大家讲讲具体的实现方案。

一、两种主流技术方案介绍

(一)Server-Sent Events(SSE)方案

这是一种浏览器原生支持的流式传输方案,用起来比较方便,优先推荐给大家。下面这段是Flask框架下的示例代码:

# Flask 示例
from flask import Response, stream_with_context

@app.route('/stream')
def stream_data():
    # 定义一个生成器函数generate,用于生成流式数据
    def generate():
        # 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=messages,
            stream=True
        )
        
        # 遍历响应的每一个数据块
        for chunk in response:
            # 检查数据块的choices字段是否存在
            if chunk.choices:
                # 获取当前数据块的内容,若不存在则设为空字符串
                content = chunk.choices[0].delta.content or ""
                # 按照SSE格式要求,添加data:前缀和双换行符,生成符合规范的流式数据
                yield f"data: {json.dumps({'content': content})}\n\n"
    
    # 使用Flask的Response函数,将生成器函数的内容作为响应体,设置MIME类型为text/event-stream
    return Response(stream_with_context(generate()), mimetype='text/event-stream')

前端JavaScript代码这样写:

// 创建一个EventSource对象,连接到后端的/stream接口
const eventSource = new EventSource('/stream');

// 当接收到后端推送的消息时,执行该回调函数
eventSource.onmessage = (event) => {
    // 解析接收到的JSON格式数据
    const data = JSON.parse(event.data);
    // 将解析后的数据中的content内容追加到id为output的HTML元素中
    document.getElementById('output').innerHTML += data.content;
};

// 当EventSource连接出错时,执行该回调函数
eventSource.onerror = (err) => {
    // 在控制台打印错误信息
    console.error('EventSource failed:', err);
    // 关闭EventSource连接
    eventSource.close();
};

简单解释一下,SSE就像是后端给前端开了个“小窗口”,后端有新数据就通过这个“窗口”源源不断地推送给前端,前端收到数据后就能马上更新页面显示。

(二)流式HTTP响应(NDJSON)方案

这个方案通用性比较强,不仅适用于浏览器客户端,其他类型的客户端也能用。下面是FastAPI框架下的示例代码:

# FastAPI 示例
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import json

@app.get("/stream")
async def stream_data():
    # 定义一个异步生成器函数generate,用于生成流式数据
    async def generate():
        # 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=messages,
            stream=True
        )
        
        # 异步遍历响应的每一个数据块
        async for chunk in response:
            # 检查数据块的choices字段是否存在
            if chunk.choices:
                # 获取当前数据块的内容,若不存在则设为空字符串
                content = chunk.choices[0].delta.content or ""
                # 按照NDJSON格式要求,将内容转换为JSON字符串并添加换行符,生成符合规范的流式数据
                yield json.dumps({"content": content}) + "\n" 
    
    # 使用FastAPI的StreamingResponse函数,将生成器函数的内容作为响应体,设置媒体类型为application/x-ndjson
    return StreamingResponse(generate(), media_type='application/x-ndjson')

前端JavaScript可以用Fetch API来接收数据:

async function streamData() {
    // 使用Fetch API发送GET请求到/stream接口
    const response = await fetch('/stream');
    // 获取响应体的读取器
    const reader = response.body.getReader();
    // 创建一个文本解码器,用于将二进制数据转换为文本
    const decoder = new TextDecoder();
    
    // 循环读取数据
    while(true) {
        // 读取数据块
        const { done, value } = await reader.read();
        // 如果读取完成,跳出循环
        if(done) break;
        
        // 将读取到的二进制数据解码为文本
        const chunk = decoder.decode(value);
        // 解析JSON格式的文本数据
        const data = JSON.parse(chunk);
        // 将解析后的数据中的content内容追加到id为output的HTML元素中
        document.getElementById('output').innerHTML += data.content;
    }
}

NDJSON方案就好比是把数据打成一个个“包裹”,每个“包裹”都是JSON格式的,然后通过HTTP协议一个一个地发给前端。

二、关键配置说明

(一)响应头设置

Flask框架下需要手动设置响应头:

# Flask
headers = {
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
}

FastAPI框架会自动处理这些配置,不用咱们操心。响应头里的Cache-Control: no-cache是告诉浏览器不要缓存数据,每次都从服务器获取最新的;Connection: keep-alive则是让服务器和客户端之间的连接保持活跃,这样可以持续传输数据。

(二)数据格式选择

  1. SSE(text/event-stream):浏览器原生支持,而且还有自动重连的功能。要是网络出了点小问题,它能自己尝试重新连接,保证数据接收不中断。
  2. NDJSON(application/x-ndjson):这是一种更通用的流式JSON格式,不管是啥类型的客户端都能接收,兼容性很强。
  3. 纯文本流:这种格式简单倒是简单,就是结构化能力有点弱,不太适合复杂的数据传输。

(三)前端处理建议

在前端接收数据的时候,可能会遇到分块不完整的情况。为了更健壮地处理这种问题,可以参考下面这段代码:

// 定义一个缓冲区,用于存储不完整的数据块
let buffer = '';

async function processChunk(chunk) {
    // 将新接收到的数据块追加到缓冲区
    buffer += chunk;
    // 循环处理缓冲区中的数据,直到没有完整的行
    while(buffer.includes('\n')) {
        // 找到缓冲区中第一个换行符的位置
        const lineEnd = buffer.indexOf('\n');
        // 提取缓冲区中第一个完整的行
        const line = buffer.slice(0, lineEnd);
        // 更新缓冲区,去掉已经处理的行
        buffer = buffer.slice(lineEnd + 1);
        
        try {
            // 尝试解析JSON格式的行数据
            const data = JSON.parse(line);
            // 这里可以对解析后的数据进行具体处理
        } catch(e) {
            // 如果解析出错,在控制台打印错误信息
            console.error('解析错误:', e);
        }
    }
}

这段代码的作用就是把接收到的数据先存到一个“小仓库”(buffer)里,然后每次从“小仓库”里找完整的“包裹”(以换行符分隔的数据行),找到就处理,没找到就接着等新的数据。

三、完整工作流程示例(FastAPI + React)

(一)后端代码

# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# 添加CORS中间件,允许所有来源的请求访问API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/chat")
async def chat_stream(prompt: str):
    # 定义一个异步生成器函数generate,用于生成流式数据
    async def generate():
        # 调用Deepseek的API获取聊天完成的响应,设置stream=True开启流式传输
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        
        # 异步遍历响应的每一个数据块
        async for chunk in response:
            # 检查数据块的content字段是否存在
            if content := chunk.choices[0].delta.content:
                # 将内容转换为JSON字符串,生成符合规范的流式数据
                yield json.dumps({"content": content})
    
    # 使用FastAPI的StreamingResponse函数,将生成器函数的内容作为响应体,设置媒体类型为application/x-ndjson
    return StreamingResponse(generate(), media_type="application/x-ndjson")

这段代码里,FastAPI创建了一个简单的API服务,CORSMiddleware是用来解决跨域问题的,让前端能顺利访问后端接口。chat_stream函数接收前端传来的prompt参数,调用Deepseek的API获取流式响应,再把响应数据返回给前端。

(二)前端React组件代码

// ChatComponent.jsx
import { useState } from 'react';

export default function ChatComponent() {
    // 定义一个状态变量output,用于存储聊天响应内容,初始值为空字符串
    const [output, setOutput] = useState('');
    
    const startStream = async () => {
        // 使用Fetch API发送GET请求到http://api/chat接口,并带上prompt参数
        const response = await fetch('http://api/chat?prompt=你好');
        // 获取响应体的读取器
        const reader = response.body.getReader();
        // 创建一个文本解码器,用于将二进制数据转换为文本
        const decoder = new TextDecoder();
        // 定义一个缓冲区,用于存储不完整的数据块
        let buffer = '';
        
        // 循环读取数据
        while(true) {
            // 读取数据块
            const { done, value } = await reader.read();
            // 如果读取完成,跳出循环
            if(done) break;
            
            // 将读取到的二进制数据解码为文本,并追加到缓冲区
            buffer += decoder.decode(value);
            // 循环处理缓冲区中的数据,直到没有完整的JSON对象
            while(buffer.includes('}')) {
                // 找到缓冲区中第一个JSON对象结束的位置
                const endIndex = buffer.indexOf('}') + 1;
                // 提取缓冲区中第一个完整的JSON对象
                const chunk = buffer.slice(0, endIndex);
                // 更新缓冲区,去掉已经处理的JSON对象
                buffer = buffer.slice(endIndex);
                
                try {
                    // 尝试解析JSON格式的文本数据
                    const data = JSON.parse(chunk);
                    // 更新output状态,将解析后的数据中的content内容追加到原有内容后面
                    setOutput(prev => prev + data.content);
                } catch(e) {
                    // 如果解析出错,在控制台打印错误信息
                    console.error('解析错误:', e);
                }
            }
        }
    };
    
    return (
        <div>
            {/* 定义一个按钮,点击时调用startStream函数 */}
            <button onClick={startStream}>开始对话</button>
            {/* 显示聊天响应内容的区域 */}
            <div id="output">{output}</div>
        </div>
    );
}

在这个React组件里,点击“开始对话”按钮就会触发startStream函数,这个函数从后端获取流式数据,然后在前端解析并显示出来。

四、注意事项

(一)连接管理

  1. 设置合理的超时时间:一般设置在30 – 60秒比较合适。要是连接长时间没动静,就自动断开,避免资源浪费。
  2. 处理客户端提前断开连接的情况:在FastAPI里可以这样处理:
# FastAPI 示例
try:
    async for chunk in response:
        # 处理数据的代码
        if await request.is_disconnected():
            break
finally:
    await client.close()  # 清理资源

这段代码在处理数据的过程中,会检查客户端是不是断开连接了,如果断开就停止处理,最后还会清理相关资源。

(二)性能优化

  1. 使用异步框架:像FastAPI的性能就比Flask要好一些。异步框架能让程序在等待某些操作完成的时候,去做其他事情,提高效率。
  2. 启用响应压缩:可以在FastAPI里通过中间件来实现:
app = FastAPI()
@app.middleware("http")
async def add_compression(request, call_next):
    response = await call_next(request)
    response.headers["Content-Encoding"] = "gzip"
    return response

这段代码给响应头添加了Content-Encoding: gzip,这样数据在传输前会被压缩,能减少传输的数据量,提高传输速度。

(三)安全考虑

  1. 限制最大并发连接数:防止太多客户端同时连接,把服务器挤爆了。
  2. 实施速率限制:比如限制每个客户端每分钟只能请求一定次数,防止恶意请求。在FastAPI里可以借助slowapi库来实现:
from fastapi import Request
from fastapi.middleware import Middleware
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.get("/chat")
@limiter.limit("10/minute")
async def chat_stream(request: Request):
    # 处理请求的代码

这段代码通过limiter.limit("10/minute")限制了每个客户端每分钟最多只能请求10次。

(四)错误处理增强

在生成流式数据的时候,最好加上错误处理,比如这样:

async def generate():
    try:
        response = client.chat.completions.create(...)
        async for chunk in response:
            # 处理数据的代码
    except Exception as e:
        yield json.dumps({"error": str(e)})
    finally:
        await client.close()  # 确保释放资源

这样就算在调用API或者处理数据的过程中出了问题,也能给前端返回错误信息,还能保证资源被正确释放。

总的来说,这些方案可以根据实际需求灵活组合使用。如果只是面向浏览器端的应用,优先选SSE方案;要是需要支持更复杂的场景,WebSocket也是个不错的选择,不过实现起来稍微麻烦点。希望这篇文章能帮到正在研究这个问题的小伙伴们!


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/front/15899.html
喜欢 (1)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】