一、背景
由于nsq消息队列,使用push模式进行消息分发。众所周知,push模式消费消息带来的一个难点,就是消费速率的控制。和pull模式通过consumer端自己主动拉消息来按需消费不同,push模式需要broker端根据对应consumer的消息处理能力来决定控制消息推送速率。如果控制的不当,就会导致consumer机器出现消息阻塞,最终导致业务系统出现非预期故障。
nsq本身提供RDY机制来控制推送速率,consumer客户端有自己的策略(默认策略见后面章节分析)来更新RDY值,然后定时任务,每隔1s将最新的RDY值告诉broker端,此时broker端按照新的RDY值来推送消息。
生产环境遇到的一个case,业务流程如下:
出现的问题,不是由于consumer客户端本身的处理能力,而是由于外部服务进行了限流,导致超过指定阈值之后,会直接返回错误,这样consumer端就会有大量的消息处理失败,从而出现非预期失败报警。
但是,我们的预期,是希望消息可以按照我们期望的速率进行消费,因此,抛开nsq自身的RDY机制,需要主动控制推送速率,也就是我们需要对消息施加主动限流。
在对nsq 消息消费进行限流之前,先对nsq consumer client端代码进行分析。
二、nsq消费流程分析
公司内部,有赞对nsq进行了二次开发,并进行了开源。增加了很多功能- NSQ重塑之详细设计
整体架构图如下:
nsq java客户端也进行了开源,不过开源版本后期没有进行升级维护,这里贴一个早期的设计文档:How we redesign the NSQ-Smart Client
本文主要对消息消费的限流能力进行解析,因此,下面主要对消息消费的核心流程进行介绍。
2.1 nsq consumer client 实现
nsq 消息消费模块,主要包括三个部分:
- 构造consumer对象
- 启动consumer对象
- 处理nsq推送的消息
2.1.1 构造consumer对象
初始化流程,如下:
new consumer 逻辑比较简单,就是设置一些必须的参数,
NsqConfig
中的很多线程池大小,RDY 策略,各种时间等参数都是在这里加载进去的。
本部分,最核心的是在new完bootstrap对象之后,将其处理handler设置为NSQClientInitializer
,在NSQClientInitializer
类中,是 netty 的pipeline
设置,pipeline
的最后handler为NSQHandler
,该handler就是具体处理nsq消息的代码部分。
此外,在具体的消息处理,使用的是fixed线程池,其缺点是阻塞队列长度int最大,存在内存爆掉的风险。但是,在nsq的整体消费推送架构场景下,由于RDY控制了nsq消息推送,所以风险就不会存在。
2.1.2 启动consumer,请求消费消息
在上面初始化完consumer之后,接下来就是去nsqd/broker 服务端申请订阅主题消息,具体流程如下:
以上,对于nsqd的具体处理,后面再详细介绍。
主要流程为:1. 获取topic所在的nsqd地址;2. 和nsqd地址进行连接;3.发送SUB指令,请求消费对应topic数据
在client中,存在好几个单独的调度线程,周期去远程更新各种地址和数值。
start的流程里,有一个周期推送最新RDY值给nsqd,这里的RDY计算策略,绝大部分情况下使用的是默认的。下面贴个代码看下:
1 |
|
代码逻辑比较简单,就是设置的expectedRdyConf
为最大值,然后按照0.5倍增加RDY,按照0.25倍减少RDY。定时任务,会将最终的RDY,推送给nsqd,按照每个conn维度。
2.1.3 接收&处理nsq消息
启动consumer,和存有topic数据的nsqd建立连接之后,就等待存在新消息之后,nsqd推送消息,然后consumer接收消息。
主要处理流程如下:
具体消息读取和处理部分,主要是使用netty能力。在start的过程中,已经和nsqd进行了网络连接,请求SUB消息,再存在待消费消息之后,nsqd会通过这个conn连接进行消息的分发推送。
在消息处理成功或者异常情况下,会对RDY进行更新。
以上三个部分,即nsq consumer客户端的消息订阅和消息接收处理的全过程。
2.2 nsqd 订阅逻辑实现
由于本文主要介绍nsq消费限流,所以这里主要介绍订阅消费逻辑,为了更完善整个流程,对于消息发送,也会简单介绍。
同样,本节分析,同样基于youzan基于nsq二次开发的分支:youzan/nsq
继续按照三部分来介绍订阅逻辑:
- nsqd服务启动部分(主要是服务初始化资源准备);
- nsqd接收客户端消费请求部分;
- nsqd推送消息部分。
特别说明,有赞的nsq版本,相对原生nsq做了很多的调整优化,了解代码前,还是再次建议看下:NSQ重塑之详细设计
2.2.1 nsqd服务启动部分
在apps/nsqd
中的nsqd.go
文件,就是启动nsqd的入口,代码如下:
1 |
|
基于svc
框架,有Init、Start、Stop三个方法需要实现,nsqd的核心逻辑在Start方法中,为了更直观看实现流程,这里不尽量不贴源码,做一些简单的类方法时序调用图。
如上流程,这里针对三个核心点,深入分析下。
2.2.1.1 构造nsqd server实例
以上涉及到client和nsqd交互流程的,主要是 internalPubLoop
设置,这个方法主要包含在消息从producer发送过来之后,核心的处理逻辑。
初始化的时候,没有consumer端处理方法的设置,主要是nsqd是针对每个client发送过来的订阅请求构建处理上下文环境的,直接由SUB请求来触发初始化即可。
2.2.1.2 元数据加载
和流程图介绍的那样,LoadMetadata
,主要是完成元数据的加载,包括 topic 和 channel的详细信息,其中在针对topic下的channel构造的时候,会异步协程启动读取持久化设备上消息的任务,然后写入到通道chan中。
以上涉及到后期消费分析的核心动作,就是在初始化channel的时候,会启动一个golang协程,协程会从存储设备上读取消息数据,然后推送到chan 通道。从chan通道来获取消息,推送到消费端的逻辑,在1.2.2 来分析。
2.2.1.3 启动nsqd server 服务
上面所有内部准备工作处理完成之后,现在可以启动server服务,监听连接请求,处理外部治理了。
具体实现时序图如下:
服务启动之后,等待client发出请求,建立连接,单独协程,处理指令。
2.2.2 nsqd接收客户端消费请求部分
本节,简单介绍nsqd服务,接收到客户端SUB请求下,如何处理。在上面一节中,已经介绍了nsqd接收客户端字节流,然后解码的流程,下面主要分析SUB指令的处理流程:
2.2.3 nsqd推送消息部分
消费端发送订阅请求成功后,和nsqd服务保持一个连接conn,接下来,nsqd会将订阅的topic消息通过conn发送给消费端。
nsqd读取消息持久化存储数据,然后将消息数据最终写回订阅客户端的主要流程已经进行介绍,下面主要是介绍订阅到推送这部分流程:
三、nsq消费RDY管理
首先,带上RDY指令,看看消费端和nsq服务端的交互流程:
因此,RDY管理,主要包括:客户端的RDY更新策略,以及 服务端更新处理流程。其中客户端的rdy更新策略,上节已经介绍,本节接下来,主要是nsqd服务端的rdy实现细节。
在nsqd服务端的rdy能力上,主要包括对每个consumer客户端的rdy维护,以及在push的时候,根据rdy控制push速率。
3.1 nsqd RDY 更新维护
nsqd接收客户端,然后更新对应rdy值逻辑很简单。本质上,就是对维护在client上的变量做原子更新,就ok了。
3.2 nsqd RDY控制速率
根据客户端提供的rdy值,控制推送速率。一般而言,就是发送消息的时候,判断下当前的rdy是否大于在途消息数,如果是,继续发送,否则停止。
具体nsqd的处理流程,如下:
因此,当我们consumer客户端暂时无法处理更多消息的时候,我们可以通过RDY指令来限制nsqd服务端推送消息的速率,从而达到更友好的限制姿势。
四、自定义nsq消费限流设计
nsq组件在nsqd服务端,会有自己的限流策略,避免nsqd服务因为流量太大导致系统问题。但是,这里主要介绍的是nsq消费端的限流策略,其保护的是消费端的业务系统。
结合上面对nsq的分析,尤其是针对nsq rdy能力的分析,我们可以充分利用rdy的特性来完成限流。
其大概的设计流程如下:
以上整体逻辑和思想比较简单,具体代码如下:
1 | package com.youzan.pay.risk.commons.impl.listener.telsa; |
在requeue()
发放中,对于backoff
开关进行打开,内部会执行 backoffTopicInMillSecond(Topic topic, long resumeDelayInMillSecond)
针对每一个topic+conn的连接,将rdy更新为0,同时,注册一个定时任务,在resumeDelayInMillSecond时间之后,执行rdy更新为1的操作。
1 | private ScheduledExecutorService resumeExec = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("resume-scheduler", Thread.NORM_PRIORITY)); |
五、总结
本文主要是针对nsq消费流程进行分析,同时,为了最终消费限流目标,本文重点分析了nsq内部限制推送速率的rdy机制。由于nsq客户端默认的rdy更新策略已经比较完善,并且调整起来,比较难达到所见即所得的限流效果。
因此,本文使用直接将rdy设置为0的策略,粗暴的暂停nsqd服务端消息推送,在一段时间之后,再设置为1,根据rdy的更新策略,逐步恢复rdy到设置的最大阈值。
六、补充
本文对于消息磁盘持久化的介绍和分析,只是简单提到。虽然有赞对文件持久化的内容和格式做了一些优化和调整,不过如果想了解细节,可以先从剖析nsq消息队列(三) 消息传输的可靠性和持久化[二]diskqueue 入手,了解下原生nsq架构的持久化。
此外,有赞对nsq的改造,也可以从上面的 github doc 文档列表上,找到一些有用的资料。