SSE实现消息实时推送,前端渐进式学习、实践,真香

avatar
作者
筋斗云
阅读量:0

一、SSE概念

SSE(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式

二、SSE应用场景

在web端消息推送功能中,由于传统的HTTP协议是由客户端主动发起请求,服务端才会响应。基本的ajax轮询技术便是如此。而在SSE中,浏览发送一个请求给服务端,通过响应头中的Content-Type:text/event-stream等向客户端声名这是一个长连接,发送的是流数据,这样客户端就不会关闭连接,一直等待服务端发送数据。

如果服务器返回的数据中包含了事件标识符,浏览器会记录最后一次接收的事件的标识符。如果与服务器的连接中断,当浏览器再次进行连接时,会通过头来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继续连接

三、前端使用方法、问题

1、get方式

使用eventsource完成get请求

缺点:客户端无法通过一个get请求完成数据传递

参考文档:

EventSource - Web API 接口参考 | MDN

实现流程:

  1. 后端提供了两个接口,一个是:post,用以完成前端信息的传递,我这边是做大语言模型的,所以包括了模型必要参数、问题等;二、get接口,完成流式输出的接口,配置相应的具名事件、请求头等
  2. 前端通过调用post接口拿到本次会话id,将id携带在get请求里,完成信息传递
  3. 前端处理SSE流式返回

代码实现:

const eventSourceRef = useRef<any>(null) const contact = async (messageData: any) => {   eventSourceRef.current = new EventSource(     `${API_BASE}/v1/model/stream?id=${id}`,   )   if (!eventSourceRef.current) return   // 监听 SSE 事件,因为后端定义了具名事件,所以这儿要用addEventListener监听,而不是onmessage   eventSourceRef.current.addEventListener('add', function (e: any) { 	// 处理数据展示   })   eventSourceRef.current.addEventListener('finish', function (e: any) {    // 结束标识finish     eventSourceRef.current.close() // 关闭连接   })   eventSourceRef.current.addEventListener('error', function (e: any) {     if (e.status === 401) {       // 用户登录状态失效处理     }     // error报错处理     console.log('Error occurred:', e)     // 关闭连接     eventSourceRef.current.close()   }) } 

2、post方式

使用fetch-event-source完成连接,仅需一个接口,支持添加请求头

缺点:在浏览器返回的text/eventstream里看不到具体返回,无法进行预览

参考文档:

@microsoft/fetch-event-source

实现流程:

  1. 后端提供一个接口,支持前端传参、流式返回
  2. 前端通过fetch-event-source,完成传参、请求头添加等
  3. 处理返回数据

具体实现:

  const eventSourceRef = useRef<any>(null)   const [abortController, setAbortController] = useState(new AbortController())   // 通信事件   const contact = async (messageData: any) => {     messageData = { ...messageData, do_stream: modelArg.do_stream } // 请求参数     receivedDataRef.current = ''     const token: string = getLocal('AUTHCODE') || ''      fetchEventSource(`${MAAS_API_BASE}/v1/model_api/invoke`, {       method: 'POST',       // 添加请求头       headers: {         Authorization: token,         'Content-Type': 'application/json',       },         // 传参必须保证是json       body: JSON.stringify(messageData),       // abortController.signal 提供了一个信号对象给 fetchEventSource 函数。       // 如果在任何时候你想取消正在进行的 fetch 操作,你可以调用        // abortController.abort()。这会发出关联任务的信号,你可以使用        // AbortController 的信号来检查异步操作是否已被取消。       signal: abortController.signal,        openWhenHidden: true, // 切换标签页时连接不关闭       async onopen(resp) {         // 处理登录失效         if (resp.status === 401) {           message.warning('登录过期')           return         }       },       onmessage(msg: any) {         const eventType = msg.event // 监听event的具名事件         switch (eventType) {           case 'add':             // 流式输出事件,add每次会返回具体字符,前端负责拼接展示             break           case 'finish':             setStatu('finish') // 结束标识             break           case 'error':             if (msg.status === 401) {                message.warning('登录过期')             }             console.log('Error occurred:', e)             break         }       },       onerror(err) {         throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连       },       onclose() {},     })   }    // 终止连接方法,比如在切换模型时,你可能有必要终止上一次连接来避免问答串联   const closeSSE = () => {     abortController.abort()     setAbortController(new AbortController())   }

3、一种接口同时兼容流式/非流式

同上post方法

    fetchEventSource(sseUrl, {       method: 'POST',       headers,       signal: abortController.signal,       body: JSON.stringify(customInferData),       openWhenHidden: true,     /**     *在onopen阶段处理     第一步:判断resp.headers.get('content-type'),如果不包含text/event-stream,     则代表非流式     第二步:需要在onopen阶段处理非流式返回,即json返回,读取json返回并渲染,注意异常也要处理     第三步:      */       async onopen(resp) {         const contentype = resp.headers.get('content-type') || ''         console.log('contentype =>', contentype)         console.log('resp.ok =>', resp.ok)         if (resp.ok && !contentype.includes('text/event-stream')) {           // 读取json数据           const responseData = await resp.json()           if (responseData.code !== 0) {              // 报错处理+关闭连接           } else {             //处理数据渲染+关闭连接                          stopSession()           }         } else if (resp.status === 401) {           message.warning('登录过期')           // 报错处理+关闭连接           stopSession()                  }       },       onmessage(msg: any) {         const eventType = msg.event         const messages: any = cloneDeep(chatState.sessionMessages)         let lastMessage: any = messages[messages.length - 1] || {}          switch (eventType) {           case 'add':             lastMessage = {               ...lastMessage,               text: `${lastMessage.text}${msg.data || ' '}`,               loading: false,             }             messages.splice(messages.length - 1, 1, lastMessage)             chatAction.updateSessionMessages(messages)             break           case 'finish':             console.log('finish lastMessage =>', lastMessage)             chatAction.updateSessionStatu(SessionStatuTypes.ready)             chatAction.updateContext(msg.data)             break           case 'info':             {               const messages: any = cloneDeep(chatState.sessionMessages)               let lastMessage: any = messages[messages.length - 1] || {}               lastMessage = {                 referenceDocs: JSON.parse(msg.data).reference_by_docs,                 ...lastMessage,               }               messages.splice(messages.length - 1, 1, lastMessage)               chatAction.updateSessionMessages(messages)             }             break           case 'error':             if (msg.status === 401) {               chatAction.updateSessionStatu(SessionStatuTypes.ready)             } else {               errorItemFn(msg?.msg || msg?.data || '抱歉,暂无法回答问题')             }             break         }       },       onerror(err: any) {         errorItemFn(err?.msg || '抱歉,暂无法回答该问题')         console.log('eventSource error: ', `${err}`)         throw err  // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连       },       onclose() {         console.log('eventSource close')       },     })    // 终止会话   const stopSession = () => {     abortController.abort()     setAbortController(new AbortController())   }

四、常见问题汇总

1、无法添加请求头

应用fetch-event-source解决

2、一个方法需要同时兼容流式和非流式

应用fetch-event-source在onopen阶段处理非流式输出,如报错、接口json返回等

3、遇到跨域时候,请求一直连接

应用fetch-event-source在监听具名事件时,如error,将错误throw err,否则无法中断连接

4、使用SSE时,前端引入markdown渲染时发现后端response data里的前置空格被忽略

服务器响应状态码应该为 200,header 为 Content-Type: text/event-stream,然后保持此连接并以一种特殊的格式写入消息,就像这样:

data: Message 1  data: Message 2  data: Message 3 data: of two lines

data: 后为消息文本,冒号后面的空格是可选的

这个可选便是主要原因。查询社区后,找到了两种解决办法:

  • 第一种:data后面的用json字符串返回,前端接收时转成对象拿到里面的content渲染。
    ​​​​​​​如:data: {"content":"   1234"}
  • 第二种:使用开源社区的@echofly/fetch-event-source插件替换fetch-event-source,微软的这个插件已经很久没维护了,所以对这个问题还没解决。使用方法和fetch-event-source一木一样,只需要更改引入即可。
    参考链接:@echofly/fetch-event-source - npm
  • 第三种失败。考虑把空格替换为转义字符,前端无需加任何处理可以直接显示。空格的问题是解决了,但是对代码块等特殊输出带来了影响,使其空格无法展示。

具体表现,点击放大查看:

5、fetch方法如何终止

  const stopSession = () => {     abortController.abort()     setAbortController(new AbortController())   }

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!