SSE (Server-Sent Events) 是一种基于 HTTP 协议的 单向推送 技术。它允许服务器在建立连接后,源源不断地向客户端发送数据,而不需要客户端重复发起请求
可以把它想象成一根**“单向透明的水管”**:
| 特性 | 普通 HTTP (短连接) | WebSocket (双向) | SSE (单向流) |
| 交互模式 | 点餐制:你点一个菜,服务员上一个菜。 | 对讲机制:两人可以随时互相说话。 | 广播制/流水线:你坐那不动,厨师做完一个菜就端上来一个。 |
| 方向 | 客户端 ->服务器 | 客户端 <->服务器 (双工) | 服务器 -> 客户端 (单工) |
| 协议 | 标准 HTTP | 自定义协议 (基于 TCP) | 标准 HTTP |
| 复杂度 | 最简单 | 最高(需要处理心跳、重连、协议转换) | 中等(轻量级) |
| 断线重连 | 不支持 | 需手动实现 | 浏览器原生支持 (retry 机制) |
轻量:SSE 直接跑在 HTTP 上,现有的 Nginx、防火墙、负载均衡都能轻松支持
场景匹配:AI 对话中,用户发一句话,AI 回五百个字。用户不需要在 AI 说话时插嘴,这种“一问多收”的场景,单向流完全够用
每一条消息由 key: value 组成,并以两个换行符 \n\n 结尾
应用场景:(除了我接触到的DeepSeek的AI调用外)
在SSE协议中,连接的结束通常有两种表现:一种是发送一个特殊的文本约定(如 [DONE]),另一种是物理连接的断开(end 事件)
在DeepSeek的调用中,DeepSeek 的结束标识:[DONE],后端调用AI接口后代码如下(用express开发)
if (payload === '[DONE]') { // <--- 这里的 [DONE] 就是标识
res.write('event: message\n');
res.write('data: ' + JSON.stringify({ v: '[DONE]' }) + '\n\n');
flush();
continue;
}物理层面的结束标识:end事件,文字发完,不代表TCP连接就断了,真正关门的动作如下:(express代码)
res.end() 的作用:
它会向浏览器发送一个信号,表示“本响应已彻底完成”。此时,前端 Fetch 请求的 reader.read() 会返回 { done: true }。
nodeStream.on('end', () => {
clearInterval(heartbeat); // 停止心跳,不再发 ping
try { res.end(); } catch {} // <--- 这才是真正的结束标识:关闭 HTTP 响应
});前端代码示例:(后端服务调用DeepSeek的API,前端处理逻辑)
//前面为接口调用,内容省略,res为返回值
if (!res.ok || !res.body) {
// 添加限制,被重复频繁请求添加提示
if (res.status === 429) {
throw new Error('您访问的太频繁了,休息一会再来吧~')
} else {
throw new Error('网络错误')
}
}
// 获取流式读取器
const reader = res.body.getReader()
// 初始化解码器,把二进制数组转换为UTF-8字符串
const decoder = new TextDecoder('utf-8')
//在对话列表末尾推入空的消息对象
messages.value.push({ role: 'assistant', content: '', typing: true })
// 获取新的消息引用,然后在下面循环中不断通过指针修改数据
const assistantMsg = messages.value[messages.value.length - 1]
//定义缓冲区,用于存放不完整的消息行
//网络传输为了效率,会进行合并或拆分。
//本地开发:网络太快,可能一个 value 刚好就是一个完整的 data: ...。
//生产环境:由于网络波动,一个 value 可能包含半个消息,或者一下子包含了好几个消息
let buffer = ''
// 死循环,直到数据读完才跳出
while (true) {
// 从流中读取一小块原始数据,value是字节数组,done是结束标志
const { value, done } = await reader.read()
if (done) break //while循环结束
// value是字节数组,必须经过decoder.decode()变为文字,才能拼接缓冲区
buffer += decoder.decode(value, { stream: true })
// 按双换行符分割,因为这是标准 SSE 消息的分界
let parts = buffer.split(/\n\n/)
// 关键:最后一部分可能不完整,存回 buffer 等待下一次 read 拼接
buffer = parts.pop() || ''
// 遍历这一批次中所有完整的消息块
for (const part of parts) {
const lines = part.split(/\n/)
for (const line of lines) {
const trimmedLine = line.trim()
if (!trimmedLine || !trimmedLine.startsWith('data:')) continue
// 截取"data:"后的内容,去掉空格
const payload = trimmedLine.slice(5).trim()
if (payload === '[DONE]') continue //结束标记
try {
const obj = JSON.parse(payload)
//?? 从响应中提取文本内容,优先考虑 v 字段,其次是 choices[0].delta.content
const seg = obj.v ?? obj?.choices?.[0]?.delta?.content ?? ''
if (seg) {
// 收到第一个字时,关闭加载状态
if (loading.value) {
loading.value = false
stopLoading()
}
// 实时追加文字到vue的响应式变量中,页面自动蹦字
assistantMsg.content += seg
// 自动滚动到底部,由于 Vue 更新 DOM 是异步的,需要 nextTick
nextTick(() => scrollToBottom())
}
} catch (e) {
// 报错说明 JSON 不完整,但在有 buffer 的情况下这种情况极少发生
console.warn('解析分片失败:', payload)
}
}
}
}