0%

nsq消息消费原理分析&消费限流实现

一、背景

由于nsq消息队列,使用push模式进行消息分发。众所周知,push模式消费消息带来的一个难点,就是消费速率的控制。和pull模式通过consumer端自己主动拉消息来按需消费不同,push模式需要broker端根据对应consumer的消息处理能力来决定控制消息推送速率。如果控制的不当,就会导致consumer机器出现消息阻塞,最终导致业务系统出现非预期故障。

nsq本身提供RDY机制来控制推送速率,consumer客户端有自己的策略(默认策略见后面章节分析)来更新RDY值,然后定时任务,每隔1s将最新的RDY值告诉broker端,此时broker端按照新的RDY值来推送消息。

生产环境遇到的一个case,业务流程如下:

线上case流程

出现的问题,不是由于consumer客户端本身的处理能力,而是由于外部服务进行了限流,导致超过指定阈值之后,会直接返回错误,这样consumer端就会有大量的消息处理失败,从而出现非预期失败报警。

但是,我们的预期,是希望消息可以按照我们期望的速率进行消费,因此,抛开nsq自身的RDY机制,需要主动控制推送速率,也就是我们需要对消息施加主动限流。

在对nsq 消息消费进行限流之前,先对nsq consumer client端代码进行分析。

二、nsq消费流程分析

公司内部,有赞对nsq进行了二次开发,并进行了开源。增加了很多功能- NSQ重塑之详细设计

整体架构图如下:

nsq架构

nsq java客户端也进行了开源,不过开源版本后期没有进行升级维护,这里贴一个早期的设计文档:How we redesign the NSQ-Smart Client

本文主要对消息消费的限流能力进行解析,因此,下面主要对消息消费的核心流程进行介绍。

2.1 nsq consumer client 实现

nsq 消息消费模块,主要包括三个部分:

  • 构造consumer对象
  • 启动consumer对象
  • 处理nsq推送的消息

2.1.1 构造consumer对象

初始化流程,如下:

构造nsq consumer流程

new consumer 逻辑比较简单,就是设置一些必须的参数,NsqConfig中的很多线程池大小,RDY 策略,各种时间等参数都是在这里加载进去的。
本部分,最核心的是在new完bootstrap对象之后,将其处理handler设置为NSQClientInitializer,在NSQClientInitializer类中,是 netty 的 pipeline设置,pipeline的最后handler为NSQHandler,该handler就是具体处理nsq消息的代码部分。
此外,在具体的消息处理,使用的是fixed线程池,其缺点是阻塞队列长度int最大,存在内存爆掉的风险。但是,在nsq的整体消费推送架构场景下,由于RDY控制了nsq消息推送,所以风险就不会存在。

2.1.2 启动consumer,请求消费消息

在上面初始化完consumer之后,接下来就是去nsqd/broker 服务端申请订阅主题消息,具体流程如下:

构造nsq consumer流程

以上,对于nsqd的具体处理,后面再详细介绍。
主要流程为:1. 获取topic所在的nsqd地址;2. 和nsqd地址进行连接;3.发送SUB指令,请求消费对应topic数据
在client中,存在好几个单独的调度线程,周期去远程更新各种地址和数值。
start的流程里,有一个周期推送最新RDY值给nsqd,这里的RDY计算策略,绝大部分情况下使用的是默认的。下面贴个代码看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

private IExpectedRdyUpdatePolicy DEFAULT_EXP_RDY_POLICY = new IExpectedRdyUpdatePolicy() {
@Override
public int expectedRdyIncrease(int currentExpRdy, int expectedRdyConf) {
int newExpRdy = (int)(Math.round(currentExpRdy * 1.5));
if(newExpRdy <= expectedRdyConf)
return newExpRdy;
else {
return expectedRdyConf;
}
}

@Override
public int expectedRdyDecline(int currentExpRdy, int expectedRdy) {
int newExpRdy = (int)(currentExpRdy - Math.round(currentExpRdy * 0.25));
if(newExpRdy > 0) {
return newExpRdy;
} else {
return 1;
}
}
};

代码逻辑比较简单,就是设置的expectedRdyConf为最大值,然后按照0.5倍增加RDY,按照0.25倍减少RDY。定时任务,会将最终的RDY,推送给nsqd,按照每个conn维度。

2.1.3 接收&处理nsq消息

启动consumer,和存有topic数据的nsqd建立连接之后,就等待存在新消息之后,nsqd推送消息,然后consumer接收消息。
主要处理流程如下:

处理nsq流程

具体消息读取和处理部分,主要是使用netty能力。在start的过程中,已经和nsqd进行了网络连接,请求SUB消息,再存在待消费消息之后,nsqd会通过这个conn连接进行消息的分发推送。
在消息处理成功或者异常情况下,会对RDY进行更新。

以上三个部分,即nsq consumer客户端的消息订阅和消息接收处理的全过程。

2.2 nsqd 订阅逻辑实现

由于本文主要介绍nsq消费限流,所以这里主要介绍订阅消费逻辑,为了更完善整个流程,对于消息发送,也会简单介绍。
同样,本节分析,同样基于youzan基于nsq二次开发的分支:youzan/nsq

继续按照三部分来介绍订阅逻辑:

  • nsqd服务启动部分(主要是服务初始化资源准备);
  • nsqd接收客户端消费请求部分;
  • nsqd推送消息部分。

特别说明,有赞的nsq版本,相对原生nsq做了很多的调整优化,了解代码前,还是再次建议看下:NSQ重塑之详细设计

2.2.1 nsqd服务启动部分

apps/nsqd中的nsqd.go文件,就是启动nsqd的入口,代码如下:

1
2
3
4
5
6
7
8
9
10

func main() {
defer glog.Flush()
prg := &program{}
if err := svc.Run(prg, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT); err != nil {
log.Panic(err)
}
log.Println("app exited.")
}

基于svc框架,有Init、Start、Stop三个方法需要实现,nsqd的核心逻辑在Start方法中,为了更直观看实现流程,这里不尽量不贴源码,做一些简单的类方法时序调用图。

nsqd服务流程

如上流程,这里针对三个核心点,深入分析下。

2.2.1.1 构造nsqd server实例

构造nsqd流程

以上涉及到client和nsqd交互流程的,主要是 internalPubLoop设置,这个方法主要包含在消息从producer发送过来之后,核心的处理逻辑。
初始化的时候,没有consumer端处理方法的设置,主要是nsqd是针对每个client发送过来的订阅请求构建处理上下文环境的,直接由SUB请求来触发初始化即可。

2.2.1.2 元数据加载

和流程图介绍的那样,LoadMetadata,主要是完成元数据的加载,包括 topic 和 channel的详细信息,其中在针对topic下的channel构造的时候,会异步协程启动读取持久化设备上消息的任务,然后写入到通道chan中。

加载元数据流程

以上涉及到后期消费分析的核心动作,就是在初始化channel的时候,会启动一个golang协程,协程会从存储设备上读取消息数据,然后推送到chan 通道。从chan通道来获取消息,推送到消费端的逻辑,在1.2.2 来分析。

2.2.1.3 启动nsqd server 服务

上面所有内部准备工作处理完成之后,现在可以启动server服务,监听连接请求,处理外部治理了。
具体实现时序图如下:

启动nsqd流程

服务启动之后,等待client发出请求,建立连接,单独协程,处理指令。

2.2.2 nsqd接收客户端消费请求部分

本节,简单介绍nsqd服务,接收到客户端SUB请求下,如何处理。在上面一节中,已经介绍了nsqd接收客户端字节流,然后解码的流程,下面主要分析SUB指令的处理流程:

nsqd处理订阅流程

2.2.3 nsqd推送消息部分

消费端发送订阅请求成功后,和nsqd服务保持一个连接conn,接下来,nsqd会将订阅的topic消息通过conn发送给消费端。

nsqd读取消息持久化存储数据,然后将消息数据最终写回订阅客户端的主要流程已经进行介绍,下面主要是介绍订阅到推送这部分流程:

nsqd推送消息流程

三、nsq消费RDY管理

首先,带上RDY指令,看看消费端和nsq服务端的交互流程:

rdy交互流程

因此,RDY管理,主要包括:客户端的RDY更新策略,以及 服务端更新处理流程。其中客户端的rdy更新策略,上节已经介绍,本节接下来,主要是nsqd服务端的rdy实现细节。

在nsqd服务端的rdy能力上,主要包括对每个consumer客户端的rdy维护,以及在push的时候,根据rdy控制push速率。

3.1 nsqd RDY 更新维护

rdy更新流程

nsqd接收客户端,然后更新对应rdy值逻辑很简单。本质上,就是对维护在client上的变量做原子更新,就ok了。

3.2 nsqd RDY控制速率

根据客户端提供的rdy值,控制推送速率。一般而言,就是发送消息的时候,判断下当前的rdy是否大于在途消息数,如果是,继续发送,否则停止。

具体nsqd的处理流程,如下:

rdy控制推送流程

因此,当我们consumer客户端暂时无法处理更多消息的时候,我们可以通过RDY指令来限制nsqd服务端推送消息的速率,从而达到更友好的限制姿势。

四、自定义nsq消费限流设计

nsq组件在nsqd服务端,会有自己的限流策略,避免nsqd服务因为流量太大导致系统问题。但是,这里主要介绍的是nsq消费端的限流策略,其保护的是消费端的业务系统。

结合上面对nsq的分析,尤其是针对nsq rdy能力的分析,我们可以充分利用rdy的特性来完成限流。

其大概的设计流程如下:

nsq消费限流流程

以上整体逻辑和思想比较简单,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.youzan.pay.risk.commons.impl.listener.telsa;  

import com.google.common.collect.Maps;
import com.youzan.nsq.client.Consumer;
import com.youzan.nsq.client.entity.NSQMessage;
import com.youzan.nsq.client.exception.NSQException;
import com.youzan.tesla.trafficlimit.portadapter.driving.manual.ProgrammingTrafficLimitAdapter;
import com.youzan.tesla.trafficlimit.portadapter.driving.manual.TrafficLimitDecision;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 基于xxx限流组件的nsq限流组件
* @author tao.ke Date: 2021/10/11 Time: 10:40 上午
*/
@Slf4j
public class NsqRateLimitComponent {

/**
* 最大避让时间为2s
*/ private static final long MAX_BACKOFF_PERIOD = 2000L;

/**
* 避让时间因子
*/
private static final long BACKOFF_MULTIPLIER = 100L;

private final ConcurrentMap<String, AtomicInteger> topic2BackoffCounter = Maps.newConcurrentMap();

private final ProgrammingTrafficLimitAdapter programmingTrafficLimitAdapter;

public NsqRateLimitComponent(String appName) {

// 初始化限流组件对象
this.programmingTrafficLimitAdapter = new ProgrammingTrafficLimitAdapter(() -> appName);
}

/**
* 处理限流判断和相关动作
*
* @param nsqMessage nsq消息
* @return 是否触发限流操作
*/
public boolean processLimit(NSQMessage nsqMessage, String channel, Consumer nsqConsumer) {

Map<String, String> features = Maps.newHashMap();
String topic = nsqMessage.getTopicInfo().getTopicName();
features.put("topic", topic);
features.put("channel", channel);

// 限流组件决策是否超过设置的阈值
TrafficLimitDecision decision = this.programmingTrafficLimitAdapter.apply(features);

// 不限流,直接返回false
if (!decision.shouldLimit()) {
return false;
}

AtomicInteger backoffCounter = topic2BackoffCounter.computeIfAbsent(topic, t -> new AtomicInteger(0));

//backoff and requeue
long backoffPeriod = calculateBackoff(backoffCounter.get() + 1);

//max backoff delay
if (backoffPeriod > 0L) {
if (backoffPeriod > MAX_BACKOFF_PERIOD) {
backoffPeriod = MAX_BACKOFF_PERIOD;
} else {
backoffCounter.incrementAndGet();
}
nsqMessage.disableAutoResponse();
try {
// 消息重新入队列,并且同时backoff
nsqMessage.requeue((int) TimeUnit.MILLISECONDS.toSeconds(backoffPeriod), true, backoffPeriod);
log.warn("[NSQ消费限流] {}:{} backoff in next {}ms", topic, channel, backoffPeriod);
} catch (NSQException e) {
log.warn("[NSQ消费限流] fail to backoff {}:{} as connection may closed. Exception: {}", topic, channel, e.getLocalizedMessage());
}
}
return true;
}


/**
* 根据之前避让次数计算当前需要避让的时间
*
* @param backoffCount 避让次数
* @return 避让时间
*/
private long calculateBackoff(long backoffCount) {

return (long) (BACKOFF_MULTIPLIER * Math.pow(2.0, backoffCount));
}

}

requeue()发放中,对于backoff开关进行打开,内部会执行 backoffTopicInMillSecond(Topic topic, long resumeDelayInMillSecond)

针对每一个topic+conn的连接,将rdy更新为0,同时,注册一个定时任务,在resumeDelayInMillSecond时间之后,执行rdy更新为1的操作。

1
2
3
4
5
6
private ScheduledExecutorService resumeExec = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("resume-scheduler", Thread.NORM_PRIORITY));

// resumeDelayRunnable 则是将rdy设置为1的操作
// 不管是设置为0,还是设置为1,都需要加lock,避免并发情况
resumeExec.schedule(()-> resumeDelayRunnable(topicName), resumeDelayInMillSecond, TimeUnit.MILLISECONDS);

五、总结

本文主要是针对nsq消费流程进行分析,同时,为了最终消费限流目标,本文重点分析了nsq内部限制推送速率的rdy机制。由于nsq客户端默认的rdy更新策略已经比较完善,并且调整起来,比较难达到所见即所得的限流效果。

因此,本文使用直接将rdy设置为0的策略,粗暴的暂停nsqd服务端消息推送,在一段时间之后,再设置为1,根据rdy的更新策略,逐步恢复rdy到设置的最大阈值。

六、补充

本文对于消息磁盘持久化的介绍和分析,只是简单提到。虽然有赞对文件持久化的内容和格式做了一些优化和调整,不过如果想了解细节,可以先从剖析nsq消息队列(三) 消息传输的可靠性和持久化[二]diskqueue 入手,了解下原生nsq架构的持久化。

此外,有赞对nsq的改造,也可以从上面的 github doc 文档列表上,找到一些有用的资料。