简介
WebSocket 是基于TCP/IP协议,独立于HTTP协议的通信协议。WebSocket 连接允许客户端和服务器之间的全双工通信,以便任何一方都可以通过已建立的连接将数据推送到另一方。
我们常用的HTTP是客户端通过「请求-响应」的方式与服务器建立通信的,必须是客户端主动触发的行为,服务端只是做好接口被动等待请求。而在某些场景下的动作,是需要服务端主动触发的,比如向客户端发送消息、实时通讯、远程控制等。客户端是不知道这些动作几时触发的,假如用HTTP的方式,那么设备端需要不断轮询服务端,这样的方式对服务器压力太大,同时产生很多无效请求,且具有延迟性。于是才采用可以建立双向通讯的长连接协议。通过握手建立连接后,服务端可以实时发送数据与指令到设备端,服务器压力小。
Spring WebSocket是Spring框架的一部分,提供了在Web应用程序中实现实时双向通信的能力。本教程将引导你通过一个简单的例子,演示如何使用Spring WebSocket建立一个实时通信应用。
准备工作
确保你的项目中已经引入了Spring框架的WebSocket模块。你可以通过Maven添加以下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
创建WebSocket配置类(实现WebSocketConfigurer接口)
首先,创建一个配置类,用于配置WebSocket的相关设置。
package com.ci.erp.human.config; import com.ci.erp.human.handler.WebSocketHandler; import com.ci.erp.human.interceptor.WebSocketHandleInterceptor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; /** * * Websocket配置类 * * @author lucky_fd * @since 2024-01-17 */ @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // 注册websocket处理器和拦截器 registry.addHandler(webSocketHandler(), "/websocket/server") .addInterceptors(webSocketHandleInterceptor()).setAllowedOrigins("*"); registry.addHandler(webSocketHandler(), "/sockjs/server").setAllowedOrigins("*") .addInterceptors(webSocketHandleInterceptor()).withSockJS(); } @Bean public WebSocketHandler webSocketHandler() { return new WebSocketHandler(); } @Bean public WebSocketHandleInterceptor webSocketHandleInterceptor() { return new WebSocketHandleInterceptor(); } }
上面的配置类使用@EnableWebSocket注解启用WebSocket,并通过registerWebSocketHandlers方法注册WebSocket处理器。
registerWebSocketHandlers:这个方法是向spring容器注册一个handler处理器及对应映射地址,可以理解成MVC的Handler(控制器方法),websocket客户端通过请求的url查找处理器进行处理
addInterceptors:拦截器,当建立websocket连接的时候,我们可以通过继承spring的HttpSessionHandshakeInterceptor来做一些事情。
setAllowedOrigins:跨域设置,
*
表示所有域名都可以,不限制, 域包括ip:port, 指定*
可以是任意的域名,不加的话默认localhost+本服务端口withSockJS: 这个是应对浏览器不支持websocket协议的时候降级为轮询的处理。
创建WebSocket消息处理器(实现TextWebSocketHandler 接口)
接下来,创建一个消息处理器,处理客户端发送的消息。
package com.ci.erp.human.handler; import cn.hutool.core.util.ObjectUtil; import com.ci.erp.common.core.utils.JsonUtils; import com.ci.erp.human.domain.thirdVo.YYHeartbeat; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * * websocket处理类 * 实现WebSocketHandler接口 * * - websocket建立连接后执行afterConnectionEstablished回调接口 * - websocket关闭连接后执行afterConnectionClosed回调接口 * - websocket接收客户端消息执行handleTextMessage接口 * - websocket传输异常时执行handleTransportError接口 * * @author lucky_fd * @since 2024-01-17 */ public class WebSocketHandler extends TextWebSocketHandler { /** * 存储websocket客户端连接 * */ private static final Map<String, WebSocketSession> connections = new HashMap<>(); /** * 建立连接后触发 * */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println("成功建立websocket连接"); // 建立连接后将连接以键值对方式存储,便于后期向客户端发送消息 // 以客户端连接的唯一标识为key,可以通过客户端发送唯一标识 connections.put(session.getRemoteAddress().getHostName(), session); System.out.println("当前客户端连接数:" + connections.size()); } /** * 接收消息 * */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { System.out.println("收到消息: " + message.getPayload()); // 收到客户端请求消息后进行相应业务处理,返回结果 this.sendMessage(session.getRemoteAddress().getHostName(),new TextMessage("收到消息: " + message.getPayload())); } /** * 传输异常处理 * */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { super.handleTransportError(session, exception); } /** * 关闭连接时触发 * */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { System.out.println("触发关闭websocket连接"); // 移除连接 connections.remove(session.getRemoteAddress().getHostName()); } @Override public boolean supportsPartialMessages() { return super.supportsPartialMessages(); } /** * 向连接的客户端发送消息 * * @author lucky_fd * @param clientId 客户端标识 * @param message 消息体 **/ public void sendMessage(String clientId, TextMessage message) { for (String client : connections.keySet()) { if (client.equals(clientId)) { try { WebSocketSession session = connections.get(client); // 判断连接是否正常 if (session.isOpen()) { session.sendMessage(message); } } catch (IOException e) { System.out.println(e.getMessage()); } break; } } } }
通过消息处理器,在开发中我们就可以实现向指定客户端或所有客户端发送消息,实现相应业务功能。
创建拦截器
拦截器会在握手时触发,可以用来进行权限验证
package com.ci.erp.human.interceptor; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; import java.util.Map; /** * * Websocket拦截器类 * * @author lucky_fd * @since 2024-01-17 */ public class WebSocketHandleInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { System.out.println("拦截器前置触发"); return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { System.out.println("拦截器后置触发"); super.afterHandshake(request, response, wsHandler, ex); } }
创建测试客户端
前端页面客户端
最后,创建一个简单的HTML页面,用于接收用户输入并显示实时聊天信息。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Spring WebSocket Chat</title> <script src="https://code.jquery.com/jquery-3.6.4.min.js"></script> <script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.js"></script> </head> <body> 请输入:<input type="text" id="message" placeholder="Type your message"> <button onclick="sendMessage()">Send</button> <button onclick="websocketClose()">关闭连接</button> <div id="chat"></div> <script> var socket = null; if ('WebSocket' in window) { // 后端服务port为22900 socket = new WebSocket("ws://localhost:22900/websocket/server"); } else if ('MozWebSocket' in window) { socket = new MozWebSocket("ws://localhost:22900/websocket/server"); } else { socket = new SockJS("http://localhost:22900/sockjs/server"); } // 接收消息触发 socket.onmessage = function (event) { showMessage(event.data); }; // 创建连接触发 socket.onopen = function (event) { console.log(event.type); }; // 连接异常触发 socket.onerror = function (event) { console.log(event) }; // 关闭连接触发 socket.onclose = function (closeEvent) { console.log(closeEvent.reason); }; //发送消息 function sendMessage() { if (socket.readyState === socket.OPEN) { var message = document.getElementById('message').value; socket.send(message); console.log("发送成功!"); } else { console.log("连接失败!"); } } function showMessage(message) { document.getElementById('chat').innerHTML += '<p>' + message + '</p>'; } function websocketClose() { socket.close(); console.log("连接关闭"); } window.close = function () { socket.onclose(); }; </script> </body> </html>
这个页面使用了WebSocket对象来建立连接,并通过onmessage监听收到的消息。通过输入框发送消息,将会在页面上显示。
测试结果:
后端日志:
前端界面:
Java客户端
添加依赖
<dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.4.0</version> </dependency>
创建客户端类(继承WebsocketClient)
package com.river.websocket; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; public class MyWebSocketClient extends WebSocketClient { MyWebSocketClient(String url) throws URISyntaxException { super(new URI(url)); } // 建立连接 @Override public void onOpen(ServerHandshake shake) { System.out.println(shake.getHttpStatusMessage()); } // 接收消息 @Override public void onMessage(String paramString) { System.out.println(paramString); } // 关闭连接 @Override public void onClose(int paramInt, String paramString, boolean paramBoolean) { System.out.println("关闭"); } // 连接异常 @Override public void onError(Exception e) { System.out.println("发生错误"); } }
测试websocket
package com.river.websocket; import org.java_websocket.enums.ReadyState; import java.net.URISyntaxException; /** * @author lucky_fd * @date 2024-1-17 */ public class Client { public static void main(String[] args) throws URISyntaxException, InterruptedException { MyWebSocketClient client = new MyWebSocketClient("ws://localhost:22900/websocket/server"); client.connect(); while (client.getReadyState() != ReadyState.OPEN) { System.out.println("连接状态:" + client.getReadyState()); Thread.sleep(100); } client.send("测试数据!"); client.close(); } }
进阶实战应用
Websocket消息处理器
基于事件订阅机制实现消息的处理,使用Spring事件机制实现消息处理
/** * * websocket处理类 * 实现WebSocketHandler接口 * * @author lucky_fd * @since 2024-01-17 */ public class WebSocketHandler extends TextWebSocketHandler { /** * 使用ConcurrentHashMap存放ws连接 * */ private static final Map<String, WebSocketSession> connections = new ConcurrentHashMap<>(); /** * 存放当前连接ip及名称 * */ public static final Map<String, String> ipMap = new ConcurrentHashMap<>(); @Autowired private ApplicationEventPublisher publisher; @Autowired private RedisService redisService; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println("建立连接"); String host = IpUtil.getIpAddr(session); connections.put(host, session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { // 获取消息并转换为实体类 WSRespVo respVo = JsonUtils.parseObject(message.getPayload(), WSRespVo.class); System.out.println("收到消息: " + respVo); // 发布事件消息 publisher.publishEvent(new YYWebsocketRespEvent(respVo, session)); // this.sendMessage(session.getRemoteAddress().getHostName(),new TextMessage("收到消息: " + message.getPayload())); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { super.handleTransportError(session, exception); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { // 移除连接 String host = IpUtil.getIpAddr(session); connections.remove(host); // 删除在线列表 redisService.deleteCacheMapByKey(CacheConstants.HUMAN_YY_CLOCK_ONLINE, ipMap.get(host)); System.out.println("连接关闭:" + host); } @Override public boolean supportsPartialMessages() { return super.supportsPartialMessages(); } /** * 向连接的客户端发送消息 * * @author lucky_fd * @param clientId 客户端标识 * @param message 消息体 **/ public void sendMessage(String clientId, TextMessage message) { for (String client : WebSocketHandler.connections.keySet()) { if (client.equals(clientId)) { try { WebSocketSession session = WebSocketHandler.connections.get(client); // 判断连接是否正常 if (session.isOpen()) { session.sendMessage(message); } } catch (IOException e) { System.out.println(e.getMessage()); } break; } } } /** * 向连接的所有客户端发送消息 * * @author lucky_fd * @param message 消息体 * @return int 成功发送的客户端数量 **/ public int sendMessageEvery(TextMessage message) { int count = 0; for (WebSocketSession session : WebSocketHandler.connections.values()) { // 判断连接是否正常 if (session.isOpen()) { try { session.sendMessage(message); count++; } catch (IOException ignored) { } } } return count; } }
定义事件类
/** * * websocket响应事件 * @author lucky_fd * @since 2024-03-06 */ @Getter public class YYWebsocketRespEvent extends ApplicationEvent { /** * 响应指令 * @see EmWSRespCmd * */ private final String cmd; private final WebSocketSession session; public YYWebsocketRespEvent(WSRespVo source, WebSocketSession session) { super(source); this.cmd = source.getCmd(); this.session = session; } }
定义事件监听器
/** * websocket响应事件监听 * * @author lucky_fd * @since 2024-03-06 */ @Component public class YYWebsocketRespEventListener { @Autowired private WebSocketHandler webSocketHandler; // 注入消息处理器 @Autowired private RedisService redisService; /** * 客户端声明事件 * 设备连接上 websocket 后调用该接口 * * @param respEvent 事件消息 * @author lucky_fd **/ @EventListener(condition = "#respEvent.cmd == 'declare'") public void declare(YYWebsocketRespEvent respEvent) { // 根据不同的事件实现自定义业务处理 WSRespVo source = (WSRespVo) respEvent.getSource(); // 博主这里是将连接的客户端存放到redis,以便后期的业务处理 redisService.setCacheMapValue(CacheConstants.HUMAN_YY_CLOCK_ONLINE, source.getSn(), source); WebSocketHandler.ipMap.put(source.getIp(), source.getSn()); } /** * 客户端心跳事件 * * @param respEvent 事件消息 * @author lucky_fd **/ @EventListener(condition = "#respEvent.cmd == 'ping'") public void ping(YYWebsocketRespEvent respEvent) { // 根据不同的事件实现自定义业务处理 String ip = IpUtil.getIpAddr(respEvent.getSession()); String pong = JsonUtils.toJsonString(new YYHeartbeat("pong")); // 发送心跳响应 webSocketHandler.sendMessage(ip, new TextMessage(pong)); } /** * 客户端响应指令 * * @param respEvent 事件消息 * @author lucky_fd **/ @EventListener(condition = "#respEvent.cmd == 'to_device'") public void toDevice(YYWebsocketRespEvent respEvent) { WSRespVo source = (WSRespVo) respEvent.getSource(); System.out.println("客户端响应指令: " + JsonUtils.toJsonString(source)); } }
Nginx转发websocket配置
server { # 日志记录格式 log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"' 'upstreamIP: $upstream_addr' 'upgrade: $http_upgrade'; listen 80; # 监听端口 server_name localhost; location ^~ /WS/ { proxy_pass http://192.168.20.2:8080; # 代理到服务网关 proxy_set_header Upgrade $http_upgrade; # proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; access_log logs/erp/access.log main; # 记录解析日志 error_log logs/erp/error.log; # 记录错误日志 } }
Gateway网关转发websocket配置
Spring Cloud Gateway的路由配置
- id: websocket #uri: ws://127.0.0.1:22900 #使用方式1:websocket配置,直接地址 uri: lb:ws://ci-erp-human #使用方式2:websocket配置,通过nacos注册中心调用serviceName predicates: # 断言工厂 - Path=/WS/** # 这是一个路径断言,表示只有当请求的路径以/WS/开头时,这个路由才会被匹配。**是一个通配符,表示匹配任意子路径。 filters: # 过滤器 - RewritePath=/WS/(?<segment>.*),/$\{segment} # 这是一个路径重写过滤器。它将匹配到的/WS/路径下的任意子路径(由正则表达式的(?<segment>.*)捕获)重写为仅包含该子路径(由$\{segment}引用捕获的组)。例如,如果请求的路径是/WS/some/path,那么重写后的路径将是/some/path
- Gateway转发消息体过大会导致websocket连接断开问题解决:SpringCloud Gateway转发Websocket并修改消息体大小限制
参考链接:
- 通过注解方式实现websocket:springboot集成websocket持久连接(权限过滤+拦截)
- spring websocket实现前后端通信