一、事务型消息实现背景
如果要在交易等场景中使用消息,则一致性是非常关键的需求。也就是不能出现业务操作成功消息未发出或者消息发出了但是业务并没有成功的情况。举例来说,支付服务使用消息通知出票服务,那么不能出现支付成功,但是消息没有发出,这会引起用户投诉;但是也不能出现支付未成功,但是消息发出最后出票了,这会导致公司损失。总结一下就是发消息和业务需要有事务保证。
提到一致性,大家肯定就想到事务,而一提到事务,肯定就想到关系型数据库,那么我们是不是可以借助关系型DB里久经考验的事务来实现这个一致性呢。我们以MySQL为例,对于MySQL中同一个实例里面的db,如果共享相同的Connection的话是可以在同一个事务里的。
以上介绍来自:
https://github.com/qunarcorp/qmq/blob/master/docs/cn/transaction.md
1.1 实现出发点
有赞内部目前有两套实现:
- 支付内部的实现,实现基本上和
qmq
实现类似,有个task负责在发送消息失败的时候,去扫表捞取消息,然后发送出去。这个任务通过内部分布式调度平台来配置和触发。因此,每个接入事务型消息的时候,都需要去申请配置,麻烦;此外,开发的时候,接入成本也比较大,用起来不方便;
- 公司提供的事务消息,首先和上面一样,这个组件基本上没有人维护。此外,其实现是基于老版本的nsq client来实现的,并且基于xml来配置,在当前应用基本上的新版本的nsq api,并且使用spring boot模式引入DataSource,DataSourceTransactionManager等情况下,想干净接入成本也不小。不过它不需要配置分布式调度任务,这点还是比较输入的,但是实现模式上,在事务提交的时候,没有同步删除数据表消息,有点不太好。
基于以上的问题,准备在风控内部,独立实现一个事务型消息组件,从而来解决方面两套实现方案的问题。
二、事务消息实现设计
首先,我们看下组件整体的调用交互图:
从如上的交互图,可以看出,为了利用db的事务特性,我们需要在业务DB数据库中新增一个消息表,将业务表和消息表打包成一个事务,这样,消息的事务性,就交给Msql等具有事务特性的DB来保证。
消息表设计,如下:
消息表使用,暂时不考虑分片等对于发送消息失败时,扫表提升性能的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13
| CREATE TABLE `risk_tx_message` ( `id` BIGINT AUTO_INCREMENT NOT NULL COMMENT '主键', `app_name` VARCHAR(64) NOT NULL COMMENT '应用名', `msg_body` VARCHAR(8192) NOT NULL COMMENT '消息体', `topic` VARCHAR(64) NOT NULL COMMENT 'NSQ topic', `extra` VARCHAR(1024) NOT NULL COMMENT '扩展', `env` VARCHAR(16) NOT NULL COMMENT '环境标', `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), KEY `idx_query` (`app_name`,`env`) ) ENGINE=InnoDB CHARSET=utf8mb4 COMMENT='事务消息表'
|
此外,在交互里面,我们还有一个nsq task模块,这个模块我们在sdk内部提供实现,因为应用是分布式部署,因此这里借鉴公司的实现方式,使用db乐观锁,来作为分布式锁,完成对失败消息的捞取和重新推送。
1 2 3 4 5 6 7 8 9 10 11 12 13
| CREATE TABLE `risk_tx_message_lock` ( `id` BIGINT UNSIGNED AUTO_INCREMENT NOT NULL COMMENT '主键', `lock_key` VARCHAR(64) NOT NULL COMMENT '锁key', `app_name` VARCHAR(64) NOT NULL COMMENT '应用名', `expire_time` BIGINT NOT NULL COMMENT '过期时间', `extra` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '扩展字段', `env` varchar(16) NOT NULL COMMENT '环境标', `holder` varchar(32) NOT NULL COMMENT '占用锁身份标识', `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uniq_app_name` (`app_name`,`lock_key`,`env`) ) ENGINE=InnoDB CHARSET=utf8mb4 COMMENT='事务消息分布式锁表'
|
以上,就基本上可以实现事务消息了。
PS:特别说明,在两个表中都增加环境标,1.为了qa sc测试方便,2.预发和线上共用一个表,但是需要进行数据和消息发送隔离。
三、核心功能的具体实现
首先说明,基于spring boot 模式来实现组件。
主要是三块实现:
- 写一个
TransactionListener
:处理事务中消息的所有存储和删除等动作,主要提供给TransactionSynchronization
的实现类使用。
- 事务nsq producer:支持发送事务消息,
send
主要的动作是启动事务初始化工作,然后写数据到消息表。
- 提供一个task组件:定时去获取分布式锁,捞取失败消息,重试发送。基于场景,基于数据库实现一个简单的分布式锁工具。
3.1 核心事务消息逻辑
核心的事务消息,主要是新增一个消息表的操作,在事务开始的时候,初始化上下文;发送消息的时候,插入数据到消息表,然后事务提交的时候,发送消息删除数据,如果发送失败,则后面的消息task组件来扫表拉取数据,重新发送消息。
时序图如下:
这里的事务管理,是基于spring TransactionSynchronization
能力,在发送消息的事务开始后,通过TransactionSynchronizationManager.registerSynchronization(this)
将实现类注册到spring环境中,即可使用spring事务能力做一些定制化的代码注入。
此外,在开启spring事务管理器前,需要明确判断TransactionSynchronizationManager.isSynchronizationActive()
是否事务激活,一般是代码使用的问题,需要及时报错,在开发调试的时候,就可以发现并修复。
3.2 事务补偿任务Task
事务消息的补偿事务task本身并不复杂,但是为了简单起见,我们在每个进程都启动了一个定时调度线程来执行定时拉取数据表数据,然后执行补偿发送消息。这就意味着,需要控制同一个时间节点,最好只能有一个任务在执行补偿消息发送,从而避免不必要的重复消息。
从上面事务消息的核心设计来看,在消息组件高可用的保障下,失败消息的量应该是非常低,因此我们需要补偿确保消息最终成功的消息量也非常少,这样,为了轻便,我们自己基于数据库实现了一个简单的分布式锁。获得锁的调度任务,才可以执行捞取失败消息,然后补偿发送到broker端。
下面给出实现的时序图:
3.3 基于spring boot 提供快速引入
整个使用spring boot
引入时,从spring 环境中注入nsqProducer
消息发送组件和dataSource
数据源组件。
代码简单如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Configuration @ConditionalOnProperty(name = RiskConstants.TRANSACTION_NSQ_SWITCH, havingValue = "true") @Slf4j public class TransactionalNsqConfiguration { @DependsOn("dataSource") @Bean("txNsqProducer") public TxNsqProducer txNsqProducer(DataSource dataSource, NsqProducer nsqProducer) { log.info("-------使用事务消息管理事务,初始化producer--------"); MessageLockStore messageLockStore = new MysqlMessageLockStore(dataSource); MessageStore messageStore = new MysqlMessageStore(dataSource); NsqTransactionSynchronization nsqTransactionSynchronization = new NsqTransactionSynchronization(messageStore); NsqTransactionScanTask scanTask = new NsqTransactionScanTask(nsqProducer, messageStore, messageLockStore); return new TxNsqProducerImpl(nsqTransactionSynchronization, nsqProducer, scanTask); } }
|
这样,在spring环境里,注入txNsqProducer
组件即可完成。
本组件,目前只支持单数据源,如果存在多数据源,则自己实现上面的注入bean的代码,DataSource使用自己项目里面对应的bean name即可。
四、事务型消息使用指南
在application.properties中引入开关配置
1 2 3 4
| #广播相关配置
transaction.nsq.enable=true
|
同时,一般我们的项目,在spring上下文中都已经存在dataSource
实例和nsqProducer
实例,所以接下来可以直接在代码中,使用该注入进来的事务消息实例,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
@Component("namesListProducer") @Slf4j public class NsqNamesListProducer implements NamesListProducer {
private static final String NAME_CHANGED_NSQ_TOPIC = "fin_risk_names_list_changed";`
@Resource private TxNsqProducer txNsqProducer; @Override public void sendNameChangedMsg(NameChangedMsgVo msg) { NameChangedMsgResult msgResult = new NameChangedMsgResult(); msgResult.setNamesCode(msg.getNamesCode()); msgResult.setNameVal(msg.getNameVal()); msgResult.setChangeTime(DateUtils.toStr(msg.getChangeTime())); msgResult.setOpContent(msg.getOpContent()); msgResult.setOpType(msg.getOpType().name()); msgResult.setReason(msg.getReason()); try { log.info("发送名单变更 NSQ 消息 :{}", msgResult); txNsqProducer.send(NAME_CHANGED_NSQ_TOPIC, msgResult); } catch (NSQException e) { log.error("发送名单变更 NSQ 消息失败,失败原因{}", e.getMessage()); throw new RiskBizException(CommonErrorCode.INTEGRATION_ERROR); } }
|