基于 Spring Cloud 使用一致性哈希算法实现分布式 WebSocket. / 基于 RabbitMQ 广播实现分布式 WebSocket.
参考 RFC 标准规范。
核心问题在于具备 WebSocket 服务端的 SpringBoot 或 SpringCloud 等微服务应用是有状态的。服务端的内存中存在维持连接的 Session。客户端连接服务器时,只能和集群中的唯一一个服务实例来连接,而且数据传输过程中也直接和此实例通信。
解决方案有很多。
这个方案非常类似于 HTTP 协议的 Token 实现(非 JWT),Java 中具体的例子就是 Spring Session 以及 Spring Security 的 Token Store,比如 RedisTokenStore 实现了将 OAuth2 Token 存放到 Redis 的逻辑。此方案的核心前提是可序列化。WebSocket 协议和 HTTP 协议并不同,一个 WebSocket Session 对应于一次 TCP Socket 连接而非数据,新的连接将会基于新的四元组,不可能做到序列化。
像微服务注册中心一样,维护一个全局映射关系 —— 当前客户端的 WebSocket 连接是和哪一个实例/节点保持的。每次通信都经过此注册中心或哈希,查询到具体的服务实例,将连接交给它。同样会遇到一些问题,比如:哈希环的实现、如何解决一致性问题(单节点故障的 Session 迁移重连将代价降到最低)、CAP 取舍、实例的区分依据是什么(IP ? 实例 ID?)、可能与注册中心构成强依赖、以及部署环境的变更带来运维上的挑战等难题。这个方案实施有一定的成本和技术限制,可以参考这篇文章的大体思路,理解了核心的哈希环的概念(如下图),再配合注册中心、服务上下线消息,就能完成基本的实现。我利用 Nacos、Redis、RabbitMQ 基于 Spring Cloud 写了一个通过一致性哈希来实现的全栈项目,前端和后端都已开源 —— WebSocket 集群的一致性哈希实践(Spring Cloud 后端) - GitHub , WebSocket 客户端模拟与服务列表(React 前端) - GitHub, 最近刚刚写好基本的功能,参考 @ufiredong 的方式利用 Java 语言的 Docker SDK 实现模拟 WebSocket 实例的上下线。未来还会稍微做细微的优化。
另外,Nacos 作为微服务的注册中心是国内比较流行的选择。但有人早就向 Nacos 社区提出了维护哈希环的议题;姑且不说此议题是否合理,但 Nacos 的维护人员竟然没有能够理解这样的需求,还是有点令人诧异。
最终效果如下, 上线一个新的服务实例后,客户端会自动根据最新哈希环重新路由到最新节点上,收发消息一切正常:
每次连接都需要通知到所有实例,实例各自都判断连接状态在不在自己这里,不在的话直接忽略,在就处理。类似于发布订阅模型,可以通过 MQ(RabbitMQ、Kafka、RocketMQ 等)或 Redis 做到。此方案实现简单,适用于集群规模不大的场景,因为需要所有的节点进行判断或计算。我愿称之为:分布式事件驱动。
以下基于 RabbitMQ 做了简单实现。
声明式 API 的好处就体现在这里:短短几行就直接利用了 RabbitMQ 的 Java SDK 创建好交换机和队列以及绑定关系。
package me.lawrenceli.websocket.server.configuration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MqConfig {
private static final String FANOUT_EXCHANGE = "websocket-exchange";
@Bean
public FanoutExchange fanoutExchange() {
log.info("创建广播交换机 [{}]", FANOUT_EXCHANGE);
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public AnonymousQueue queueForWebSocket() {
log.info("创建用于 WebSocket 的匿名队列");
return new AnonymousQueue();
}
/**
* @param fanoutExchange 交换机
* @param queueForWebSocket 队列
* @return Binding
*/
@Bean
public Binding bindingSingle(FanoutExchange fanoutExchange, AnonymousQueue queueForWebSocket) {
log.info("把匿名队列 [{}] 绑定到广播交换器 [{}]", queueForWebSocket.getName(), fanoutExchange.getName());
return BindingBuilder.bind(queueForWebSocket).to(fanoutExchange);
}
}
然后是生产者和消费者。生产者将消息发送到队列中进行广播,集群的消费者监听队列,判断此 WebSocket Session 是否在当前消费节点再做进一步处理。
消息生产方,一般是外层接收到消息通信请求,然后调用进行集群内广播:
package me.lawrenceli.websocket.server.configuration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class FanoutSender {
@Autowired
private FanoutExchange fanoutExchange;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(Object message) {
log.info("开始发送广播: [{}]", message.toString());
rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
}
}
消息消费方,多节点共同消费同一消息,区分是否需要自己来处理:
package me.lawrenceli.websocket.server.configuration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class FanoutReceiver {
@RabbitListener(queues = "#{queueForWebSocket.name}")
public void singleReceiver(Object message) {
log.info("队列接收到了消息: [{}]", message.toString());
// 判断 WebSocket Session 是否在当前节点
// 一般维护在一个 ConcurrentHashMap 静态变量中,这里叫 sessionMap
// message 中有类似 SessionId 的字段
if (sessionMap.contains(sessionId)) {
log.info("WebSocket Session 在当前节点");
// 执行相应的流程
} else {
log.info("当前节点无此 WebSocket Session");
// 什么都不用做,直接忽略
}
}
}
注意,方案 2、方案 3 都没有解决单节点故障问题,因为从本质上讲,没有实现 WebSocket Session 的全局共享。但是方案二能够依赖哈希环的更新,最大程度上地降低迁移连接的代价,客户端会有重试的机制来针对单节点故障问题做容错。如果规模较大,项目复杂,且客户端连接数多,推荐使用一致性哈希的方案二,当然下面也有其他的社区方案。
即刻 App 后端基于 Node.js 使用了 socket.io 实现多端实时通信,Joway 的这篇文章交代的一些细节也涉及了分布式部署的问题。
socket.io 官网也提供了多节点的使用方法,不外乎 IP Hash 之类的方案。
如果部署环境基于 Kubernetes 内的 WebSocket
集群,也可以参考阳明的这篇文章来实践 (利用 Service
的 sessionAffinity
会话亲和确保每次将来自特定客户端的连接传递到同一 Pod:使用 ClientIP
声明)。