SpringBoot2.0集成WebSocket,多客户端

news/2024/7/24 5:22:07 标签: 服务器, 运维, java, spring, spring boot, websocket

适用于单客户端,一个账号登陆一个客户端,登陆多个客户端会报错

The remote endpoint was in state [TEXT_FULL_WRITING] 

这是因为此时的session是不同的,只能锁住一个session,解决此问题的方法把全局静态对象锁住,因为账号是唯一的

java">/**
 * @Description 开启springboot对websocket的支持
 * @Author WangKun
 * @Date 2023/8/14 17:21
 * @Version
 */
@ConditionalOnProperty(name = "spring.profiles.active", havingValue = "dev")
@Configuration
public class WebSocketConfig{

    /**
     * @Description 注入一个ServerEndpointExporter, 会自动注册使用@ServerEndpoint注解
      * @param
     * @Throws
     * @Return org.springframework.web.socket.server.standard.ServerEndpointExporter
     * @Date 2023-08-14 17:26:31
     * @Author WangKun
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
java">/**
 * @Description websocket服务,不考虑分组
 * @Author WangKun
 * @Date 2023/8/14 17:29
 * @Version
 */
@ConditionalOnClass(value = WebSocketConfig.class)
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocket {

    private static final long SESSION_TIMEOUT = 60000;

    //存放每个客户端对应的WebSocket对象。
    private static final ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocket>> WEB_SOCKET_MAP = new ConcurrentHashMap<>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String userId;

    /**
     * @Description 重写防止session重复
      * @param o
     * @Throws
     * @Return boolean
     * @Date 2023-09-01 10:02:51
     * @Author WangKun
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebSocket that = (WebSocket) o;
        return Objects.equals(session, that.session);
    }

    @Override
    public int hashCode() {
        return Objects.hash(session);
    }

    /**
     * @param session
     * @param userId
     * @Description 建立连接
     * @Throws
     * @Return void
     * @Date 2023-08-14 17:52:08
     * @Author WangKun
     */
    @SneakyThrows
    @OnOpen
    public void onOpen(final Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        session.setMaxIdleTimeout(SESSION_TIMEOUT);
        //先查找是否有uniCode
        CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
        if (users == null) {
            //处理多个同时连接并发
            synchronized (WEB_SOCKET_MAP) {
                if (!WEB_SOCKET_MAP.contains(userId)) {
                    users = new CopyOnWriteArraySet<>();
                    WEB_SOCKET_MAP.put(userId, users);
                }
            }
        }
        users.add(this);
        sendMessage(String.valueOf(ResponseCode.CONNECT_SUCCESS.getCode()));
        log.info("用户--->{} 连接成功,当前在线人数为--->{}", userId, WEB_SOCKET_MAP.size());
    }

    /**
     * @param message
     * @Description 向客户端发送消息 session.getBasicRemote()与session.getAsyncRemote()的区别
     * @Throws
     * @Return void
     * @Date 2023-08-14 17:51:07
     * @Author WangKun
     */
    @SneakyThrows
    public void sendMessage(String message) {
        // 加锁避免阻塞
        // 如果有多个客户端的话,亦或者同一个用户,或者打开了多个浏览器(同一个用户打开多个客户端或者多个界面),开了多个页面,此时Session是不同的,只能锁住一个session,所以锁住全局静态对象
//        synchronized(session) {
//            this.session.getBasicRemote().sendText(message);
//        }
        synchronized (WEB_SOCKET_MAP) {
            CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
            if (users != null) {
                for (WebSocket user : users) {
                    user.session.getBasicRemote().sendText(message);
                    log.info("向客户端发送数据--->{} 数据为--->{}", userId, message);
                }
            }
        }
    }

    /**
     * @param
     * @Description 关闭连接
     * @Throws
     * @Return void
     * @Date 2023-08-14 17:52:30
     * @Author WangKun
     */
    @OnClose
    public void onClose(Session session) {
        // 避免多人同时在线直接关闭通道。
        CopyOnWriteArraySet<WebSocket> copyOnWriteArraySet = WEB_SOCKET_MAP.get(this.userId);
        if (!copyOnWriteArraySet.isEmpty()) {
            Object[] objects = copyOnWriteArraySet.toArray();
            for (Object object : objects) {
                if (((WebSocket) object).session.equals(session)) {
                    //删除当前用户
                    WEB_SOCKET_MAP.get(this.userId).remove((WebSocket) object);
                }
            }
            log.info("用户--->{} 关闭连接!", userId);
        }
    }

    /**
     * @param message
     * @param session
     * @Description 收到客户端消息
     * @Throws
     * @Return void
     * @Date 2023-08-15 10:54:55
     * @Author WangKun
     */
    @SneakyThrows
    @OnMessage
    public void onMessage(String message, Session session) {
        //枷锁避免多个资源互抢
        //这一块可以操作数据,比如存到数据

        // 同一个用户,多个地方登录(多个session),循环发送消息,
        // 如果有多个客户端的话,亦或者同一个用户,或者打开了多个浏览器,开了多个页面,此时Session是不同的,只能锁住一个session,所以锁住全局静态对象
        synchronized (WEB_SOCKET_MAP) {
            CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
            if (users != null) {
                for (WebSocket user : users) {
                    user.session.getBasicRemote().sendText("pong");
                    log.info("收到客户端发送的心跳数据--->{} 数据为--->{}", userId, message);
                }
            }
        }
    }

    /**
     * @param session
     * @param error
     * @Description 发生错误时
     * @Throws
     * @Return void
     * @Date 2023-08-15 10:55:27
     * @Author WangKun
     */
    @OnError
    public void onError(Session session, Throwable error) {
        CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
        if (users != null) {
            WEB_SOCKET_MAP.remove(userId);
            log.error("用户--->{} 错误!" + userId, "原因--->{}" + error.getMessage(), error);
        }
    }

    /**
     * @param userId
     * @param message
     * @Description 通过userId向客户端发送消息(指定用户发送)
     * @Throws
     * @Return void
     * @Date 2023-08-14 18:01:35
     * @Author WangKun
     */
    public static void sendTextMessageByUserId(String userId, String message) {
        CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
        if (users != null) {
            for (WebSocket user : users) {
                user.sendMessage(message);
                log.info("服务端发送消息到用户{},消息:{}", userId, message);
            }
        }
    }

    /**
     * @param message
     * @Description 群发自定义消息
     * @Throws
     * @Return void
     * @Date 2023-08-14 18:03:38
     * @Author WangKun
     */
    public static void sendTextMessage(String message) {
        // 如果在线一个就广播
        if (!WEB_SOCKET_MAP.isEmpty()) {
            for (String item : WEB_SOCKET_MAP.keySet()) {
                CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(item);
                if (users != null) {
                    for (WebSocket user : users) {
                        user.sendMessage(message);
                        log.info("服务端发送消息到用户{},消息:{}", item, message);
                    }
                }
            }
        }
    }
}


http://www.niftyadmin.cn/n/4993068.html

相关文章

【Datawhale】AI夏令营第三期——基于论文摘要的文本分类笔记(下)

笔记上部分请看【Datawhale】AI夏令营第三期——基于论文摘要的文本分类笔记(上) 文章目录 一、深度学习Topline1.1 数据预处理1.2 模型训练1.3 评估模型1.4 测试集推理1.5 后续改进 二、大模型Topline2.1 大模型介绍2.2 大模型是什么&#xff1f;2.3 大模型的原理2.4 大模型可…

【Cadence】Calculator计算sp的3dB带宽

【Cadence】Calculator计算sp的3dB带宽 1.计算最大增益2.cross函数3. 3dB带宽 下面演示如何在Cadence计算s参数&#xff08;如增益&#xff09;的3dB带宽 1.计算最大增益 ymax函数 2.cross函数 cross函数可以计算经过y轴给定值对应的x坐标 edge number选择1是经过的第一个点…

9.嵌套路由

1.添加主路由main <template><div><!-- 页面布局 --><el-container><!-- 侧边栏 --><el-aside width"200px">Aside</el-aside><!-- 页面布局 右边 包括header 和main --><el-container><el-header>Head…

git 忽略已经提交的文件或文件夹 (修改.gitignore文件无效)

场景描述&#xff1a;项目开发到一半&#xff0c;追加了模块&#xff0c;提交的时候未注意将不需要提交的文件或者目录提交到.gitignore&#xff0c;然后提交后发现再修改git配置文件已无法阻拦更新&#xff0c;查阅官方资料&#xff1a; 核心点&#xff1a;.gitignore 之前&a…

结合OB Cloud区别于MySQL的4大特性,规划降本方案

任何一家企业想要获得持续性的发展与盈利&#xff0c;“降本增效”都是难以绕开的命题。但是“一刀切”的降本影响往往不太可控&#xff0c;成本的快速收缩往往会给业务带来低效运营和增长缓慢的风险。所以我们所说的降本&#xff0c;是指在成本降低的同时&#xff0c;效率不降…

IO进程线程、互斥锁、进程间通信:1、无名管道,2、有名管道

一、线程互斥 引入互斥(mutual exclusion)锁的目的是用来保证共享数据操作的完整性。 互斥锁主要用来保护临界资源 每个临界资源都由一个互斥锁来保护&#xff0c;任何时刻最多只能有一个线程能访问该资源 线程必须先获得互斥锁才能访问临界资源&#xff0c;访问完资源后释…

部署java程序的服务器cpu过高如何排查和解决

1.top命令找到占用CPU高的Java进程PID 2.根据进程ID找到占用CPU高的线程 ps -mp pid -o THREAD,tid | sort -r ps -mp 124682 -o THREAD,tid | sort -r 3.将指定的线程ID输出为16进制格式 printf “%x\n” tid printf "%x\n" 6384 18f0 4.jstack pid |…

独立站怎么做活动策划,独立站推广方式有哪些

独立站可以获得更多的用户关注和认可&#xff0c;进而实现业务增长和发展&#xff0c;因此活动策划至关重要&#xff0c;那么独立站怎么做活动策划&#xff0c;独立站推广方式有哪些&#xff1f; 独立站怎么做活动策划&#xff1f; 1、明确目标&#xff1a;在开始策划之前&am…