Spring Boot + SSE + Flux:服务器推送消息

avatar
作者
猴君
阅读量:4

先说 BUG 处理:如果遇到下面这个异步支持相关的报错,说明请求经过的 Servlet 或 Filter 没有开启异步支持:Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "<async-supported>true</async-supported>" to servlet and filter declarations in web.xml.

Spring Boot 项目:在过滤器的 @WebFilter 注解中加入 urlPatterns = { "/*" } 和 asyncSupported = true,即 @WebFilter(urlPatterns = { "/*" }, asyncSupported = true)。

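Spring MVC 项目:在 web.xml 中处理,给相关过滤器的 filter-mapping 加上 ASYNC dispatcher(并按报错提示在 servlet 和 filter 声明中加入 <async-supported>true</async-supported>):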

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">

  <!--
    Excerpt: only the filter mapping is shown. The <filter> declaration named
    "shiroFilter" is assumed to exist elsewhere in the real web.xml (TODO confirm);
    per the container's error message it also needs
    <async-supported>true</async-supported>.
  -->
  <filter-mapping>
    <filter-name>shiroFilter</filter-name>
    <url-pattern>/*</url-pattern>
    <!-- Apply the filter to normal requests and to async dispatches. -->
    <dispatcher>REQUEST</dispatcher>
    <dispatcher>ASYNC</dispatcher>
  </filter-mapping>

</web-app>
  • demo1,服务器间隔一定时间推送内容
1.接口方法
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE ) 	public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {         // 每两秒推送一次         return Flux.interval(Duration.ofSeconds(2)).map(seq->             Tuples.of(seq, LocalDateTime.now())).log()//序号和时间                 .map(data-> ServerSentEvent.<String>builder().id(userId).data(data.getT1().toString()).build());//推送内容              }

2.前端代码

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送事件</title>
</head>
<body>
<div>
    <div id="data"></div>
    <div id="result"></div><br/>
</div>
<script th:inline="javascript">
// Server-sent events demo: use EventSource when available, otherwise fall
// back to reading two streaming XMLHttpRequest responses incrementally.
if (typeof (EventSource) !== "undefined") {

    var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1");
    // Append every received message to the #data element.
    source1.onmessage = function (evt) {
        document.getElementById("data").innerHTML = document.getElementById("data").innerHTML + "股票行情:" + evt.data;
    };
} else {
    // Note: Internet Explorer does not support EventSource.
    document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";

    streamInto('/sse/countDown', 'result');
    streamInto('/sse/retrieve', 'data');
}

/**
 * Creates a request object: XMLHttpRequest where the browser provides it
 * (IE7+, Firefox, Chrome, Opera, Safari), otherwise the ActiveX object used
 * by IE5/IE6.
 */
function createXhr() {
    if (window.XMLHttpRequest) {
        return new XMLHttpRequest();
    }
    return new ActiveXObject("Microsoft.XMLHTTP");
}

/**
 * Issues a GET to `url` and, each time more of the response arrives, writes
 * the newly received lines into the element with id `targetId`.
 */
function streamInto(url, targetId) {
    var xhr = createXhr();
    // Number of response characters already handled; starts at 0 so the first
    // slice begins at the start of the response.
    var seenBytes = 0;
    console.log(xhr);

    xhr.open('GET', url);
    // Attach the handler before send() so no state change can be missed.
    xhr.onreadystatechange = function () {
        console.log("s响应状态:" + xhr.readyState);
        // readyState 3 = part of the response received, 4 = response complete.
        if (xhr.readyState > 2) {
            // The response is read as text; only the part after the last
            // handled offset is new.
            var newData = xhr.responseText.substr(seenBytes);
            // Remember how much has been handled for the next slice.
            seenBytes = xhr.responseText.length;
            if (newData.length === 0) {
                // Nothing new (e.g. the final state change): keep what is shown
                // instead of overwriting it with an empty value.
                return;
            }
            newData = newData.replace(/\n/g, "#");
            // Drops the last character; presumably the chunk ends with a
            // newline, which is now a trailing "#" (TODO confirm).
            newData = newData.substring(0, newData.length - 1);
            var data = newData.split("#");
            console.log("获取到的数据:" + data);
            document.getElementById(targetId).innerHTML = data;
        }
    };
    xhr.send(null);
}
</script>
</body>
</html>
  • demo2:客户端订阅服务器消息,服务器通过 send 推送完消息后关闭订阅(服务端调用 emitter.complete(),前端调用 eventSource.close())

1.接口方法以及工具类

@GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE ) public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) { 		// 简单异步发消息 ====         //questionId 订阅id,id对应了sse对象         new Thread(() -> {             try {                 Thread.sleep(1000);                 for (int i = 0; i < 10; i++) {                     Thread.sleep(500);                     SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);                 }             } catch (Exception e) {                 e.printStackTrace();             } finally {                 // 消息发送完关闭订阅                 SSEUtils.closeSub(questionId);             }         }).start();         // =================          return SSEUtils.addSub(questionId);     }

工具类

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;  public class SSEUtils {     // timeout     private static Long DEFAULT_TIME_OUT = 2*60*1000L;     // 订阅表     private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();      /** 添加订阅 */     public static SseEmitter addSub(String questionId) {         if (null == questionId || "".equals(questionId)) {             return null;         }          SseEmitter emitter = subscribeMap.get(questionId);         if (null == emitter) {             emitter = new SseEmitter(DEFAULT_TIME_OUT);             subscribeMap.put(questionId, emitter);         }          return emitter;     }      /** 发消息 */     public static void pubMsg(String questionId, String msg) {         SseEmitter emitter = subscribeMap.get(questionId);         if (null != emitter) {             try {                 // 更规范的消息结构看源码                 emitter.send(SseEmitter.event().data(msg));             } catch (Exception e) {                 // e.printStackTrace();             }         }     }      /**      * 关闭订阅       * @param questionId      */     public static void closeSub(String questionId) {         SseEmitter emitter = subscribeMap.get(questionId);         if (null != emitter) {             try {                 emitter.complete();                 subscribeMap.remove(questionId);             } catch (Exception e) {                 e.printStackTrace();             }         }     } } 

2.前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse</title>
</head>
<body>
<div>
    <label>问题id</label>
    <input type="text" id="questionId">
    <button onclick="subscribe()">订阅</button>
    <hr>
    <label>F12-console控制台查看消息</label>
</div>

<script>
    // The subscription opened by the most recent click, if any.
    let activeSource = null;

    /**
     * Opens an SSE subscription for the id typed into #questionId and logs
     * every received message to the console.
     */
    function subscribe() {
        let questionId = document.getElementById('questionId').value;
        // Encode the id so characters such as '&', '#' or spaces cannot break
        // the query string.
        let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + encodeURIComponent(questionId);

        // Close the previous subscription so repeated clicks do not pile up
        // open connections.
        if (activeSource !== null) {
            activeSource.close();
        }

        let eventSource = new EventSource(url);
        activeSource = eventSource;

        eventSource.onmessage = function (e) {
            console.log(e.data);
        };

        eventSource.onopen = function (e) {
            console.log(e, 1);
            // todo
        };

        eventSource.onerror = function (e) {
            // todo
            console.log(e, 2);
            // Close explicitly on error so this connection is not kept open.
            eventSource.close()
        };
    }
</script>
</body>
</html>

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!