WebSocket 在大模型应用中的真实落地:从流式输出到高性能对话系统
- WebSocket vs. SSE: WebSocket 适合双向实时通信,如大模型聊天、Agent 执行状态推送等;SSE 更适合单向流式输出。
- WebSocket 的优势: 在大模型应用中,WebSocket 可以实现实时的流式输出和状态推送,提升用户体验,让用户能够看到模型生成过程中的中间状态。
- JSON 消息格式: 为了便于扩展和维护,建议使用 JSON 格式的消息协议来传递不同类型的消息(如用户输入、模型开始/结束信号、错误信息等)。
- 高性能实现: 使用
asyncio.Queue来解耦模型推理与消息发送,避免阻塞事件循环,同时批量合并小片段再发送可以减少网络开销并提高性能。 - 前端优化: 前端应采用缓冲区加
requestAnimationFrame来优化渲染,以确保在处理长文本或复杂内容时保持流畅性。
WebSocket 在大模型应用中的真实落地:从流式输出到高性能对话系统
现在做大模型产品,用户最直观的体验之一就是:
模型不是等几十秒后一次性返回,而是像 ChatGPT 一样,一个字、一句话地往外“吐”。
这个效果背后,常见实现方式有两种:SSE 和 WebSocket。
SSE 更适合单向流式输出,比如服务端不断把模型结果推给前端;WebSocket 更适合双向实时通信,比如大模型聊天、Agent 执行状态推送、前端中途停止生成、多轮对话协作、实时语音/文本交互等。
这篇文章就结合大模型的真实业务场景,系统讲一下 WebSocket 怎么用、协议怎么设计、流式输出怎么保存,以及怎么做得更高性能。
一、为什么大模型应用需要 WebSocket?
传统 HTTP 请求是这样的:
前端发送问题 -> 后端处理 -> 模型生成完成 -> 一次性返回完整答案
这种方式的问题很明显:
用户要等很久
页面像卡住了一样
模型生成中间状态看不到
不能实时停止
Agent 做到哪一步也不知道
而 WebSocket 的模式是:
建立长连接
前端可以随时发消息
后端也可以随时推消息
连接不断开
所以它非常适合大模型应用里的这些场景:
1. 大模型流式输出
2. Agent 执行过程状态推送
3. RAG 检索进度展示
4. 用户中途停止生成
5. 多人协同聊天
6. 实时语音转文字/文字转语音
7. 后端任务完成后主动通知前端
举个真实一点的例子。
用户问:
请帮我分析这个招标文件的评分点
如果是普通 HTTP,用户只能等。
如果是 WebSocket,前端可以实时看到:
正在读取文件...
正在提取评分项...
正在分析技术标要求...
正在生成优化建议...
最后再流式输出完整回答。
这就是 WebSocket 在大模型产品里的价值:它不只是让文字流出来,而是让整个智能任务过程变得可感知。
二、WebSocket 的基本工作流程
WebSocket 一开始并不是直接建立的,它先通过 HTTP 发起一次协议升级。
前端请求:
GET /ws/chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xxx
Sec-WebSocket-Version: 13
服务端返回:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
这一步完成后,连接就从普通 HTTP 变成了 WebSocket 长连接。
之后双方可以互相发送消息:
前端 -> 后端:用户输入的问题
后端 -> 前端:模型开始生成
后端 -> 前端:模型生成的 chunk
后端 -> 前端:模型生成结束
在大模型应用中,一次完整的通信大概是这样:
用户发送问题
↓
后端收到问题
↓
模型开始推理
↓
每生成一小段就通过 WebSocket 推给前端
↓
前端实时追加显示
↓
后端拼接完整回答
↓
生成结束后保存聊天记录
三、大模型 WebSocket 的业务协议设计
很多人刚开始写 WebSocket,会直接传纯文本:
你好,我是AI助手
这样虽然能跑,但后面很难扩展。
因为你不知道这条消息到底是:
模型输出?
开始信号?
结束信号?
错误信息?
心跳?
Agent状态?
所以在真实项目里,建议统一使用 JSON 消息格式。
1. 前端发送用户问题
{
"type": "user_message",
"conversation_id": "conv_001",
"message_id": "user_msg_001",
"content": "请介绍一下 WebSocket 在大模型中的应用",
"model": "full_sft_2048",
"temperature": 0.8,
"top_p": 0.9,
"max_new_tokens": 1024
}
这里不仅包含用户问题,还可以带上模型参数。
比如:
temperature 控制回答随机性
top_p 控制采样范围
max_new_tokens 控制最大生成长度
conversation_id 标识当前会话
message_id 标识当前消息
这些字段后面保存记录、排查问题、做多轮对话都很有用。
2. 后端通知开始生成
{
"type": "start",
"conversation_id": "conv_001",
"message_id": "assistant_msg_001"
}
前端收到这个消息后,就可以显示:
AI 正在思考...
3. 后端流式返回 chunk
模型每生成一小段,就发送一次:
{
"type": "chunk",
"conversation_id": "conv_001",
"message_id": "assistant_msg_001",
"content": "WebSocket 是一种适合实时通信的协议,"
}
下一段:
{
"type": "chunk",
"conversation_id": "conv_001",
"message_id": "assistant_msg_001",
"content": "在大模型应用中常用于流式输出和状态推送。"
}
前端收到后做追加:
answer += data.content
这样用户看到的效果就是模型在实时生成。
4. 后端通知生成结束
{
"type": "done",
"conversation_id": "conv_001",
"message_id": "assistant_msg_001",
"finish_reason": "stop"
}
前端收到 done 后,可以关闭“生成中”状态。
后端也可以在这个阶段保存完整回答。
5. 后端返回错误
{
"type": "error",
"conversation_id": "conv_001",
"message": "模型生成失败,请稍后重试"
}
比如显存不足、模型报错、用户输入异常,都可以通过这种格式返回。
四、FastAPI + WebSocket 实现大模型流式输出
下面是一个简化版实现。
重点逻辑是:
1. 接收用户问题
2. 模型流式生成
3. 每生成一段就发给前端
4. 后端同时拼接完整回答
5. 最后保存聊天记录
import json
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
def save_chat(prompt: str, answer: str, file_path: str = "chat_history.jsonl"):
data = {
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
"prompt": prompt,
"answer": answer
}
with open(file_path, "a", encoding="utf-8") as f:
f.write(json.dumps(data, ensure_ascii=False) + "\n")
def model_stream(prompt: str):
"""
这里替换成你自己的大模型流式输出函数。
真实项目里可能是 tokenizer + model.generate 的流式封装。
"""
chunks = ["WebSocket ", "非常适合", "大模型", "流式输出", "场景。"]
for chunk in chunks:
time.sleep(0.1)
yield chunk
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
raw = await websocket.receive_text()
data = json.loads(raw)
prompt = data["content"]
conversation_id = data.get("conversation_id")
full_answer = ""
await websocket.send_text(json.dumps({
"type": "start",
"conversation_id": conversation_id
}, ensure_ascii=False))
try:
for chunk in model_stream(prompt):
full_answer += chunk
await websocket.send_text(json.dumps({
"type": "chunk",
"conversation_id": conversation_id,
"content": chunk
}, ensure_ascii=False))
await websocket.send_text(json.dumps({
"type": "done",
"conversation_id": conversation_id,
"finish_reason": "stop"
}, ensure_ascii=False))
finally:
if full_answer.strip():
save_chat(prompt, full_answer)
except WebSocketDisconnect:
print("客户端断开连接")
这段代码已经可以实现基本流式输出。
但它还有一个问题:如果 model_stream() 是同步阻塞的,模型生成时可能会卡住事件循环,影响其他用户连接。
所以真实项目里还要继续优化。
五、高性能写法:推理和 WebSocket 发送解耦
大模型推理通常是比较慢的,尤其是本地模型。
如果你把模型推理和 WebSocket 发送直接写在一个循环里:
for chunk in model_stream(prompt):
await websocket.send_text(chunk)
在低并发 demo 中没问题。
但多用户访问时,容易出现:
一个用户生成很慢
其他用户也被影响
WebSocket 响应变慢
后端事件循环被阻塞
更好的方式是引入队列:
模型推理任务负责生产 chunk
WebSocket 任务负责消费 chunk 并发送
两者通过 asyncio.Queue 解耦
结构如下:
用户问题
↓
创建 queue
↓
模型任务:生成 chunk,放入 queue
↓
发送任务:从 queue 取 chunk,推给前端
↓
生成结束后保存完整回答
示例代码:
import asyncio
import json
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
def model_stream_sync(prompt: str):
"""
同步阻塞的模型流式输出。
真实项目里可以换成你自己的本地大模型推理函数。
"""
for text in ["你好", ",", "我是", "一个", "AI助手", "。"]:
time.sleep(0.05)
yield text
def save_chat(prompt: str, answer: str):
data = {
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
"prompt": prompt,
"answer": answer
}
with open("chat_history.jsonl", "a", encoding="utf-8") as f:
f.write(json.dumps(data, ensure_ascii=False) + "\n")
async def run_model_to_queue(prompt: str, queue: asyncio.Queue):
"""
把同步模型推理放进线程,避免阻塞主事件循环。
"""
loop = asyncio.get_running_loop()
def produce():
try:
for chunk in model_stream_sync(prompt):
asyncio.run_coroutine_threadsafe(
queue.put({
"type": "chunk",
"content": chunk
}),
loop
)
except Exception as e:
asyncio.run_coroutine_threadsafe(
queue.put({
"type": "error",
"message": str(e)
}),
loop
)
finally:
asyncio.run_coroutine_threadsafe(
queue.put({
"type": "done"
}),
loop
)
await asyncio.to_thread(produce)
async def send_chunks(websocket: WebSocket, queue: asyncio.Queue, prompt: str):
full_answer = ""
try:
while True:
data = await queue.get()
if data["type"] == "chunk":
chunk = data["content"]
full_answer += chunk
await websocket.send_text(json.dumps({
"type": "chunk",
"content": chunk
}, ensure_ascii=False))
elif data["type"] == "error":
await websocket.send_text(json.dumps({
"type": "error",
"message": data["message"]
}, ensure_ascii=False))
break
elif data["type"] == "done":
await websocket.send_text(json.dumps({
"type": "done"
}, ensure_ascii=False))
break
finally:
if full_answer.strip():
await asyncio.to_thread(save_chat, prompt, full_answer)
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
raw = await websocket.receive_text()
data = json.loads(raw)
prompt = data["content"]
await websocket.send_text(json.dumps({
"type": "start"
}, ensure_ascii=False))
queue = asyncio.Queue(maxsize=128)
model_task = asyncio.create_task(
run_model_to_queue(prompt, queue)
)
sender_task = asyncio.create_task(
send_chunks(websocket, queue, prompt)
)
await asyncio.gather(model_task, sender_task)
except WebSocketDisconnect:
print("客户端断开连接")
这个版本比最简单的写法更适合真实项目。
它解决了几个问题:
模型推理不会直接卡死 WebSocket
queue 可以缓冲模型输出
maxsize 可以防止内存无限增长
保存聊天记录不会阻塞主事件循环
六、流式输出内容怎么保存?
这是大模型 WebSocket 应用中非常关键的一点。
流式输出时,模型不是一次性返回完整答案,而是一段一段返回:
chunk1: 你好
chunk2: ,我是
chunk3: AI助手
但数据库里通常要保存完整回答:
你好,我是AI助手
所以后端需要一边发送,一边拼接:
full_answer = ""
for chunk in model_stream(prompt):
full_answer += chunk
await websocket.send_text(chunk)
save_chat(prompt, full_answer)
真实项目不要每个 chunk 都写数据库。
错误做法:
for chunk in model_stream(prompt):
save_to_db(chunk)
await websocket.send_text(chunk)
这样会导致:
数据库写入次数太多
性能差
并发上来容易卡
数据不完整时还不好处理
推荐做法:
生成过程中内存拼接
生成完成后统一保存
异常或断开时在 finally 中保存已有内容
这样既保证性能,也尽量避免内容丢失。
七、不要每个 token 都发送一次
大模型流式输出通常是 token 级别的。
如果你每生成一个字就发一次:
你
好
,
我
是
A
I
助
手
WebSocket 消息会非常碎。
问题是:
后端发送次数太多
网络包太多
前端渲染太频繁
CPU 开销变大
高并发时明显拖慢
更好的方式是做小批量合并。
比如每 30 到 80 毫秒发送一次,或者累计几个 chunk 再发送:
async def send_batched_chunks(websocket, queue, prompt):
full_answer = ""
buffer = []
last_send_time = time.time()
try:
while True:
data = await queue.get()
if data["type"] == "chunk":
chunk = data["content"]
full_answer += chunk
buffer.append(chunk)
now = time.time()
if len(buffer) >= 8 or now - last_send_time >= 0.04:
await websocket.send_text(json.dumps({
"type": "chunk",
"content": "".join(buffer)
}, ensure_ascii=False))
buffer.clear()
last_send_time = now
elif data["type"] == "done":
if buffer:
await websocket.send_text(json.dumps({
"type": "chunk",
"content": "".join(buffer)
}, ensure_ascii=False))
buffer.clear()
await websocket.send_text(json.dumps({
"type": "done"
}, ensure_ascii=False))
break
finally:
if full_answer.strip():
await asyncio.to_thread(save_chat, prompt, full_answer)
这样用户看到的仍然是流式输出,但后端压力会小很多。
推荐初始参数:
每 40ms 发送一次
或者累计 8 个 chunk 发送一次
用户体验和性能之间比较平衡。
八、前端也要优化渲染
后端做了流式输出,前端也不能随便写。
很多人会这样:
answerBox.innerText += data.content
这在短文本时没问题,但长文本会越来越卡。
更好的方式是用缓冲区加 requestAnimationFrame:
<div id="answer"></div>
<script>
const ws = new WebSocket("ws://127.0.0.1:8000/ws/chat");
let answer = "";
let renderBuffer = "";
let rendering = false;
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
if (data.type === "chunk") {
renderBuffer += data.content;
scheduleRender();
}
if (data.type === "done") {
console.log("生成完成");
}
};
function scheduleRender() {
if (rendering) return;
rendering = true;
requestAnimationFrame(() => {
answer += renderBuffer;
renderBuffer = "";
document.getElementById("answer").innerText = answer;
rendering = false;
});
}
function sendMessage() {
answer = "";
renderBuffer = "";
ws.send(JSON.stringify({
type: "user_message",
conversation_id: "conv_001",
message_id: "user_msg_001",
content: "请介绍一下 WebSocket 在大模型中的应用",
temperature: 0.8,
top_p: 0.9
}));
}
</script>
<button onclick="sendMessage()">发送</button>
这样前端不是每收到一个小片段就立即操作 DOM,而是按浏览器刷新节奏更新页面。
对于长回答、Markdown 渲染、代码块输出,这个优化非常有用。
九、Agent 场景下的 WebSocket 更有价值
大模型聊天只是 WebSocket 的基础用法。
真正到了 Agent 应用,WebSocket 的价值更明显。
比如一个投标文件分析 Agent,它可能要经过这些步骤:
1. 上传文件
2. 解析 PDF / Word
3. 提取招标要求
4. 检索评分标准
5. 分析废标风险
6. 生成技术标优化建议
如果没有 WebSocket,用户只能等。
有了 WebSocket,后端可以实时推送状态:
{
"type": "agent_step",
"step": "parse_file",
"message": "正在解析招标文件..."
}
{
"type": "agent_step",
"step": "extract_score",
"message": "已提取评分项,正在分析技术得分点..."
}
{
"type": "agent_step",
"step": "generate_report",
"message": "正在生成优化建议..."
}
最后再推送模型输出:
{
"type": "chunk",
"content": "根据招标文件要求,本项目技术标应重点关注..."
}
这样用户不会觉得系统“卡死了”,而是能看到任务正在推进。
这对大模型产品体验非常重要。
十、如何实现用户中途停止生成?
大模型流式输出还有一个常见需求:用户点击“停止生成”。
WebSocket 很适合做这个,因为前端可以随时往后端发控制消息:
{
"type": "stop",
"conversation_id": "conv_001",
"message_id": "assistant_msg_001"
}
后端收到后,把当前生成任务取消。
简化思路:
current_task = None
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
global current_task
await websocket.accept()
while True:
raw = await websocket.receive_text()
data = json.loads(raw)
if data["type"] == "user_message":
current_task = asyncio.create_task(handle_generation(websocket, data))
elif data["type"] == "stop":
if current_task:
current_task.cancel()
await websocket.send_text(json.dumps({
"type": "stopped"
}, ensure_ascii=False))
真实项目里不要用全局变量存任务,而是用:
user_id + conversation_id + message_id
来管理不同用户的任务。
比如:
running_tasks = {
"user_001:conv_001:assistant_msg_001": task
}
这样才能支持多用户并发。
十一、WebSocket 和 SSE 怎么选?
在大模型应用中,WebSocket 和 SSE 都可以实现流式输出。
简单来说:
只需要服务端向前端推模型结果:SSE 更简单
需要前端和后端双向通信:WebSocket 更合适
比如普通问答:
用户发问题
模型流式回答
SSE 就够了。
但如果你需要:
用户中途停止
Agent 实时状态
多轮任务控制
实时语音
多人聊天
后端主动推送通知
WebSocket 更合适。
对于复杂大模型应用,我更推荐 WebSocket,因为后面扩展空间更大。
十二、部署时的 Nginx 配置
如果你的 WebSocket 服务部署在 Nginx 后面,必须配置协议升级。
示例:
location /ws/ {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
最关键的是:
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
没有这两行,WebSocket 很容易连接失败。
另外,大模型生成时间可能比较长,所以要适当调大超时时间:
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
十三、多 worker 部署要小心模型显存
FastAPI 常见启动方式:
uvicorn app:app --host 0.0.0.0 --port 8000
如果你想提高并发,可能会想到:
uvicorn app:app --workers 4
但大模型服务要小心。
如果模型直接加载在 FastAPI 进程里,那么:
4 个 worker 可能会加载 4 份模型
显存可能直接爆掉
所以本地大模型服务建议分两种情况。
第一种,demo 阶段:
FastAPI 和模型放在一个进程
只开 1 个 worker
简单稳定
第二种,生产阶段:
FastAPI 只做 WebSocket 网关
模型单独作为推理服务
FastAPI 通过 HTTP / gRPC / 队列调用模型
推荐架构:
前端页面
↓ WebSocket
FastAPI 网关层
↓
模型推理服务
↓
GPU
这样 WebSocket 连接和模型推理解耦,后面扩容也更容易。
十四、真实项目里的完整架构
一个比较完整的大模型 WebSocket 架构可以这样设计:
前端
↓
WebSocket 连接
↓
FastAPI 网关层
↓
会话管理
↓
任务队列
↓
模型推理服务
↓
流式结果返回
↓
WebSocket 推送给前端
↓
保存聊天记录 / 任务日志
其中每一层的职责要分清楚。
前端负责:
展示聊天消息
追加流式内容
显示 Agent 状态
发送停止生成指令
处理错误提示
WebSocket 网关负责:
维护连接
解析消息协议
转发用户请求
推送模型输出
处理断开重连
模型服务负责:
加载模型
执行推理
输出 chunk
管理显存
控制并发
存储层负责:
保存用户问题
保存模型回答
保存参数
保存耗时
保存错误日志
不要把所有逻辑都堆在一个 WebSocket 函数里,否则后期会很难维护。
十五、常见坑总结
做大模型 WebSocket 时,常见坑主要有这些。
1. 模型推理阻塞事件循环
同步推理函数直接跑在 async 函数里,会影响其他连接。
解决方式:
用 asyncio.to_thread
或者把模型服务拆出去
2. 每个 token 都发送一次
这样消息太碎,高并发时性能差。
解决方式:
按 30-80ms 批量发送
或者累计几个 chunk 再发送
3. 不保存完整回答
只顾着推给前端,没有在后端拼接完整内容,最后无法保存聊天记录。
解决方式:
full_answer += chunk
生成结束后统一保存。
4. 用户断开后内容丢失
如果用户刷新页面,可能还没执行到保存逻辑。
解决方式:
try:
...
finally:
save_chat(prompt, full_answer)
5. Nginx 没配置 Upgrade
导致本地能连,部署后连不上。
解决方式:
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
6. 多 worker 加载多份模型
导致显存爆掉。
解决方式:
模型单独部署
FastAPI 只做网关
十六、结语
WebSocket 在大模型应用中,不只是一个“让文字流出来”的工具。
它真正解决的是大模型产品里的实时交互问题:
模型生成过程可见
Agent 执行状态可见
用户可以中途停止
后端可以主动推送
复杂任务体验更自然
对于简单聊天系统,WebSocket 可以实现流式输出;对于更复杂的 Agent 产品,WebSocket 可以把文件解析、检索、规划、调用工具、生成结果这些过程一步步展示给用户。
一个好的大模型 WebSocket 系统,重点不是会不会 send_text(),而是要设计好这几件事:
业务协议格式
流式输出拼接
异常和断开处理
chunk 批量发送
前端渲染优化
任务队列隔离
模型推理解耦
部署层长连接支持
最终可以总结成一句话:
WebSocket 负责实时连接,大模型负责内容生成,而真正决定产品体验的,是你如何把“生成过程”设计成一套稳定、清晰、可保存、可中断、可扩展的实时交互流程。