达观数据是为企业提供大数据处理、个性化推荐系统服务的知名公司,在应对海量数据处理时,积累了大量实战经验。其中达观数据在面对大量的数据交互和消息处理时,使用了称为DPIO的设计思路进行快速、稳定、可靠的消息数据传递机制,本文分享了达观数据在应对大规模消息数据处理时所开发的通讯中间件DPIO的设计思路和处理经验(达观数据架构师 桂洪冠)
一、数据通讯进程模型
在设计达观数据的消息数据处理机制时,首先充分借鉴了ZeroMQ和ProxyIO的设计思想。ZeroMQ提供了一种底层的网络通讯框架,提供了基本的RoundRobin负载均衡算法,性能优越,而ProxyIO是雅虎的网络通讯中间件,承载了雅虎内部大量计算节点间的实时消息处理。但是ZeroMQ没有实现基于节点健康状态的最快响应算法,并且ZeroMQ和ProxyIO对节点的状态管理,连接管理,负载均衡调度等也需要各应用自己来实现。
达观科技在借鉴两种设计思路的基础上,从进程模型、服务架构、线程模型、通讯协议、负载均衡、雪崩处理、连接管理、消息流程、状态监控等各方面进行了开拓,开发了DPIO(达观ProxyIO的简写,下文统称DPIO),确保系统高性能处理相关数据。
在DPIO的整个通讯框架体系中,采用集中管理、统一监控策略管理节点提供服务,节点间直接进行交互,并不依赖统一的管理节点(桂洪冠)。几种节点间通过http或者tcp协议进行消息传递、配置更新、状态跟踪等通讯行为。集群将不同应用的服务抽象成组的概念,相同应用的服务启动时加入的相同的组。每个通讯组有两种端点client和server。应用启动时通过配置决定自己是client端点还是server端点,在一个组内,每个应用只能有一个身份;不同组没要求。
二、DPIO消息传递逻辑架构
DPIO服务节点内/间的通讯及消息传递模型见下图:
三、线程模型
DPIO的线程模型:
App epoll thread检测从api来的请求信息,并将请求信息转发到待处理队列中。从已处理队列中获取应答包,并将处理结果转发给api
Io epoll thread检测从远端的proxy来的可写事件,并将请求包转发到远端的proxy。检测从远端的proxy的可读事件,并将应答包放在已处理队列中
Monitor thread检测DPIO的工作状态请求,将DPIO的工作状态返回。并将决定Io epoll thread和app epoll thread的负载均衡(桂洪冠)。
四、通信协议
字段 | 含义 | 长度 |
protocol len | 协议包的总长度 | 4bytes |
protocol head len | 协议头的长度 | 1byte |
Version_protocol_id | 协议的版本号和协议号 | 1byte |
Flag | 消息标志,标志路由模式,是否记录来源地址,有二级路由,所以这个字段一定要Eg,末位表示要记录src,倒数第二位表示按roundrobin路由,倒数第3位表示按消息头路由,xxx | 1byte |
Proxy | 来源/目的 proxy | 2bytes |
Api | 来源/目的 api | 2bytes |
ApiTtl | 协议包的发送时间 | 2Bytes |
ClientTtl | 消息存活的时间,后面添加,增加路由策略,选择app_server | 2Bytes |
ClientProcessTime | 客户端处理所用时间 | 2Bytes |
ServerTtl | 消息存活的时间,后面添加,增加路由策略,选择app_client | 2Bytes |
timeout | 协议包的超时时间 | 2 byte |
Sid | 消息序列号 | 4bytes |
protocol body len | Body长度 | 4bytes |
protocol body | 消息体 | Size |
字段 | 含义 | 长度 |
protocol head len | 协议头的长度 | 1byte |
Version_protocol_id | 协议的版本号和协议号 | 1byte |
Flag | 消息标志,标志路由模式,是否记录来源地址,有二级路由,所以这个字段一定要Eg,末位表示要记录src,倒数第二位表示按roundrobin路由,倒数第3位表示按消息头路由,xxx | 1byte |
ApiTtl | 协议包的发送时间 | 2bytes |
Timeout | 协议包的超时时间 | 2bytes |
Api | 来源/目的 api | 2bytes |
Sid | 消息序列号 | 4byte |
Begin_offset | 协议包的起始偏移 | 4bytes |
len | 协议包长度 | 4bytes |
字段 | 含义 | 长度 |
protocol head len | 协议头的长度 | 1byte |
Version_protocol_id | 协议的版本号和协议号 | 1byte |
Flag | 消息标志,标志路由模式,是否记录来源地址,有二级路由,所以这个字段一定要Eg,末位表示要记录src,倒数第二位表示按roundrobin路由,倒数第3位表示按消息头路由,xxx | 1byte |
Result | 处理结果 | 1byte |
sid | 消息序列号 | 4bytes |
begin_offset | 协议包的起始偏移 | 4bytes |
len | 协议包长度 | 4bytes |
字段 | 含义 | 长度 |
protocol len | 协议包的总长度 | 4bytes |
protocol head len | 协议头的长度 | 4bytes |
Version | 协议的版本号 | 4bytes |
protocol id | 协议的协议号 | 4bytess |
status_version | 当前状态版本 | 4bytes |
Proxy_identify_len | 该proxy标识长度 | 4bytess |
Proxy_identify | 该proxy 标识 | 4bytes |
protocol body | 消息体 | Size |
字段 | 含义 | 长度 |
protocol len | 协议包的总长度 | 4bytes |
protocol head len | 协议头的长度 | 4bytes |
Version | 协议的版本号 | 4bytes |
protocol id | 协议的协议号 | 4bytess |
protocol body len | Body长度 | 4bytes |
protocol body | 消息体 | Size |
五、负载均衡
DPIO的负载均衡基于最快响应法
DPIO将所有的统计信息更新到监控中心,监控中心通过处理所有的节点的状态信息,统一负责负载均衡。
DPIO从监控中心获取所有连接的负载均衡策略。每个连接知道只需知道自己的处理能力。
以上图为例,有三个proxy server处理程序。处理能力分别为50、30、20,一次epoll过程能够同时探测多个连接的可写事件。
假设:三个proxy server的属于同一epoll thread,且三个proxy server假设都处理能力无限大。
限制:如果刚开始时待处理队列的数据包个数为100个,多次发送轮回后proxy server A≥proxy server B≥proxy server C, 每个发送的最多发送协议包数为待处理队列协议包个数 * 该连接所占权重
六、雪崩处理
大型在线服务,特别是对于时延敏感的服务,当系统外部请求超过系统服务能力,而没有适当的过载保护措施时,当系统累计的超时请求达到一定规模,将可能导致系统缓冲区队列溢出,后端服务资源耗尽,最终像雪崩一样形成恶性循环。这时系统处理的每个请求都因为超时而无效,系统对外呈现的服务能力为0,且这种情况下不能自动恢复。
我们的解决策略是对协议包进行生命周期管理,现在协议包进出待处理队列和已处理队列时进行超时检测和超时处理(超时则丢弃)。
proxy client:
当app epoll thread将协议包放入待处理队列时,会将该协议包的发送时间、该协议包的超时时间,当前时间戳来判断该协议包是否已经超时。
当app epoll thread将协议包从已处理队列中移除时,会将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
当Io epoll thread将协议包从待处理队列中移除时,会将该协议包的发送时间、该协议包的超时时间,当前时间戳,该连接的协议包的平均处理时间移除。
当io epoll thread将协议包放入已处理队列时,会将将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
proxy server:
当App epoll thread将协议包从待处理队列中移除时,会将该协议包在客户端的处理时间、该协议包的超时时间、该协议包的proxy server接收时间戳、当前时间戳来判断该协议包是否已超时。
当app epoll thread将协议包放入已处理队列时,会将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
当io epoll thread将协议包从已处理队列中移除时,会将该协议包的发送时间、该协议包的超时时间,已经当前时间戳来判断该协议包是否已经超时。
当io epoll thread将协议包放入待处理队列时,会将该协议包的发送时间、该协议包的超时时间来判断该协议包是否已超时。
七、连接管理
红黑树:
红黑树:保存所有连接的最近的读/写时间戳。
当epoll_wait时,首先从红黑树中获取oldest的时间戳,并将当前时间戳与oldest时间戳的时间差作为epoll_wait的超时时间,当连接中有可读/写事件发送时,首先从红黑树中删除该节点,当可读/写事件处理完毕后,再将节点插入到红黑树中,当处理完所有连接的可读/写事件时,再从红黑树中依次从移除时间戳小于当前时间戳的连接,并触发该连接的timeout事件。
八、消息处理流程
九、状态监控
连接池中存在:当前可用连接个数
连接池中再分别获取每个连接的状态
每个可用连接分别维护以下信息:
连接处理的数据包个数、连接send失败次数、连接协议包的平均处理时间。
连接的连接状态(当重连失败达到一定次数时,定义为连接失败)。
连接的重连次数、连接的超时次数。
当监控线程accept到client的连接时,解析请求内容,然后调用连接池对象的statistics方法,连接池对象首先写入自己的统计信息,然后分别调用每个连接的statistics方法,每个连接分别填写自己的统计信息
本文小结
大规模消息传递会遇到很多可靠性、稳定性的问题,DPIO是达观在处理大数据通讯时的一些经验,和感兴趣的朋友们分享,期待与大家不断交流与合作