返回首页

SSE单向通信

布莱克2026-01-08 15:05已编辑
Tip:文章封面与内容无关,作者旅游时拍摄,因为没什么值得把四季都错过!

什么是SSE?

SSE (Server-Sent Events) 是一种基于 HTTP 协议的 单向推送 技术。它允许服务器在建立连接后,源源不断地向客户端发送数据,而不需要客户端重复发起请求

可以把它想象成一根**“单向透明的水管”**:

  • 客户端接好水桶(建立连接)。
  • 服务器在水管另一头“滴答、滴答”地往里注水(发送数据)。
  • 水桶满之前,连接一直不断
特性普通 HTTP (短连接)WebSocket (双向)SSE (单向流)
交互模式点餐制:你点一个菜,服务员上一个菜。对讲机制:两人可以随时互相说话。广播制/流水线:你坐那不动,厨师做完一个菜就端上来一个。
方向客户端 ->服务器客户端 <->服务器 (双工)服务器 -> 客户端 (单工)
协议标准 HTTP自定义协议 (基于 TCP)标准 HTTP
复杂度最简单最高(需要处理心跳、重连、协议转换)中等(轻量级)
断线重连不支持需手动实现浏览器原生支持 (retry 机制)

为什么AI对话偏爱 SSE 而不是 WebSocket?

轻量:SSE 直接跑在 HTTP 上,现有的 Nginx、防火墙、负载均衡都能轻松支持

场景匹配:AI 对话中,用户发一句话,AI 回五百个字。用户不需要在 AI 说话时插嘴,这种“一问多收”的场景,单向流完全够用

SSE的数据协议规范及应用场景

每一条消息由 key: value 组成,并以两个换行符 \n\n 结尾

常见的字段:

  • data: 消息内容。
  • event: 自定义事件名称(前端可以用 addEventListener 监听)。
  • id: 消息 ID(断线重连时,浏览器会自动带上 Last-Event-ID,服务器能从断点续传)。
  • retry: 告诉浏览器如果断开了,隔多少毫秒再重连。

应用场景:(除了我接触到的DeepSeek的AI调用外)

  • 实时金融走势:股票价格跳动;实时通知/看板:比如管理员后台的消息提醒、监控指标实时更新。
  • 日志流输出;动态更新:点赞数、评论数的实时跳动。
  • 流式传输连接如何结束:

    在SSE协议中,连接的结束通常有两种表现:一种是发送一个特殊的文本约定(如 [DONE]),另一种是物理连接的断开(end 事件)

    在DeepSeek的调用中,DeepSeek 的结束标识:[DONE],后端调用AI接口后代码如下(用express开发)

    if (payload === '[DONE]') {  // <--- 这里的 [DONE] 就是标识
        res.write('event: message\n');
        res.write('data: ' + JSON.stringify({ v: '[DONE]' }) + '\n\n');
        flush();
        continue;
    }

    物理层面的结束标识:end事件,文字发完,不代表TCP连接就断了,真正关门的动作如下:(express代码)

    res.end() 的作用
    它会向浏览器发送一个信号,表示“本响应已彻底完成”。此时,前端 Fetch 请求的 reader.read() 会返回 { done: true }。

    nodeStream.on('end', () => {
      clearInterval(heartbeat); // 停止心跳,不再发 ping
      try { res.end(); } catch {} // <--- 这才是真正的结束标识:关闭 HTTP 响应
    });

    避坑总结:(真实踩坑)

  • 中间件缓冲(Nginx/CDN):它们默认想攒够一大块数据再发。解决:proxy_buffering off。
  • 压缩算法(Gzip/Brotli):压缩需要等待一段内容集齐才能计算。解决:nginx设置 gzip off。
  • 浏览器连接数限制:在 HTTP/1.1 下,同一个域名浏览器只允许建立 6 个 SSE 连接。如果用户开了 6 个标签页,第 7 个就会卡死。解决:使用 HTTP/2
  • 前端解析碎包:TCP 传输不保证消息完整性。解决:建立 Buffer 机制。
  • 前端代码示例:(后端服务调用DeepSeek的API,前端处理逻辑)

       //前面为接口调用,内容省略,res为返回值
       if (!res.ok || !res.body) {
          // 添加限制,被重复频繁请求添加提示
          if (res.status === 429) {
            throw new Error('您访问的太频繁了,休息一会再来吧~')
          } else {
            throw new Error('网络错误')
          }
        }
        // 获取流式读取器
        const reader = res.body.getReader()
        // 初始化解码器,把二进制数组转换为UTF-8字符串
        const decoder = new TextDecoder('utf-8')
        //在对话列表末尾推入空的消息对象
        messages.value.push({ role: 'assistant', content: '', typing: true })
        // 获取新的消息引用,然后在下面循环中不断通过指针修改数据
        const assistantMsg = messages.value[messages.value.length - 1]
    
        //定义缓冲区,用于存放不完整的消息行
        //网络传输为了效率,会进行合并或拆分。
        //本地开发:网络太快,可能一个 value 刚好就是一个完整的 data: ...。
        //生产环境:由于网络波动,一个 value 可能包含半个消息,或者一下子包含了好几个消息
        let buffer = ''
        // 死循环,直到数据读完才跳出
        while (true) {
          // 从流中读取一小块原始数据,value是字节数组,done是结束标志
          const { value, done } = await reader.read()
          if (done) break //while循环结束
    
          // value是字节数组,必须经过decoder.decode()变为文字,才能拼接缓冲区
          buffer += decoder.decode(value, { stream: true })
    
          // 按双换行符分割,因为这是标准 SSE 消息的分界
          let parts = buffer.split(/\n\n/)
          
          // 关键:最后一部分可能不完整,存回 buffer 等待下一次 read 拼接
          buffer = parts.pop() || ''
          // 遍历这一批次中所有完整的消息块
          for (const part of parts) {
            const lines = part.split(/\n/)
            for (const line of lines) {
              const trimmedLine = line.trim()
              if (!trimmedLine || !trimmedLine.startsWith('data:')) continue
              // 截取"data:"后的内容,去掉空格
              const payload = trimmedLine.slice(5).trim()
              if (payload === '[DONE]') continue //结束标记
    
              try {
                const obj = JSON.parse(payload)
                //?? 从响应中提取文本内容,优先考虑 v 字段,其次是 choices[0].delta.content
                const seg = obj.v ?? obj?.choices?.[0]?.delta?.content ?? ''
    
                if (seg) {
                  // 收到第一个字时,关闭加载状态
                  if (loading.value) {
                    loading.value = false
                    stopLoading()
                  }
    
                  // 实时追加文字到vue的响应式变量中,页面自动蹦字
                  assistantMsg.content += seg
                  
                  // 自动滚动到底部,由于 Vue 更新 DOM 是异步的,需要 nextTick
                  nextTick(() => scrollToBottom())
                }
              } catch (e) {
                // 报错说明 JSON 不完整,但在有 buffer 的情况下这种情况极少发生
                console.warn('解析分片失败:', payload)
              }
            }
          }
        }


    assistant