netty无缝切换rabbitmq、activemq、roc
admin
2023-01-20 04:20:04
0

netty无缝切换rabbitmq、activemq、roc

netty无缝切换rabbitmq、activemq、roc

netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否可以,以及处理登录认证的UserAuthHandler和消息处理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(defLoopGroup,
        //编码解码器
        new HttpServerCodec(),
        //将多个消息转换成单一的消息对象
        new HttpObjectAggregator(65536),
        //支持异步发送大的码流,一般用于发送文件流
        new ChunkedWriteHandler(),
        //检测链路是否读空闲,配合心跳handler检测channel是否正常
        new IdleStateHandler(60, 0, 0),
        //处理握手和认证
        new UserAuthHandler(),
        //处理消息的发送
        new MessageHandler()
    );
}

对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel

public static void addChannel(Channel channel) {
        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
        System.out.println("addChannel:" + remoteAddr);
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}", remoteAddr);
        }
        UserInfo userInfo = new UserInfo();
        userInfo.setAddr(remoteAddr);
        userInfo.setChannel(channel);
        userInfo.setTime(System.currentTimeMillis());
        userInfos.put(channel, userInfo);
    }

登录后,channel就变成有效的channel,无效的channel之后将会丢弃

public static boolean saveUser(Channel channel, String nick, String password) {
        UserInfo userInfo = userInfos.get(channel);
        if (userInfo == null) {
            return false;
        }
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
            return false;
        }

        if (nick == null || password == null) {
            return false;
        }
        LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
        if (account == null) {
            return false;
        }
        // 增加一个认证用户
        userCount.incrementAndGet();
        userInfo.setNick(nick);
        userInfo.setAuth(true);
        userInfo.setId(account.getId());
        userInfo.setUsername(account.getUsername());
        userInfo.setGroupNumber(account.getGroupNumber());
        userInfo.setTime(System.currentTimeMillis());

        // 注册该用户推送消息的通道
        offlineInfoTransmitStatic.registerPull(channel);
        return true;
    }

当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。

public static void removeChannel(Channel channel) {
        try {
            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
            //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误
            rwLock.writeLock().lock();
            channel.close();
            UserInfo userInfo = userInfos.get(channel);
            if (userInfo != null) {
                if (userInfo.isAuth()) {
                    offlineInfoTransmitStatic.unregisterPull(channel);
                    // 减去一个认证用户
                    userCount.decrementAndGet();
                }
                userInfos.remove(channel);
            }
        } finally {
            rwLock.writeLock().unlock();
        }

    }

为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。

public interface OfflineInfoTransmit {
    void pushP2P(Integer userId, String message);

    void pushGroup(String groupNumber, String message);

    void registerPull(Channel channel);

    void unregisterPull(Channel channel);
}

其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:

  1. 单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能
  2. 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线
  3. 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。
  4. 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。

netty无缝切换rabbitmq、activemq、roc

代码地址:
https://github.com/shuangyueliao/netty-chat

相关内容

热门资讯

欧盟想对付中国汽车,英国“躺枪... 【文/观察者网 潘昱辰 编辑/高莘】据英国《金融时报》报道,3月4日,欧盟委员会正式公布《工业加速器...
最便宜的苹果笔记本!MacBo... 快科技3月7日消息,苹果本周正式推出了全新的入门级笔记本电脑MacBook Neo,官方起售价定为4...
干将新材料取得风味保持剂混料处... 国家知识产权局信息显示,干将新材料有限公司取得一项名为“一种风味保持剂混料处理机构”的专利,授权公告...
刚刚,Gemini攻克「宇宙弦... 新智元报道 编辑:定慧 【新智元导读】就在刚刚,Google Research团队用Gemini ...
华为无线专家:打造一张面向智能... 文/观察者网 吕栋 移动AI的发展异常迅猛,人类社会正快速迈入智能体互联网时代。 如今在中国,超...
算电协同首次被写入政府工作报告... 来源:证券日报网 “算电协同”被首次写入政府工作报告。3月5日,政府工作报告提出,“实施超大规模智算...
原创 天... 黑洞奇点奠定物理基础 罗杰·彭罗斯1931年出生在英国埃塞克斯郡科尔切斯特,从小家里就满是科学氛...
美军打伊朗是为了“世界末日,耶... 【文/观察者网 阮佳琪】美以伊战事紧张之际,本以为特朗普故技重施,找来一堆牧师围着自己“发功祈祷”,...
伊朗:最高领袖选举会议将在未来... 新华社德黑兰3月7日电 据伊朗伊斯兰革命卫队7日发布的消息,一名伊朗专家会议成员称,选举伊朗最高领袖...
腾讯QQ正式接入OpenCla... 3月7日,腾讯宣布为QQ新增AI生态能力,用户现可通过官方渠道将OpenClaw智能体接入QQ机器人...