cd ..
2026-04-3018 min190 views

WebSocket 在大模型应用中的真实落地:从流式输出到高性能对话系统

#Real-Time Communication#WebSocket#Large Language Models#Stream Output#Performance Optimization
AI Summary
每分钟最多 5 次
  • WebSocket vs. SSE: WebSocket 适合双向实时通信,如大模型聊天、Agent 执行状态推送等;SSE 更适合单向流式输出。
  • WebSocket 的优势: 在大模型应用中,WebSocket 可以实现实时的流式输出和状态推送,提升用户体验,让用户能够看到模型生成过程中的中间状态。
  • JSON 消息格式: 为了便于扩展和维护,建议使用 JSON 格式的消息协议来传递不同类型的消息(如用户输入、模型开始/结束信号、错误信息等)。
  • 高性能实现: 使用 asyncio.Queue 来解耦模型推理与消息发送,避免阻塞事件循环,同时批量合并小片段再发送可以减少网络开销并提高性能。
  • 前端优化: 前端应采用缓冲区加 requestAnimationFrame 来优化渲染,以确保在处理长文本或复杂内容时保持流畅性。

WebSocket 在大模型应用中的真实落地:从流式输出到高性能对话系统

现在做大模型产品,用户最直观的体验之一就是:

模型不是等几十秒后一次性返回,而是像 ChatGPT 一样,一个字、一句话地往外“吐”。

这个效果背后,常见实现方式有两种:SSEWebSocket

SSE 更适合单向流式输出,比如服务端不断把模型结果推给前端;WebSocket 更适合双向实时通信,比如大模型聊天、Agent 执行状态推送、前端中途停止生成、多轮对话协作、实时语音/文本交互等。

这篇文章就结合大模型的真实业务场景,系统讲一下 WebSocket 怎么用、协议怎么设计、流式输出怎么保存,以及怎么做得更高性能。


一、为什么大模型应用需要 WebSocket?

传统 HTTP 请求是这样的:

前端发送问题 -> 后端处理 -> 模型生成完成 -> 一次性返回完整答案

这种方式的问题很明显:

用户要等很久
页面像卡住了一样
模型生成中间状态看不到
不能实时停止
Agent 做到哪一步也不知道

而 WebSocket 的模式是:

建立长连接
前端可以随时发消息
后端也可以随时推消息
连接不断开

所以它非常适合大模型应用里的这些场景:

1. 大模型流式输出
2. Agent 执行过程状态推送
3. RAG 检索进度展示
4. 用户中途停止生成
5. 多人协同聊天
6. 实时语音转文字/文字转语音
7. 后端任务完成后主动通知前端

举个真实一点的例子。

用户问:

请帮我分析这个招标文件的评分点

如果是普通 HTTP,用户只能等。

如果是 WebSocket,前端可以实时看到:

正在读取文件...
正在提取评分项...
正在分析技术标要求...
正在生成优化建议...

最后再流式输出完整回答。

这就是 WebSocket 在大模型产品里的价值:它不只是让文字流出来,而是让整个智能任务过程变得可感知。


二、WebSocket 的基本工作流程

WebSocket 一开始并不是直接建立的,它先通过 HTTP 发起一次协议升级。

前端请求:

GET /ws/chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xxx
Sec-WebSocket-Version: 13

服务端返回:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

这一步完成后,连接就从普通 HTTP 变成了 WebSocket 长连接。

之后双方可以互相发送消息:

前端 -> 后端:用户输入的问题
后端 -> 前端:模型开始生成
后端 -> 前端:模型生成的 chunk
后端 -> 前端:模型生成结束

在大模型应用中,一次完整的通信大概是这样:

用户发送问题
    ↓
后端收到问题
    ↓
模型开始推理
    ↓
每生成一小段就通过 WebSocket 推给前端
    ↓
前端实时追加显示
    ↓
后端拼接完整回答
    ↓
生成结束后保存聊天记录

三、大模型 WebSocket 的业务协议设计

很多人刚开始写 WebSocket,会直接传纯文本:

你好,我是AI助手

这样虽然能跑,但后面很难扩展。

因为你不知道这条消息到底是:

模型输出?
开始信号?
结束信号?
错误信息?
心跳?
Agent状态?

所以在真实项目里,建议统一使用 JSON 消息格式。

1. 前端发送用户问题

{
  "type": "user_message",
  "conversation_id": "conv_001",
  "message_id": "user_msg_001",
  "content": "请介绍一下 WebSocket 在大模型中的应用",
  "model": "full_sft_2048",
  "temperature": 0.8,
  "top_p": 0.9,
  "max_new_tokens": 1024
}

这里不仅包含用户问题,还可以带上模型参数。

比如:

temperature 控制回答随机性
top_p 控制采样范围
max_new_tokens 控制最大生成长度
conversation_id 标识当前会话
message_id 标识当前消息

这些字段后面保存记录、排查问题、做多轮对话都很有用。


2. 后端通知开始生成

{
  "type": "start",
  "conversation_id": "conv_001",
  "message_id": "assistant_msg_001"
}

前端收到这个消息后,就可以显示:

AI 正在思考...

3. 后端流式返回 chunk

模型每生成一小段,就发送一次:

{
  "type": "chunk",
  "conversation_id": "conv_001",
  "message_id": "assistant_msg_001",
  "content": "WebSocket 是一种适合实时通信的协议,"
}

下一段:

{
  "type": "chunk",
  "conversation_id": "conv_001",
  "message_id": "assistant_msg_001",
  "content": "在大模型应用中常用于流式输出和状态推送。"
}

前端收到后做追加:

answer += data.content

这样用户看到的效果就是模型在实时生成。


4. 后端通知生成结束

{
  "type": "done",
  "conversation_id": "conv_001",
  "message_id": "assistant_msg_001",
  "finish_reason": "stop"
}

前端收到 done 后,可以关闭“生成中”状态。

后端也可以在这个阶段保存完整回答。


5. 后端返回错误

{
  "type": "error",
  "conversation_id": "conv_001",
  "message": "模型生成失败,请稍后重试"
}

比如显存不足、模型报错、用户输入异常,都可以通过这种格式返回。


四、FastAPI + WebSocket 实现大模型流式输出

下面是一个简化版实现。

重点逻辑是:

1. 接收用户问题
2. 模型流式生成
3. 每生成一段就发给前端
4. 后端同时拼接完整回答
5. 最后保存聊天记录
import json
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


def save_chat(prompt: str, answer: str, file_path: str = "chat_history.jsonl"):
    data = {
        "time": time.strftime("%Y-%m-%d %H:%M:%S"),
        "prompt": prompt,
        "answer": answer
    }

    with open(file_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(data, ensure_ascii=False) + "\n")


def model_stream(prompt: str):
    """
    这里替换成你自己的大模型流式输出函数。
    真实项目里可能是 tokenizer + model.generate 的流式封装。
    """
    chunks = ["WebSocket ", "非常适合", "大模型", "流式输出", "场景。"]

    for chunk in chunks:
        time.sleep(0.1)
        yield chunk


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            raw = await websocket.receive_text()
            data = json.loads(raw)

            prompt = data["content"]
            conversation_id = data.get("conversation_id")

            full_answer = ""

            await websocket.send_text(json.dumps({
                "type": "start",
                "conversation_id": conversation_id
            }, ensure_ascii=False))

            try:
                for chunk in model_stream(prompt):
                    full_answer += chunk

                    await websocket.send_text(json.dumps({
                        "type": "chunk",
                        "conversation_id": conversation_id,
                        "content": chunk
                    }, ensure_ascii=False))

                await websocket.send_text(json.dumps({
                    "type": "done",
                    "conversation_id": conversation_id,
                    "finish_reason": "stop"
                }, ensure_ascii=False))

            finally:
                if full_answer.strip():
                    save_chat(prompt, full_answer)

    except WebSocketDisconnect:
        print("客户端断开连接")

这段代码已经可以实现基本流式输出。

但它还有一个问题:如果 model_stream() 是同步阻塞的,模型生成时可能会卡住事件循环,影响其他用户连接。

所以真实项目里还要继续优化。


五、高性能写法:推理和 WebSocket 发送解耦

大模型推理通常是比较慢的,尤其是本地模型。

如果你把模型推理和 WebSocket 发送直接写在一个循环里:

for chunk in model_stream(prompt):
    await websocket.send_text(chunk)

在低并发 demo 中没问题。

但多用户访问时,容易出现:

一个用户生成很慢
其他用户也被影响
WebSocket 响应变慢
后端事件循环被阻塞

更好的方式是引入队列:

模型推理任务负责生产 chunk
WebSocket 任务负责消费 chunk 并发送
两者通过 asyncio.Queue 解耦

结构如下:

用户问题
  ↓
创建 queue
  ↓
模型任务:生成 chunk,放入 queue
  ↓
发送任务:从 queue 取 chunk,推给前端
  ↓
生成结束后保存完整回答

示例代码:

import asyncio
import json
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


def model_stream_sync(prompt: str):
    """
    同步阻塞的模型流式输出。
    真实项目里可以换成你自己的本地大模型推理函数。
    """
    for text in ["你好", ",", "我是", "一个", "AI助手", "。"]:
        time.sleep(0.05)
        yield text


def save_chat(prompt: str, answer: str):
    data = {
        "time": time.strftime("%Y-%m-%d %H:%M:%S"),
        "prompt": prompt,
        "answer": answer
    }

    with open("chat_history.jsonl", "a", encoding="utf-8") as f:
        f.write(json.dumps(data, ensure_ascii=False) + "\n")


async def run_model_to_queue(prompt: str, queue: asyncio.Queue):
    """
    把同步模型推理放进线程,避免阻塞主事件循环。
    """
    loop = asyncio.get_running_loop()

    def produce():
        try:
            for chunk in model_stream_sync(prompt):
                asyncio.run_coroutine_threadsafe(
                    queue.put({
                        "type": "chunk",
                        "content": chunk
                    }),
                    loop
                )
        except Exception as e:
            asyncio.run_coroutine_threadsafe(
                queue.put({
                    "type": "error",
                    "message": str(e)
                }),
                loop
            )
        finally:
            asyncio.run_coroutine_threadsafe(
                queue.put({
                    "type": "done"
                }),
                loop
            )

    await asyncio.to_thread(produce)


async def send_chunks(websocket: WebSocket, queue: asyncio.Queue, prompt: str):
    full_answer = ""

    try:
        while True:
            data = await queue.get()

            if data["type"] == "chunk":
                chunk = data["content"]
                full_answer += chunk

                await websocket.send_text(json.dumps({
                    "type": "chunk",
                    "content": chunk
                }, ensure_ascii=False))

            elif data["type"] == "error":
                await websocket.send_text(json.dumps({
                    "type": "error",
                    "message": data["message"]
                }, ensure_ascii=False))
                break

            elif data["type"] == "done":
                await websocket.send_text(json.dumps({
                    "type": "done"
                }, ensure_ascii=False))
                break

    finally:
        if full_answer.strip():
            await asyncio.to_thread(save_chat, prompt, full_answer)


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            raw = await websocket.receive_text()
            data = json.loads(raw)

            prompt = data["content"]

            await websocket.send_text(json.dumps({
                "type": "start"
            }, ensure_ascii=False))

            queue = asyncio.Queue(maxsize=128)

            model_task = asyncio.create_task(
                run_model_to_queue(prompt, queue)
            )

            sender_task = asyncio.create_task(
                send_chunks(websocket, queue, prompt)
            )

            await asyncio.gather(model_task, sender_task)

    except WebSocketDisconnect:
        print("客户端断开连接")

这个版本比最简单的写法更适合真实项目。

它解决了几个问题:

模型推理不会直接卡死 WebSocket
queue 可以缓冲模型输出
maxsize 可以防止内存无限增长
保存聊天记录不会阻塞主事件循环

六、流式输出内容怎么保存?

这是大模型 WebSocket 应用中非常关键的一点。

流式输出时,模型不是一次性返回完整答案,而是一段一段返回:

chunk1: 你好
chunk2: ,我是
chunk3: AI助手

但数据库里通常要保存完整回答:

你好,我是AI助手

所以后端需要一边发送,一边拼接:

full_answer = ""

for chunk in model_stream(prompt):
    full_answer += chunk
    await websocket.send_text(chunk)

save_chat(prompt, full_answer)

真实项目不要每个 chunk 都写数据库。

错误做法:

for chunk in model_stream(prompt):
    save_to_db(chunk)
    await websocket.send_text(chunk)

这样会导致:

数据库写入次数太多
性能差
并发上来容易卡
数据不完整时还不好处理

推荐做法:

生成过程中内存拼接
生成完成后统一保存
异常或断开时在 finally 中保存已有内容

这样既保证性能,也尽量避免内容丢失。


七、不要每个 token 都发送一次

大模型流式输出通常是 token 级别的。

如果你每生成一个字就发一次:

你
好
,
我
是
A
I
助
手

WebSocket 消息会非常碎。

问题是:

后端发送次数太多
网络包太多
前端渲染太频繁
CPU 开销变大
高并发时明显拖慢

更好的方式是做小批量合并。

比如每 30 到 80 毫秒发送一次,或者累计几个 chunk 再发送:

async def send_batched_chunks(websocket, queue, prompt):
    full_answer = ""
    buffer = []
    last_send_time = time.time()

    try:
        while True:
            data = await queue.get()

            if data["type"] == "chunk":
                chunk = data["content"]
                full_answer += chunk
                buffer.append(chunk)

                now = time.time()

                if len(buffer) >= 8 or now - last_send_time >= 0.04:
                    await websocket.send_text(json.dumps({
                        "type": "chunk",
                        "content": "".join(buffer)
                    }, ensure_ascii=False))

                    buffer.clear()
                    last_send_time = now

            elif data["type"] == "done":
                if buffer:
                    await websocket.send_text(json.dumps({
                        "type": "chunk",
                        "content": "".join(buffer)
                    }, ensure_ascii=False))
                    buffer.clear()

                await websocket.send_text(json.dumps({
                    "type": "done"
                }, ensure_ascii=False))
                break

    finally:
        if full_answer.strip():
            await asyncio.to_thread(save_chat, prompt, full_answer)

这样用户看到的仍然是流式输出,但后端压力会小很多。

推荐初始参数:

每 40ms 发送一次
或者累计 8 个 chunk 发送一次

用户体验和性能之间比较平衡。


八、前端也要优化渲染

后端做了流式输出,前端也不能随便写。

很多人会这样:

answerBox.innerText += data.content

这在短文本时没问题,但长文本会越来越卡。

更好的方式是用缓冲区加 requestAnimationFrame

<div id="answer"></div>

<script>
const ws = new WebSocket("ws://127.0.0.1:8000/ws/chat");

let answer = "";
let renderBuffer = "";
let rendering = false;

ws.onmessage = function(event) {
    const data = JSON.parse(event.data);

    if (data.type === "chunk") {
        renderBuffer += data.content;
        scheduleRender();
    }

    if (data.type === "done") {
        console.log("生成完成");
    }
};

function scheduleRender() {
    if (rendering) return;

    rendering = true;

    requestAnimationFrame(() => {
        answer += renderBuffer;
        renderBuffer = "";
        document.getElementById("answer").innerText = answer;
        rendering = false;
    });
}

function sendMessage() {
    answer = "";
    renderBuffer = "";

    ws.send(JSON.stringify({
        type: "user_message",
        conversation_id: "conv_001",
        message_id: "user_msg_001",
        content: "请介绍一下 WebSocket 在大模型中的应用",
        temperature: 0.8,
        top_p: 0.9
    }));
}
</script>

<button onclick="sendMessage()">发送</button>

这样前端不是每收到一个小片段就立即操作 DOM,而是按浏览器刷新节奏更新页面。

对于长回答、Markdown 渲染、代码块输出,这个优化非常有用。


九、Agent 场景下的 WebSocket 更有价值

大模型聊天只是 WebSocket 的基础用法。

真正到了 Agent 应用,WebSocket 的价值更明显。

比如一个投标文件分析 Agent,它可能要经过这些步骤:

1. 上传文件
2. 解析 PDF / Word
3. 提取招标要求
4. 检索评分标准
5. 分析废标风险
6. 生成技术标优化建议

如果没有 WebSocket,用户只能等。

有了 WebSocket,后端可以实时推送状态:

{
  "type": "agent_step",
  "step": "parse_file",
  "message": "正在解析招标文件..."
}
{
  "type": "agent_step",
  "step": "extract_score",
  "message": "已提取评分项,正在分析技术得分点..."
}
{
  "type": "agent_step",
  "step": "generate_report",
  "message": "正在生成优化建议..."
}

最后再推送模型输出:

{
  "type": "chunk",
  "content": "根据招标文件要求,本项目技术标应重点关注..."
}

这样用户不会觉得系统“卡死了”,而是能看到任务正在推进。

这对大模型产品体验非常重要。


十、如何实现用户中途停止生成?

大模型流式输出还有一个常见需求:用户点击“停止生成”。

WebSocket 很适合做这个,因为前端可以随时往后端发控制消息:

{
  "type": "stop",
  "conversation_id": "conv_001",
  "message_id": "assistant_msg_001"
}

后端收到后,把当前生成任务取消。

简化思路:

current_task = None

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    global current_task

    await websocket.accept()

    while True:
        raw = await websocket.receive_text()
        data = json.loads(raw)

        if data["type"] == "user_message":
            current_task = asyncio.create_task(handle_generation(websocket, data))

        elif data["type"] == "stop":
            if current_task:
                current_task.cancel()

            await websocket.send_text(json.dumps({
                "type": "stopped"
            }, ensure_ascii=False))

真实项目里不要用全局变量存任务,而是用:

user_id + conversation_id + message_id

来管理不同用户的任务。

比如:

running_tasks = {
    "user_001:conv_001:assistant_msg_001": task
}

这样才能支持多用户并发。


十一、WebSocket 和 SSE 怎么选?

在大模型应用中,WebSocket 和 SSE 都可以实现流式输出。

简单来说:

只需要服务端向前端推模型结果:SSE 更简单
需要前端和后端双向通信:WebSocket 更合适

比如普通问答:

用户发问题
模型流式回答

SSE 就够了。

但如果你需要:

用户中途停止
Agent 实时状态
多轮任务控制
实时语音
多人聊天
后端主动推送通知

WebSocket 更合适。

对于复杂大模型应用,我更推荐 WebSocket,因为后面扩展空间更大。


十二、部署时的 Nginx 配置

如果你的 WebSocket 服务部署在 Nginx 后面,必须配置协议升级。

示例:

location /ws/ {
    proxy_pass http://127.0.0.1:8000;

    proxy_http_version 1.1;

    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";

    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
}

最关键的是:

proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

没有这两行,WebSocket 很容易连接失败。

另外,大模型生成时间可能比较长,所以要适当调大超时时间:

proxy_read_timeout 3600s;
proxy_send_timeout 3600s;

十三、多 worker 部署要小心模型显存

FastAPI 常见启动方式:

uvicorn app:app --host 0.0.0.0 --port 8000

如果你想提高并发,可能会想到:

uvicorn app:app --workers 4

但大模型服务要小心。

如果模型直接加载在 FastAPI 进程里,那么:

4 个 worker 可能会加载 4 份模型
显存可能直接爆掉

所以本地大模型服务建议分两种情况。

第一种,demo 阶段:

FastAPI 和模型放在一个进程
只开 1 个 worker
简单稳定

第二种,生产阶段:

FastAPI 只做 WebSocket 网关
模型单独作为推理服务
FastAPI 通过 HTTP / gRPC / 队列调用模型

推荐架构:

前端页面
  ↓ WebSocket
FastAPI 网关层
  ↓
模型推理服务
  ↓
GPU

这样 WebSocket 连接和模型推理解耦,后面扩容也更容易。


十四、真实项目里的完整架构

一个比较完整的大模型 WebSocket 架构可以这样设计:

前端
  ↓
WebSocket 连接
  ↓
FastAPI 网关层
  ↓
会话管理
  ↓
任务队列
  ↓
模型推理服务
  ↓
流式结果返回
  ↓
WebSocket 推送给前端
  ↓
保存聊天记录 / 任务日志

其中每一层的职责要分清楚。

前端负责:

展示聊天消息
追加流式内容
显示 Agent 状态
发送停止生成指令
处理错误提示

WebSocket 网关负责:

维护连接
解析消息协议
转发用户请求
推送模型输出
处理断开重连

模型服务负责:

加载模型
执行推理
输出 chunk
管理显存
控制并发

存储层负责:

保存用户问题
保存模型回答
保存参数
保存耗时
保存错误日志

不要把所有逻辑都堆在一个 WebSocket 函数里,否则后期会很难维护。


十五、常见坑总结

做大模型 WebSocket 时,常见坑主要有这些。

1. 模型推理阻塞事件循环

同步推理函数直接跑在 async 函数里,会影响其他连接。

解决方式:

用 asyncio.to_thread
或者把模型服务拆出去

2. 每个 token 都发送一次

这样消息太碎,高并发时性能差。

解决方式:

按 30-80ms 批量发送
或者累计几个 chunk 再发送

3. 不保存完整回答

只顾着推给前端,没有在后端拼接完整内容,最后无法保存聊天记录。

解决方式:

full_answer += chunk

生成结束后统一保存。


4. 用户断开后内容丢失

如果用户刷新页面,可能还没执行到保存逻辑。

解决方式:

try:
    ...
finally:
    save_chat(prompt, full_answer)

5. Nginx 没配置 Upgrade

导致本地能连,部署后连不上。

解决方式:

proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

6. 多 worker 加载多份模型

导致显存爆掉。

解决方式:

模型单独部署
FastAPI 只做网关

十六、结语

WebSocket 在大模型应用中,不只是一个“让文字流出来”的工具。

它真正解决的是大模型产品里的实时交互问题:

模型生成过程可见
Agent 执行状态可见
用户可以中途停止
后端可以主动推送
复杂任务体验更自然

对于简单聊天系统,WebSocket 可以实现流式输出;对于更复杂的 Agent 产品,WebSocket 可以把文件解析、检索、规划、调用工具、生成结果这些过程一步步展示给用户。

一个好的大模型 WebSocket 系统,重点不是会不会 send_text(),而是要设计好这几件事:

业务协议格式
流式输出拼接
异常和断开处理
chunk 批量发送
前端渲染优化
任务队列隔离
模型推理解耦
部署层长连接支持

最终可以总结成一句话:

WebSocket 负责实时连接,大模型负责内容生成,而真正决定产品体验的,是你如何把“生成过程”设计成一套稳定、清晰、可保存、可中断、可扩展的实时交互流程。

/** Comments(0)*/

Loading comments...