SpringBoot 整合WebSocket 简单实战案例
前言
这个简单实战案例主要目的是让大家了解websocket的一些简单使用.
另外使用stomp方式的:
-
《Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)》
https://blog.csdn.net/qq_35387940/article/details/119817167 -
《Springboot 整合 WebSocket ,使用STOMP协议+Redis 解决负载场景问题(二)》
https://blog.csdn.net/qq_35387940/article/details/120068362 -
想稍微再深入一下,可以看这篇,
《Springboot 整合Websocket,Stomp协议,使用rabbitmq做消息代理,消息缓存》:https://blog.csdn.net/qq_35387940/article/details/108276136
但是如果你是第一次尝试整合websocket,我还是建议你把当前这篇看一看,跟着做下实战案例。
正文
先看看这次实践的目录结构:
两个页面分别模拟不同用户接入websocket。
------接下来,我们开始整合WebSocket------
先是pom.xml添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
PS:application.properties不需要添加任何配置 ,我只设置了一下服务server.port=8083
接着,创建节点配置类WebSocketStompConfig.java:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketStompConfig {
//这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket ,如果你使用外置的tomcat就不需要该配置文件
@Bean
public ServerEndpointExporter serverEndpointExporter()
{
return new ServerEndpointExporter();
}
}
然后是WebSocket配置类,WebSocket.java:
(这里面包含这单独发送消息,群发,监听上下线等等方法)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* @Author:JCccc
* @Description:
* @Date: created in 15:56 2019/5/13
*/
@Component
@ServerEndpoint(value = "/connectWebSocket/{userId}")
public class WebSocket {
private Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 在线人数
*/
public static int onlineNumber = 0;
/**
* 以用户的姓名为key,WebSocket为对象保存起来
*/
private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
/**
* 会话
*/
private Session session;
/**
* 用户名称
*/
private String userId;
/**
* 建立连接
*
* @param session
*/
@OnOpen
public void onOpen(@PathParam("userId") String userId, Session session)
{
onlineNumber++;
logger.info("现在来连接的客户id:"+session.getId()+"用户名:"+userId);
this.userId = userId;
this.session = session;
// logger.info("有新连接加入! 当前在线人数" + onlineNumber);
try {
//messageType 1代表上线 2代表下线 3代表在线名单 4代表普通消息
//先给所有人发送通知,说我上线了
Map<String,Object> map1 = Maps.newHashMap();
map1.put("messageType",1);
map1.put("userId",userId);
sendMessageAll(JSON.toJSONString(map1),userId);
//把自己的信息加入到map当中去
clients.put(userId, this);
logger.info("有连接关闭! 当前在线人数" + clients.size());
//给自己发一条消息:告诉自己现在都有谁在线
Map<String,Object> map2 = Maps.newHashMap();
map2.put("messageType",3);
//移除掉自己
Set<String> set = clients.keySet();
map2.put("onlineUsers",set);
sendMessageTo(JSON.toJSONString(map2),userId);
}
catch (IOException e){
logger.info(userId+"上线的时候通知所有人发生了错误");
}
}
@OnError
public void onError(Session session, Throwable error) {
logger.info("服务端发生了错误"+error.getMessage());
//error.printStackTrace();
}
/**
* 连接关闭
*/
@OnClose
public void onClose()
{
onlineNumber--;
//webSockets.remove(this);
clients.remove(userId);
try {
//messageType 1代表上线 2代表下线 3代表在线名单 4代表普通消息
Map<String,Object> map1 = Maps.newHashMap();
map1.put("messageType",2);
map1.put("onlineUsers",clients.keySet());
map1.put("userId",userId);
sendMessageAll(JSON.toJSONString(map1),userId);
}
catch (IOException e){
logger.info(userId+"下线的时候通知所有人发生了错误");
}
//logger.info("有连接关闭! 当前在线人数" + onlineNumber);
logger.info("有连接关闭! 当前在线人数" + clients.size());
}
/**
* 收到客户端的消息
*
* @param message 消息
* @param session 会话
*/
@OnMessage
public void onMessage(String message, Session session)
{
try {
logger.info("来自客户端消息:" + message+"客户端的id是:"+session.getId());
System.out.println("------------ :"+message);
JSONObject jsonObject = JSON.parseObject(message);
String textMessage = jsonObject.getString("message");
String fromuserId = jsonObject.getString("userId");
String touserId = jsonObject.getString("to");
//如果不是发给所有,那么就发给某一个人
//messageType 1代表上线 2代表下线 3代表在线名单 4代表普通消息
Map<String,Object> map1 = Maps.newHashMap();
map1.put("messageType",4);
map1.put("textMessage",textMessage);
map1.put("fromuserId",fromuserId);
if(touserId.equals("All")){
map1.put("touserId","所有人");
sendMessageAll(JSON.toJSONString(map1),fromuserId);
}
else{
map1.put("touserId",touserId);
System.out.println("开始推送消息给"+touserId);
sendMessageTo(JSON.toJSONString(map1),touserId);
}
}
catch (Exception e){
e.printStackTrace();
logger.info("发生了错误了");
}
}
public void sendMessageTo(String message, String TouserId) throws IOException {
for (WebSocket item : clients.values()) {
// System.out.println("在线人员名单 :"+item.userId.toString());
if (item.userId.equals(TouserId) ) {
item.session.getAsyncRemote().sendText(message);
break;
}
}
}
public void sendMessageAll(String message,String FromuserId) throws IOException {
for (WebSocket item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
public static synchronized int getOnlineCount() {
return onlineNumber;
}
}
接下来用一个HTML5 页面 index.html,连接当前的WebSocket节点,接/发消息, index.html:
<!DOCTYPE HTML>
<html>
<head>
<title>Test My WebSocket</title>
</head>
<body>
TestWebSocket
<input id="text" type="text" />
<button onclick="send()">SEND MESSAGE</button>
<button onclick="closeWebSocket()">CLOSE</button>
<div id="message"></div>
</body>
<script type="text/javascript">
var websocket = null;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
//连接WebSocket节点
websocket = new WebSocket("ws://localhost:8083/connectWebSocket/001");
}
else{
alert('Not support websocket')
}
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
//接收到消息的回调方法
websocket.onmessage = function(event){
setMessageInnerHTML(event.data);
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭连接
function closeWebSocket(){
websocket.close();
}
//发送消息
function send(){
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</html>
为了演示多人接入,我们再弄一个index2.html:
好了,一切准备就绪,那么 把项目跑起来:
访问index.html模拟用户001连接websocket服务:
可以看到一上线,我们默认就推送了一条上线消息
接下来继续访问index2.html,模拟用户002也接入websocket:
此刻,我们模拟咱们服务器给客户推送消息,有群发和单独发送,我们一一实践:
单独发送,只需要调用websocket.java里面的 sendMessageTo方法:
那么我们来写个简单的推送信息接口,
@Autowired
WebSocket webSocket;
@ResponseBody
@GetMapping("/sendTo")
public String sendTo(@RequestParam("userId") String userId,@RequestParam("msg") String msg) throws IOException {
webSocket.sendMessageTo(msg,userId);
return "推送成功";
}
我们试着给001这位用户推送个消息:
可以看到001的页面收到了消息,002没有收到(肯定的):
群发(所有接入到websocket的用户都能收到):
@ResponseBody
@GetMapping("/sendAll")
public String sendAll(@@RequestParam("msg") String msg) throws IOException {
String fromUserId="system";//其实没用上
webSocket.sendMessageAll(msg,fromUserId);
return "推送成功";
}
我们试着给所有用户推送个消息:
可以看到大家都收到了这个群发消息:
然后是客户给服务端推送消息,直接操作起来:
其实就是websocket.java里面的onMessage 方法:
细看,其实里面写了点消息逻辑。 这是为了区分这个是一条上线消息还是下线消息等等。
那么发送简单直接给服务器推送消息的话,可以把后边的逻辑先注释掉。 也就是:
然后简单的客户端推送消息给服务器如:
可以看到控制台的打印:
正常收到消息,那么接下来我们把注释的代码打开,
这样只要我们符合逻辑,就能实现001给002 发送消息,或者001给所有人发送消息等等。这些其实都是通过将消息推送到服务器,服务器再判断进行转发而已。
测试一下,001给002发消息:
我们把消息弄成json格式:
{
"message" :" hello,我是001,我想和你做朋友",
"userId":"001",
"to":"002"
}
然后发送:
可以看到控制台有打印:
然后在去看看002用户的页面,成功收到了001的私发消息:
还有其他业务类型 001给所有用户群发等等这些看代码就知道,其实也是根据某个key判断,然后进行发送,就不测试了。
该篇websocket的实践介绍,就到此吧。
该篇文章只是简单地介绍一下大家去使用下websocket,场景是不同客户端能收到服务端推送的消息,服务端也能手动客户端发过来的消息,然后也能互相推送消息。
但是大家热情很高,那么使用websocket实现指定用户推送消息,实现多人聊天 ,其实我很早也是做了demo的,只是没时间去写相关的教学文章
demo的代码下载地址:https://download.csdn.net/download/qq_35387940/11851913
大致的效果,大家都可以通过接口以自己的身份登录,然后可以选择给所有人发送消息,也可以指定给某个人发送消息:
(这两种方式的实现,其实已经可以自己单独拆分出来一些按钮调用方法,去模拟私聊,群聊,讨论组等等这种场景)
那么除了这种方式的实现,我还有去进行整合rabbitmq作为消息代理,实现点对点以及一对多的消息推送,也是有对应的demo:
https://download.csdn.net/download/qq_35387940/12754478
这些demo例子都只做参考,如果合适你的项目场景,你可以使用;如果不合适,你可以自己进行调整魔改。
主要是了解的基本的功能使用方式,很多东西属于扩展的,都是动态的,根据业务需求而变的。