项目背景

和各位读者大致介绍下具体场景,线上的小程序中开放一些语音麦克风的房间,让用户进入房间之后可以互相通过语音聊天的方式进行互动。

java即时通讯基础教程(教你用纯Java实现一个即时通讯系统)(1)

这里分享一下相关的技术设计方案。这款系统的核心点设计在于如何能让一个用户发出的语音通知到其他用户上边。语音数据在客户端同事的处理下最终变成了io数据流请求到了后端,后端只需要将这些数据流传达给各个不同的终端即可达到广播通知的效果。

单机版架构

最初期上线的时候,为了赶速度,快速试错,所以简单地采用了单机版架构去设计。结合技术栈为 SpringBoot,WebSocket,MySQL技术。

线上一间语音房间的同时在线人数并不会特别多,大概在15-50人的区间段内,系统核心代码是通过SpringBoot内部的WebSocket技术去进行数据的主动推送。

设计思路

整体的设计图比较简单,基本就是一台服务器存储WebSocket连接,如下图所示:

java即时通讯基础教程(教你用纯Java实现一个即时通讯系统)(2)

用户进行WebSocket初始化连接的时候需要一个连接分配和存储的过程:

java即时通讯基础教程(教你用纯Java实现一个即时通讯系统)(3)

早期的存储是存放在了服务器本地的一个Map集合中。

java即时通讯基础教程(教你用纯Java实现一个即时通讯系统)(4)

当WebSocket进行连接的时候就会往内存中写入一条数据信息,当链接断开的时候,就将内存中的数据移除。然后进行语音广播的时候需要结合WebSocket内部的广播发送功能进行通知

java即时通讯基础教程(教你用纯Java实现一个即时通讯系统)(5)

看似设计比较简单,但是在后期业务变得庞大的时候出现了瓶颈。因为随着参加语音活动用户的增加,越来越多的WebSocketSession对象需要被存储到内存当中,这种有状态性的存储对于单机扩容不灵活。

设计缺陷

1.假设原先的服务器扩容到了A,B两台机器,A用户在A机器上边建立了WebSocketSession,B用户在B机器上边建立的WebSocketSession连接。此时如果A想要和B进行对话发送,需要先查找到具体WebSocketSession存放在哪台机器上边。

2.当用户出现了网络异常,临时断开连接进行重连的时候,也可能会出现1所说的问题。

集群架构

设计思路

一旦出现需要发送语音通知的时候,发送一条广播的mq消息,每个机器都接收到消息之后,触发自己的广播操作即可。

RocketMq的接入系统设计里面mq采用的是广播模式,这和我们通常使用的集群模式有一定的区别。

消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:

集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。

集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。

广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

集群消费模式适用场景 适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。

注意事项

集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。

集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

广播消费模式适用场景 适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。

注意事项

广播消费模式下不支持顺序消息。

广播消费模式下不支持重置消费位点。

每条消息都需要被相同订阅逻辑的多台机器处理。

消费进度在客户端维护,出现重复消费的概率稍大于集群模式。

广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

这里面的应用场景需要对集群内部对每个消费者都对服务器内存中的socket连接进行session是否存在对判断,因此需要采用mq的广播模式。

关于mq部分的接入代码

Consumer模块的配置:

package org.idea.web.socket.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**

* @Author linhao

* @Date created in 10:30 上午 2021/5/10

*/

@ConfigurationProperties(prefix = "rocketmq.consumer")

public class MqConsumerConfig {

private boolean isOn;

private String groupName;

private String nameSrvAddr;

private String topics;

private Integer consumeThreadMin;

private Integer consumeThreadMax;

private Integer consumeMessageBatchMaxSize;

/**

getter 和 setter部分省略

**/

}

Producer模块的配置展示:

package org.idea.web.socket.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**

* @Author linhao

* @Date created in 10:26 上午 2021/5/10

*/

@ConfigurationProperties(prefix = "rocketmq.producer")

public class MqProducerConfig {

private boolean isOn;

private String groupName;

private String nameSrvAddr;

private Integer maxMessageSize;

private Integer sendMsgTimeout;

private Integer retryTimesWhenSendFailed;

/**

getter 和 setter部分省略

**/

}

RocketMq内部的消费端Bean配置

package org.idea.web.socket.mq;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import org.idea.web.socket.config.MqConsumerConfig;

import org.idea.web.socket.config.MqProducerConfig;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;

import org.springframework.boot.autoconfigure.AutoConfigureBefore;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**

* @Author linhao

* @Date created in 10:34 上午 2021/5/10

*/

@Configuration

@Slf4j

@EnableConfigurationProperties({MqConsumerConfig.class})

public class MqConsumerAutoConfig {

@Resource

private MqConsumerConfig mqConsumerConfig;

@Resource

//这个接口需要手动实现顺序消费的逻辑 每次获取到消息队列的第一条数据

private MessageListenerHandler messageListenerConcurrently;

@Bean

@ConditionalOnMissingBean

public DefaultMQPushConsumer defaultMQPushConsumer() {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());

consumer.setConsumerGroup(mqConsumerConfig.getGroupName());

consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());

consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());

consumer.registerMessageListener(messageListenerConcurrently);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//消费模型是什么?

consumer.setMessageModel(MessageModel.BROADCASTING);

//默认一次拉取一条消费

consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());

//*表示订阅所有的tag

try {

consumer.subscribe(mqConsumerConfig.getTopics(), "*");

consumer.start();

log.info("【 MqConsumerAutoConfig 】mq consumer is started!");

} catch (Exception e) {

log.error("mq start fail,e is ", e);

}

return consumer;

}

}

RocketMq的服务生产者Bean配置

package org.idea.web.socket.mq;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.idea.web.socket.config.MqProducerConfig;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;

import org.springframework.boot.autoconfigure.AutoConfigureBefore;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**

* @Author linhao

* @Date created in 11:05 上午 2021/5/10

*/

@Configuration

@Slf4j

@EnableConfigurationProperties({MqProducerConfig.class})

public class MqProducerAutoConfig {

@Resource

private MqProducerConfig mqProducerConfig;

@Bean

@ConditionalOnMissingBean

//意味着DefaultMQProducer的配置可以被覆盖

public DefaultMQProducer defaultMQProducer() {

DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());

producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());

//没有则自动创建topic的key

// producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());

producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());

producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());

try {

producer.start();

log.info("【 MqProducerAutoConfig 】mq producer is started!");

} catch (Exception e) {

log.error("[MqProducerAutoConfig] start fail, e is ", e);

}

return producer;

}

}

然后是对RocketMq内部发送消息事件的一层函数封装

package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import org.idea.web.socket.config.MqProducerConfig;

import org.idea.web.socket.dto.BroadcastMqDTO;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.io.UnsupportedEncodingException;

/**

* 消息广播发送端

*

* @Author linhao

* @Date created in 10:43 下午 2021/5/9

*/

@Component

@Slf4j

public class BroadcastMqProducer {

@Resource

private DefaultMQProducer defaultMQProducer;

@Resource

private MqProducerConfig mqProducerConfig;

private static String TOPIC = "ws-topic";

private static String TAGS = "ws-tag";

public static Integer ALL_USER_RECEIVE_TYPE = 1;

public static Integer ONE_USER_RECEIVE_TYPE = 2;

/**

* 点对点之间的消息发送

*

* @param destSessionKey

* @param msg

* @return

*/

public SendResult sendWebSocketToUser(String destSessionKey,String msg) {

if (StringUtils.isEmpty(msg)) {

log.error("[sendWebSocketToUser] msg can not be null!");

return null;

}

Message message = null;

SendResult sendResult = null;

try {

BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();

broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);

broadcastMqDTO.setMessage(msg);

broadcastMqDTO.setSessionKey(destSessionKey);

message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));

sendResult = defaultMQProducer.send(message);

} catch (Exception e) {

log.error("[sendWebSocketBroadcastMsg] e is ", e);

}

return sendResult;

}

/**

* 广播消息发送

*

* @param msg

* @return

*/

public SendResult sendWebSocketBroadcastMsg(String msg) {

if (StringUtils.isEmpty(msg)) {

log.error("[sendWebSocketBroadcastMsg] msg can not be null!");

return null;

}

Message message = null;

SendResult sendResult = null;

try {

BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();

broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);

broadcastMqDTO.setMessage(msg);

message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));

sendResult = defaultMQProducer.send(message);

} catch (Exception e) {

log.error("[sendWebSocketBroadcastMsg] e is ", e);

}

return sendResult;

}

}

对消息的订阅模块实现代码如下:

package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;

import com.oracle.tools.packager.Log;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import org.idea.web.socket.dto.BroadcastMqDTO;

import org.idea.web.socket.manager.SocketManager;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.stereotype.Component;

import org.springframework.util.CollectionUtils;

import org.springframework.web.socket.WebSocketSession;

import javax.annotation.Resource;

import java.util.List;

import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;

import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;

/**

* @Author linhao

* @Date created in 10:59 上午 2021/5/10

*/

@Component

@Slf4j

public class MessageListenerHandler implements MessageListenerConcurrently {

@Resource

private SocketManager socketManager;

@Resource

private SimpMessagingTemplate template;

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

if (CollectionUtils.isEmpty(list)) {

Log.info("receive empty msg");

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

MessageExt messageExt = list.get(0);

byte[] bytes = messageExt.getBody();

String json = new String(bytes);

BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);

log.info("[MessageListenerHandler] broadcastMqDTO is " broadcastMqDTO);

if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {

log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:" broadcastMqDTO);

template.convertAndSend("/topic/sendTopic", broadcastMqDTO);

} else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {

String sessionKey = broadcastMqDTO.getSessionKey();

WebSocketSession webSocketSession = socketManager.get(sessionKey);

if (webSocketSession != null) {

template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());

log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:" broadcastMqDTO);

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

}

整体设计结构如下图:

于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。

业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑,如果当前机器存有用户token对应的session变量,那么就单独针对那个Session进行WebSocket的发送通知。

设计弊端一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入到分布式缓存的redis中保证数据可靠存储,但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。

,