Web
服务端推送技术经过了长轮询、短轮询的发展,最终到 HTML5
标准带来的 WebSocket
规范逐步成为了目前业内主流技术方案。它使得消息推送、消息通知等功能的实现变得异常简单,那么百万级别连接下的 Websocket
网关该如何实践呢?本文整理自石墨文档资深工程师杜旻翔根据石墨 Websocket
重构的实践经验。
在石墨文档的业务中,如文档分享、评论、幻灯片演示和文档表格跟随等场景,涉及多客户端数据同步和服务端批量数据推送的需求,采用短轮询或长轮询的方式无法很好的满足服务端消息推送、消息通知的业务场景,因此选择业内的主流方案:基于 HTML5
标准定义的 WebSocket
规范。
随着石墨文档的发展,现在日连接峰值达到了百万量级,日益增长的用户连接数和停止更新的架构设计导致了内存和 CPU 使用量急剧增长,因此我们考虑对网关进行重构,以适应发展需求。
网关 1.0 是使用 Node.js
基于 Socket.IO
进行修改开发的版本,很好的满足了当时用户量级下的业务场景需求。
网关 1.0 版本架构设计图:
网关 1.0 客户端连接流程:
虽然 1.0 版本的网关在线上运行良好,但是不能很好的支持后续业务的扩展,并且有以下几个问题需要解决:
网关 2.0 需要解决很多问题:石墨文档内部有很多组件:文档、表格、幻灯片和表单等等。在 1.0 版本中组件对网关的业务调用可以通过:Redis、Kafka 和 HTTP 接口,来源不可查,管控困难。此外,从性能优化的角度考虑也需要对原有服务进行解耦合,将 1.0 版本网关拆分为网关功能部分和业务处理部分,网关功能部分为 WS-Gateway:集成用户鉴权、TLS 证书验证和 WebSocket 连接管理等;业务处理部分为 WS-API:组件服务直接与该服务进行 gRPC 通信。可针对具体的模块进行针对性扩容;服务重构加上 Nginx 移除,整体硬件消耗显著降低;服务整合到石墨监控体系。
网关 2.0 版本架构设计图:
网关 2.0 客户端连接流程:
网络状态良好的情况下,完成如下图所示步骤 1 到步骤 6 之后,直接进入 WebSocket 流程;网络环境较差的情况下,WebSocket 的通信模式会退化成 HTTP 方式,客户端通过 POST 方式推送消息到服务端,再通过 GET 长轮询的方式从读取服务端返回数据。客户端初次请求服务端连接建立的握手流程:
{"sid":"xxx","upgrades":["websocket"],"pingInterval":xxx,"pingTimeout":xxx}
客户端与服务端连接建立采用的 wss 协议,在 1.0 版本中 TLS 证书挂载在 Nginx 上,HTTPS 握手过程由 Nginx 完成,为了降低 Nginx 的机器成本,在 2.0 版本中我们将证书挂载到服务上,通过分析服务内存,如下图所示,TLS 握手过程中消耗的内存占了总内存消耗的大概 30% 左右。
这个部分的内存消耗无法避免,我们有两个选择:
对每次连接必须产生一个唯一码,如果出现重复会导致串号,消息混乱推送的问题。选择 SnowFlake 算法作为唯一码生成算法。
物理机场景中,对副本所在物理机进行固定编号,即可保证每个副本上的服务产生的 Socket ID 是唯一值。
K8S 场景中,这种方案不可行,于是采用注册下发的方式返回编号,WS-Gateway 所有副本启动后向数据库写入服务的启动信息,获取副本编号,以此作为参数作为 SnowFlake 算法的副本编号进行 Socket ID 生产,服务重启会继承之前已有的副本编号,有新版本下发时会根据自增 ID 下发新的副本编号。于此同时,Ws-Gateway 副本会向数据库写入心跳信息,以此作为网关服务本身的健康检查依据。
客户端完成握手流程后,会话数据在当前网关节点内存存储,部分可序列化数据存储到 Redis,存储结构说明如下:
键 | 说明 |
---|---|
ws:user:clients:${uid} | 存储用户和 WebSocket 连接的关系,采用有序集合方式存储 |
ws:guid:clients:${guid} | 存储文件和 WebSocket 连接的关系,采用有序结合方式存储 |
ws:client:${socket.id} | 存储当前 WebSocket 连接下的全部用户和文件关系数据,采用 Redis Hash 方式进行存储,对应 key 为 user 和 guid |
由客户端触发或组件服务触发的消息推送,通过 Redis 存储的数据结构,在 WS-API 服务查询到返回消息体的目标客户端的 Socket ID,再有 WS-Gateway 服务进行集群消费,如果 Socket ID 不在当前节点,则需要进行节点与会话关系的查询,找到客端户 Socket ID 实际对应的 WS-Gateway 节点,通常有以下两种方案:
优点 | 缺点 | |
---|---|---|
事件广播 | 实现简单 | 消息广播数量会随着节点数量上升 |
注册中心 | 消息广播数量会随着节点数量上升 | 注册中心强依赖,额外运维成本 |
在确定使用事件广播方式进行网关节点间的消息传递后,进一步选择使用哪种具体的消息中间件,列举了三种待选的方案:
特性 | Redis | Kafka | RocketMQ |
---|---|---|---|
开发语言 | C | Scala | Java |
单机吞吐量 | 10w+ | 10w+ | 10w+ |
可用性 | 主从架构 | 分布式架构 | 分布式架构 |
特点 | 功能简单 | 吞吐量、可用性极高 | 功能丰富、定制化强,吞吐量、可用性高 |
功能特性 | 数据 10K 以内性能优异,功能简单,适用于简单业务场景 | 支持核心的 MQ 功能,不支持消息查询或消息回溯等功能 | 支持核心的 MQ 功能,扩展性强 |
于是对 Redis 和其他 MQ 中间件进行 100w 次的入队和出队操作,在测试过程中发现在数据小于 10K 时 Redis 性能表现十分优秀,进一步结合实际情况:广播内容的数据量大小在 1K 左右,业务场景简单固定,并且要兼容历史业务逻辑,最后选择了 Redis 进行消息广播。
后续还可以将 WS-API 与 WS-Gateway 两两互联,使用 gRPC stream 双向流通信节省内网流量。
会话在节点内存与 Redis 中存储后,客户端需要通过心跳上报持续更新会话时间戳,客户端按照服务端下发的周期进行心跳上报,上报时间戳首先在内存进行更新,然后再通过另外的周期进行 Redis 同步,避免大量客户端同时进行心跳上报对 Redis 产生压力。
for {
select {
case <-t.C:
var now = time.Now().Unix()
var clients = make([]*Connection, 0)
dispatcher.clients.Range(func(_, v interface{}) bool {
client := v.(*Connection)
lastTs := atomic.LoadInt64(&client.LastMessageTS)
if now-lastTs > int64(expireTime) {
clients = append(clients, client)
} else {
dispatcher.clearRedisMapping(client.Id, client.Uid, lastTs, clearTimeout)
}
return true
})
for _, cli := range clients {
cli.WsClose()
}
}
}
在已有的两级缓存刷新机制上,进一步通过动态心跳上报频率的方式降低心跳上报产生的服务端性能压力,默认场景中客户端对服务端进行间隔 1s 的心跳上报,假设目前单机承载了 50w 的连接数,当前的 QPS 为:QPS1 = 500000/1
从服务端性能优化的角度考虑,实现心跳正常情况下的动态间隔,每 x 次正常心跳上报,心跳间隔增加 a,增加上限为 y,动态 QPS 最小值为:QPS2=500000/y
极限情况下,心跳产生的 QPS 降低 y 倍。在单次心跳超时后服务端立刻将 a 值变为 1s 进行重试。采用以上策略,在保证连接质量的同时,降低心跳对服务端产生的性能损耗。
使用 Kafka 自定义 Headers 的目的是避免网关层出现对消息体解码而带来的性能损耗,客户端 WebSocket 连接建立成功后,会进行一系列的业务操作,我们选择将 WS-Gateway 和 WS-API 之间的操作指令和必要的参数放到 Kafka 的 Headers 中,例如通过 X-XX-Operator 为广播,再读取 X-XX-Guid 文件编号,对该文件内的所有用户进行消息推送。
字段 | 说明 | 描述 |
---|---|---|
X-ID | WebSocket ID | 连接 ID |
X-Uid | 用户 ID | 用户 ID |
X-Guid | 文件 ID | 文件 ID |
X-Inner | 网关内部操作指令 | 用户加入、用户退出 |
X-Event | 网关事件 | Connect/Message/Disconnect |
X-Locale | 语言类型设置 | 语言类型设置 |
X-Operator | api 层操作指令 | 单播、广播、网关内部操作 |
X-Auth-Type | 用户鉴权类型 | SDKV2、主站、微信、移动端、桌面 |
X-Client-Version | 客户端版本 | 客户端版本 |
X-Server-Version | 网关版本 | 服务端版本 |
X-Push-Client-ID | 客户端 ID | 客户端 ID |
X-Trace-ID | 链路 ID | 链路 ID |
在 Kafka Headers 中写入了 trace id 和 时间戳,可以追中某条消息的完整消费链路以及各阶段的时间消耗。
type Packet struct {
...
}
type Connect struct {
*websocket.Con
send chan Packet
}
func NewConnect(conn net.Conn) *Connect {
c := &Connect{
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}
客户端与服务端的消息交互第一版的写法类似以上写法,对 Demo 进行压测,发现每个 WebSocket 连接都会占用 3 个 goroutine,每个 goroutine 都需要内存栈,单机承载连十分有限,主要受制于大量的内存占用,而且大部分时间 c.writer() 是闲置状态,于是考虑,是否只启用 2 个 goroutine 来完成交互。
type Packet struct {
...
}
type Connect struct {
*websocket.Conn
mux sync.RWMutex
}
func NewConnect(conn net.Conn) *Connect {
c := &Connect{
send: make(chan Packet, N),
}
go c.reader()
return c
}
func (c *Connect) Write(data []byte) (err error) {
c.mux.Lock()
defer c.mux.Unlock()
...
return nil
}
保留 c.reader() 的 goroutine,如果使用轮询方式从缓冲区读取数据,可能会产生读取延迟或者锁的问题,c.writer() 操作调整为主动调用,不采用启动 goroutine 持续监听,降低内存消耗。
调研了 gev 和 gnet 等基于事件驱动的轻量级高性能网络库,实测发现在大量连接场景下可能产生的消息延迟的问题,所以没有在生产环境下使用。
确定数据接收与发送逻辑后,网关部分的核心对象为 Connection 对象,围绕 Connection 进行了 run、read、write、close 等函数的开发。使用 sync.pool 来缓存该对象,减轻 GC 压力,创建连接时,通过对象资源池获取 Connection 对象,生命周期结束之后,重置 Connection 对象后 Put 回资源池。在实际编码中,建议封装 GetConn()、PutConn() 函数,收敛数据初始化、对象重置等操作。
var ConnectionPool = sync.Pool{
New: func() interface{} {
return &Connection{}
},
}
func GetConn() *Connection {
cli := ConnectionPool.Get().(*Connection)
return cli
}
func PutConn(cli *Connection) {
cli.Reset()
ConnectionPool.Put(cli) // 放回连接池
}
消息流转过程中,需要考虑消息体的传输效率优化,采用 MessagePack 对消息体进行序列化,压缩消息体大小。调整 MTU 值避免出现分包情况,定义 a 为探测包大小,通过如下指令,对目标服务 ip 进行 MTU 极限值探测。
ping -s {a} {ip}
a = 1400 时,实际传输包大小为:1428。其中 28 由 8(ICMP 回显请求和回显应答报文格式)和 20(IP 首部)构成。 如果 a 设置过大会导致应答超时,在实际环境包大小超过该值时会出现分包的情况。 在调试合适的 MTU 值的同时通过 MessagePack 对消息体进行序列号,进一步压缩数据包的大小,并减小 CPU 的消耗。
使用 EGO 框架( https://github.com/gotomicro/ego )进行服务开发:业务日志打印,异步日志输出,动态日志级别调整等功能,方便线上问题排查提升日志打印效率;微服务监控体系,CPU、P99、内存、goroutine 等监控。 客户端 Redis 监控:
客户端 Kafka 监控:
自定义监控大盘:
用户上线,50w 在线用户。
服务 | CPU | Memory | 数量 | CPU% | Mem% |
---|---|---|---|---|---|
WS-Gateway | 16 核 | 32G | 1 台 | 22.38% | 70.59% |
单个 WS-Gateway 每秒建立连接数峰值为:1.6w 个/s,每个用户占用内存:47K。
测试时间 15 分钟,在线用户 50w,每 5s 推送一条所有用户,用户有回执。推送内容为:
42["message",{"type":"xx","data":{"type":"xx","clients":[{"id":xx,"name":"xx","email":"xx@xx.xx","avatar":"ZgG5kEjCkT6mZla6.png","created_at":1623811084000,"name_pinyin":"","team_id":13,"team_role":"member","merged_into":0,"team_time":1623811084000,"mobile":"+xxxx","mobile_account":"","status":1,"has_password":true,"team":null,"membership":null,"is_seat":true,"team_role_enum":3,"register_time":1623811084000,"alias":"","type":"anoymous"}],"userCount":1,"from":"ws"}}]
测试经过 5 分钟后,服务异常重启,重启原因是内存使用量到超过限制。
分析内存超过限制的原因:
新增的广播代码用掉了 9.32% 的内存。
接收用户回执消息的部分消耗了 10.38% 的内存。
进行测试规则调整,测试时间 15 分钟,在线用户 48w,每 5s 推送一条所有用户,用户有回执。推送内容为:
42["message",{"type":"xx","data":{"type":"xx","clients":[{"id":xx,"name":"xx","email":"xx@xx.xx","avatar":"ZgG5kEjCkT6mZla6.png","created_at":1623811084000,"name_pinyin":"","team_id":13,"team_role":"member","merged_into":0,"team_time":1623811084000,"mobile":"+xxxx","mobile_account":"","status":1,"has_password":true,"team":null,"membership":null,"is_seat":true,"team_role_enum":3,"register_time":1623811084000,"alias":"","type":"anoymous"}],"userCount":1,"from":"ws"}}]
服务 | CPU | Memory | 数量 | CPU% | Mem% |
---|---|---|---|---|---|
WS-Gateway | 16 核 | 32G | 1 台 | 44% | 91.75% |
连接数建立峰值:1w 个/s,接收数据峰值:9.6w 条/s,发送数据峰值 9.6w 条/s。
测试时间 15 分钟,在线用户 50w,每 5s 推送一条所有用户,用户无需回执。推送内容为:
42["message",{"type":"xx","data":{"type":"xx","clients":[{"id":xx,"name":"xx","email":"xx@xx.xx","avatar":"ZgG5kEjCkT6mZla6.png","created_at":1623811084000,"name_pinyin":"","team_id":13,"team_role":"member","merged_into":0,"team_time":1623811084000,"mobile":"+xxxx","mobile_account":"","status":1,"has_password":true,"team":null,"membership":null,"is_seat":true,"team_role_enum":3,"register_time":1623811084000,"alias":"","type":"anoymous"}],"userCount":1,"from":"ws"}}]
服务 | CPU | Memory | 数量 | CPU% | Mem% |
---|---|---|---|---|---|
WS-Gateway | 16 核 | 32G | 1 台 | 30% | 93% |
连接数建立峰值:1.1w 个/s,发送数据峰值 10w 条/s,出内存占用过高之外,其他没有异常情况。 内存消耗极高,分析火焰图,大部分消耗在定时 5s 进行广播的操作上。
测试时间 15 分钟,在线用户 50w,每 5s 推送一条所有用户,用户有回执。每秒 4w 用户上下线。推送内容为:
42["message",{"type":"xx","data":{"type":"xx","clients":[{"id":xx,"name":"xx","email":"xx@xx.xx","avatar":"ZgG5kEjCkT6mZla6.png","created_at":1623811084000,"name_pinyin":"","team_id":13,"team_role":"member","merged_into":0,"team_time":1623811084000,"mobile":"+xxxx","mobile_account":"","status":1,"has_password":true,"team":null,"membership":null,"is_seat":true,"team_role_enum":3,"register_time":1623811084000,"alias":"","type":"anoymous"}],"userCount":1,"from":"ws"}}]
服务 | CPU | Memory | 数量 | CPU% | Mem% |
---|---|---|---|---|---|
WS-Gateway | 16 核 | 32G | 1 台 | 46.96% | 65.6% |
连接数建立峰值:18570 个/s,接收数据峰值:329949 条/s,发送数据峰值 393542 条/s,未出现异常情况。
在 16C 32G
内存的硬件条件下,单机 50w
连接数,进行以上包括用户上下线、消息回执等四个场景的压测,内存和 CPU
消耗都符合预期,并且在较长时间的压测下,服务也很稳定。满足目前量级下的资源节约要求,可在此基础上继续完善功能开发。
面临日益增加的用户量,网关服务的重构是势在必行,本次重构主要是:
对网关服务与业务服务的解耦,移除对 Nginx 的依赖,让整体架构更加清晰。
从用户建立连接到底层业务推送消息的整体流程分析,对其中这些流程进行了具体的优化。以下各个方面让 2.0 版本的网关有了更少的资源消耗,更低的单位用户内存损耗、更加完善的监控报警体系,让网关服务本身更加可靠:
可降级的握手流程;
Socket ID 生产;
客户端心跳处理过程的优化;
自定义 Headers 避免了消息解码,强化了链路追踪与监控;
消息的接收与发送代码结构设计上的优化;
对象资源池的使用,使用缓存降低 GC 频率;
消息体的序列化压缩;
接入服务观测基础设施,保证服务稳定性。
在保证网关服务性能过关的同时,更进一步的是收敛底层组件服务对网关业务调用的方式,从以前的 HTTP、Redis、Kafka 等方式,统一为 gRPC 调用,保证了来源可查可控,为后续业务接入打下了更好的基础。
收录了部分文章相关内容的讨论问题:
问题:按照我的理解 socketID
存在的价值是 Kafka 的消费者需要根据 socketID
找到对应的tcp 链接,既然你们已经有了自定义网关,那么引入 kafka 的意义是什么?消息的持久化?为什么不在网关层做负载均衡,让节点直接跟客户端通信。另外我猜测消费发送者需要根据 socketId
做 hash 然后发送到对应的 partition,一旦初始 partition 过小,进行扩容时,客户端和服务端都得进行重启或则升级,不知道引入 kafka 的意义在哪里,相反还极大的增加了架构的复杂度和维护成本,扩展性也没那么好,如果是 http 短链接还能理解。
回答:图中没画出 SLB,是有负载均衡的。我们没有采用 socket id hash 到对应 partition,kafka 的作用是在处理网关内部的不需要关心顺序和推送消息的流转,如果没有kafka,那么组件或者网关滚动更新,用户重连的过程中,就可能丢消息;对于需要顺序的消息,例如 ping pong 模式的是可以通过网关识别到 header 头里的 cmd 信息,找到对应后端,分发消息。
问题:广播内容的数据量大小在 1K
左右,业务场景简单固定,并且要兼容历史业务逻辑,最后选择了 Redis
进行消息广播。api
与网关交互不是通过 kafka
吗,这里是什么意思呢?
回答:网关节点对
kafka
的消费是集群模式。如果kafka
,在k8s
条件下,使用广播模式比较麻烦。所以老的网关是用redis
做pub/sub
的广播,为了兼容老的逻辑仍然采用redis
做广播。同时后续我们打算直接将api
和ws
做两两互联,通过grpc stream
做广播,有更好的扩展性。
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/lK5-NNvXRIOBWbpAQeR0NQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。