文章目录
- 1. 前言
- 2. startScheduledTask 启动定时任务
- 2.1 fetchNameServerAddr 拉取名称服务地址
- 2.2 updateTopicRouteInfoFromNameServer 更新 topic 路由信息
- 2.2.1 topic 路由信息
- 2.2.2 updateTopicRouteInfoFromNameServer 获取 topic
- 2.2.3 updateTopicRouteInfoFromNameServer 获取路由信息并更新本地缓存
- 2.2.4 getTopicRouteInfoFromNameServer 从 NameServer 中获取指定的路由信息
- 2.2.5 NameServer 处理 GET_ROUTEINFO_BY_TOPIC 请求
- 2.2.6 topicRouteDataIsChange 判断 topic 路由信息是否发生了变化
- 2.2.7 isNeedUpdateTopicRouteInfo 是否需要更新 topic 路由配置
- 2.2.7.1 DefaultMQProducerImpl#isPublishTopicNeedUpdate 是否需要更新 topic 信息
- 2.2.7.2 DefaultMQPushConsumerImpl#isSubscribeTopicNeedUpdate 是否需要更新 topic 订阅信息
- 2.2.8 DefaultMQProducerImpl#updateTopicPublishInfo
- 2.2.9 DefaultMQPushConsumerImpl#updateTopicSubscribeInfo
- 2.2.10 topicRouteData2TopicPublishInfo 将 TopicRouteData 转成 TopicPublishInfo
- 3. cleanOfflineBroker 清除下线的 broker
- 4. persistAllConsumerOffset 持久化消费进度
- 4.1 LocalFileOffsetStore#persistAll
- 4.2 RemoteBrokerOffsetStore#persistAll
- 4.3 broker 处理 UPDATE_CONSUMER_OFFSET 请求
- 5. adjustThreadPool 调整消费者线程数
- 6. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)
- 【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)
- 【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3)
上面几篇文章我们介绍了生产的启动流程,这篇文章就来看下生产者启动的时候在 MQClientInstance 里面通过 startScheduledTask
启动的定时任务,在上面的文章中也介绍过这个类,MQClientInstance 是 RocketMQ 客户端的一个核心实例,负责管理消息的生产和消费,封装了与消息队列服务端的通信逻辑,包括消息的发送、接收以及相关配置的管理。所以可以说这些定时任务在生产者和消费者启动的时候都会启动。
2. startScheduledTask 启动定时任务
MQClientInstance 在启动的时候会通过 startScheduledTask
方法启动一批定时任务,一共有 5 个:
- 每隔 2min 定时拉取 namesrv 地址
- 每隔 30s 定时拉取最新的 topic 路由信息
- 每隔 30s 定时清除无效的 broker,同时向所有 broker 发送心跳
- 每隔 5s 尝试持久化消费者偏移量,就是消费进度
- 每隔 1min 调整一次消费者的线程数
2.1 fetchNameServerAddr 拉取名称服务地址
// 1.定时任务,默认一开始延时 10s 执行,之后每 2min 执行一次
if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 定时从 nameserver 服务器去拉取名称服务地址// 这里获取 ws 地址,默认是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr// 全部构成应该是 http:// + wsDomainName + :8080/rocketmq/ + wsDomainSubgroup// wsDomainName 通过 rocketmq.namesrv.domain 获取,默认值是 jmenv.tbsite.net// wsDomainSubgroup 通过 rocketmq.namesrv.domain.subgroup 获取,默认值是 nsaddr// 当然了如果 wsDomainName 设置成 ip:port 的格式,那么最终的地址就是 http:// + ip:port + /rocketmq/ + wsDomainSubgroup// 比如 wsDomainName 设置成了 127.0.0.1:9876,那么结果就是 http://127.0.0.1:9876/rocketmq/nsaddrMQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
这个定时任务就是定时从 NameServer 服务器拉取 NameServer 服务地址,默认是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr
,不过一般创建出生产者或者消费者之后都会通过 set 方法手动设置 NameServer 地址,所以也不需要用到这个定时任务。
/*** 拉取 nameserver 的地址* @return*/
public String fetchNameServerAddr() {try {// 这里获取 ws 地址,默认是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr// 全部构成应该是 http:// + wsDomainName + :8080/rocketmq/ + wsDomainSubgroup// wsDomainName 通过 rocketmq.namesrv.domain 获取,默认值是 jmenv.tbsite.net// wsDomainSubgroup 通过 rocketmq.namesrv.domain.subgroup 获取,默认值是 nsaddr// 当然了如果 wsDomainName 设置成 ip:port 的格式,那么最终的地址就是 http:// + ip:port + /rocketmq/ + wsDomainSubgroup// 比如 wsDomainName 设置成了 127.0.0.1:9876,那么结果就是 http://127.0.0.1:9876/rocketmq/nsaddrString addrs = this.topAddressing.fetchNSAddr();if (addrs != null) {// 地址发生了变化,更新 nameSrv 地址if (!addrs.equals(this.nameSrvAddr)) {log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);// 更新 namesrvAddrList 列表,就是更新里面的 namesrv 地址this.updateNameServerAddressList(addrs);// 更新 nameSrvAddrthis.nameSrvAddr = addrs;return nameSrvAddr;}}} catch (Exception e) {log.error("fetchNameServerAddr Exception", e);}return nameSrvAddr;
}
可以看到这里面拉取到 NameServer 地址之后设置到 nameSrvAddr 属性中,在设置之前会通过 updateNameServerAddressList
更新 NettyRemotingClient 的 namesrvAddrList 地址。为什么要更新这个呢?不知道大家还记得 RocketMQ 的通信架构吗。
生产者和消费者都只设置 NameServer 地址,但是最终消息发送和消费都是操作 broker 的,所以肯定是通过某个方法将 broker 地址存储到了本地,那么 broker 地址从哪里获取呢?当然就是从 NameServer 了,broker 启动的时候会将自身信息上报给 NameServer,所以 NameServer 里面是存储了 broker 信息的,因此直到了 NameServer 地址,生产者和消费者只需要定时向 NameServer 发起请求查询,之后将 broker 信息存储到本地即可,而这里的定时查询存储就是下面 2.2 小结的定时任务要做的,这里还是先回到 updateNameServerAddressList
方法。
/*** 更新 namesrv 地址,addrs 是传入的最新的地址* @param addrs*/
public void updateNameServerAddressList(final String addrs) {// 可以传入 namesrv 集群地址,通过 ';' 分割就行了String[] addrArray = addrs.split(";");List<String> list = Arrays.asList(addrArray);// 更新 namesrvAddrList 列表this.remotingClient.updateNameServerAddressList(list);
}
这个方法会将获取到的 addrs 通过 ;
分割,然后通过 updateNameServerAddressList
更新 namesrvAddrList 列表。
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();/*** 更新 nameserver 地址* @param addrs 传入的新地址*/
@Override
public void updateNameServerAddressList(List<String> addrs) {// 获取旧的 namesrv 列表List<String> old = this.namesrvAddrList.get();boolean update = false;// 如果新的不是空if (!addrs.isEmpty()) {if (null == old) {// 原来还没有数组,说明第一次更新update = true;} else if (addrs.size() != old.size()) {// 如果大小不一样那也是需要更新update = true;} else {// 如果其中有一条不一样,那也是需要更新for (int i = 0; i < addrs.size() && !update; i++) {if (!old.contains(addrs.get(i))) {update = true;}}}if (update) {// 需要更新旧打乱地址Collections.shuffle(addrs);log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);// 重新设置进去this.namesrvAddrList.set(addrs);if (!addrs.contains(this.namesrvAddrChoosed.get())) {// 再更新 namesrvAddrChoosed,namesrvAddrChoosed 就是当前被选择的 namesrvthis.namesrvAddrChoosed.set(null);}}}
}
这个 namesrvAddrList 也是一个原子类,而且可以看到这个方法里面的更新是属于全量更新,也就是一次性更新所有 NameServer 地址,因此更新前需要有一个 update 字段来标记 NameServer 有没有发生变化,出现下面的几种情况:namesrvAddrList 是第一次设置
,NameServer 地址集合长度不一样
、长度一样但是地址有不同的
,这几种情况就属于发生了变化,需要重新设置 namesrvAddrList。
而最后还需要更新一个属性 namesrvAddrChoosed
,namesrvAddrChoosed 就是当前被选择的 NameServer 地址,broker 注册的时候会向所有 NameServer 注册信息,所以生产者消费者只需要和一个 broker 建立连接就行,因此 namesrvAddrChoosed 指向的就是建立连接的 broker 地址。
2.2 updateTopicRouteInfoFromNameServer 更新 topic 路由信息
// 2.初始延时 10ms,之后每隔 30s 执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 定时从 nameserver 中拉取 topic 路由信息来更新MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
第二个定时任务就是每隔 30s 定时去获取 topic 路由信息,然后更新本地缓存,在更新本地缓存前,我们还是来看下 topic 路由信息里面包括什么。
2.2.1 topic 路由信息
topic 路由信息封装到了 TopicRouteData
对象中,下面就是这个对象里面的属性:
// 顺序 topic 的配置信息,来自 ${user.home}\namesrv\kvConfig.json,存储到 KVConfigManager#configTable(namespace, (key, value))
// 这里赋值的时候通过 configTable.get(ORDER_TOPIC_CONFIG).get(topic) 来获取这个 topic 下面的配置,比如 "broker-a:4;broker-b:4",
// 意思是在 broker-a 下面有 4 个队列,在 broker-b 下面有 4 个队列,一般都是 null
private String orderTopicConf;
// topic 下面的队列信息(数量、分布 ...)
private List<QueueData> queueDatas;
// topic 存储的 broker 信息,一个 topic 是可以存储到多个 broker 的
private List<BrokerData> brokerDatas;
// broker 地址对应的过滤器地址列表
// 在 4.3 版本以前的过滤器是会在 broker 的服务器上运行一个 filterServer 的进程,并在 broker 的配置中加上:filterServerNums=1
// 比如: String filterCode = MixAll.file2String("过滤类的绝对路径");
// System.out.println(filterCode);
// consumer.subscribe("TopicFilter","xjf.filter.MessageFilterImpl(类的相对路径)",filterCode);
// 所以用户可以自定义一些过滤信息上传到过滤服务器上面
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- orderTopicConf: 顺序 topic 的配置信息,来自
${user.home}\namesrv\kvConfig.json
,存储到KVConfigManager#configTable(namespace, (key, value))
,这里赋值的时候通过configTable.get(ORDER_TOPIC_CONFIG).get(topic)
来获取这个 topic 下面的配置,比如"broker-a:4;broker-b:4"
,意思是在 broker-a 下面有 4 个队列,在 broker-b 下面有 4 个队列,一般都是 null。 - queueDatas: topic 下面的队列信息(数量、分布 …)。
- brokerDatas: topic 存储的 broker 信息,一个 topic 是可以存储到多个 broker 的。
- filterServerTable: broker 地址对应的过滤服务器地址列表,在 4.3 版本以前的过滤器是会在 broker 的服务器上运行一个 filterServer 的进程,并在 broker 的配置中加上:filterServerNums=1,用户可以自己订阅过滤逻辑,将代码传到过滤服务器上去过滤,但是现阶段版本已经不用了,使用 SQL92 代替了过滤服务器。
QueueData 是队列信息,一个 topic 可以有很多个读写队列,这些队列可以分不到不同的 broker 上,所以 QueueData 包括了下面的属性:
- brokerName: 分布的 broker 名称(broker 集群)。
- readQueueNums: 读队列的数量,即该队列下有多少个读队列,这个值在创建队列时由用户指定,通常是 4。
- writeQueueNums: 写队列的数量,即该队列下有多少个写队列,这个值在创建队列时由用户指定,通常是 4。
- perm: 权限,PermName 里面就配置了相关的权限,比如 6 是读写权限,4 是只读,也就是只能消费,2 是只写,也就是只能发送消息,1 表示继承 topic 的配置。
- topicSysFlag: topic 的同步配置,具体的配置值可以到 TopicSysFlag 类中去看,默认是 0,但是这个参数现在我也没搞懂有什么用。
BrokerData 是 broker 信息,当然了 topic 路由配置里面的 broker 信息比较简单,包括了 broker 集群名称以及 broker 节点,下面是里面的属性:
- cluster: broker 所属集群,比如最经典的 DefaultCluster。
- brokerName: broker 名称,又或者说是 broker 集群中的一个主从集群的名称。
- brokerAddrsbrokerAddrs: broker 地址,brokerName 这个主从集群下面的 broker 节点地址,Map 类型,key 是 brokerId,value 是 broker 地址,0 是主节点,其他是从节点。
2.2.2 updateTopicRouteInfoFromNameServer 获取 topic
从方法名也可以看出,这个方法就是从 NameServer 中获取路由信息,然后更新本地缓存,在获取路由信息之前我们要知道有哪些 topic,所以这个方法的核心就是从 consumerTable
和 producerTable
两个集合中获取进程消费者和生产者订阅的 topic,最后再调用 updateTopicRouteInfoFromNameServer
去获取路由信息。
/*** 从 namesrv 中拉取 topic 路由消息并更新*/
public void updateTopicRouteInfoFromNameServer() {// 去除重复数据Set<String> topicList = new HashSet<String>();// 遍历当前 MQClientInstance 下面的所有消费者来获取对应的订阅信息,主要就是里面的 topic{Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 获取每个消费者下面的订阅列表Set<SubscriptionData> subList = impl.subscriptions();if (subList != null) {// 遍历所有订阅信息,将这个 Consumer 订阅的 topic 添加到 topicList 中for (SubscriptionData subData : subList) {topicList.add(subData.getTopic());}}}}}// 遍历当前 MQClientInstance 下面的所有生产者来获取对应的订阅信息,主要就是里面的 topic{Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 获取生产者内部的 topicPublishInfoTable 列表的 topicSet<String> lst = impl.getPublishTopicList();// 将 topic 添加到集合中topicList.addAll(lst);}}}// 上面是从生产者和消费者中获取的, 但是生产者和消费者都有自己的 MQClientInstance, 所以 topicList 只会包括生产者的或者是消费者的for (String topic : topicList) {// 遍历每一个 topic,从 nameserver 中获取这个 topic 的路由信息并更新this.updateTopicRouteInfoFromNameServer(topic);}
}
2.2.3 updateTopicRouteInfoFromNameServer 获取路由信息并更新本地缓存
/*** 从 nameserver 中获取 topic 的路由信息并尝试更新* @param topic* @return*/
public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
}
这个方法就是上面 2.2.2 小节最后调用的方法,可以看到里面也是调用了另外一个 updateTopicRouteInfoFromNameServer 方法,传入三个参数,分别是:topic、是否是默认的 topic、默认生产者,默认 topic 就是 "TBW102"
,当然我们这里肯定不是了。
/*** 从 nameserver 中获取 topic 的路由信息并尝试更新* @param topic* @return*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {// 加锁防止并发,加锁时间是 3sif (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// topic 路由信息TopicRouteData topicRouteData;// 1.如果通过默认 topic 去获取路由信息if (isDefault && defaultMQProducer != null) {// 如果是默认 topic,就是获取 TBW102 的路由信息topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {// 更新队列信息,设置读写队列数量for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {// 2. 定时获取路由信息的逻辑会走这里,会从 nameserver 获取指定的 topic 的路由信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());}// 3.获取到的路由信息不为空if (topicRouteData != null) {// 那么从路由列表里面获取本地缓存的旧的 topic 路由信息TopicRouteData old = this.topicRouteTable.get(topic);// 判断路由信息是否发生了修改boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {// 如果没有发生修改,判断下这个 topic 路由信息是否需要更新了,如果说这个 topic 已经不可用了,那么别管一// 不一样,都是需要更新的changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}// 4.如果需要更新本地路由信息if (changed) {// 将 nameserver 的路由信息克隆一份TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();// 遍历这个 topic 分布的所有 broker,因为一个 topic 可以分布到不同 broker 下面for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 4.1 首先是更新 broker 信息this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// 4.2 然后是更新生产者的 topicPublishInfoTable 信息,就是生产者的路由信息if (!producerTable.isEmpty()) {// 将 TopicRouteData 转换成 TopicPublishInfoTopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);// 因为将 topicRouteData 设置到了 publishInfo 里面,所以这个属性也得设置为 true,表示包括路由信息publishInfo.setHaveTopicRouterInfo(true);// 遍历所有生产者Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {// group(生产者组) -> MQProducerInner(生产者实例)Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 更新生产者的 topicPublishInfoTable 集合impl.updateTopicPublishInfo(topic, publishInfo);}}}// 4.3 然后是更新消费者的 topicSubscribeInfoTable 信息,就是消费者的订阅路由信息if (!consumerTable.isEmpty()) {// 消费者订阅的 topic 集合Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {// 遍历这个 MQClientInstance 下面的所有消费者Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 更新消费者的订阅信息impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);// 4.4 最后更新本地的 topic 信息this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);}} catch (MQClientException e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} catch (RemotingException e) {log.error("updateTopicRouteInfoFromNameServer Exception", e);throw new IllegalStateException(e);} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;
}
上面就是更新本地缓存的逻辑,在更新之前需要先加个锁,防止并发更新,比如一个进程里面的生产者和消费者同时跑定时任务来更新。
然后就是获取路由信息,如果说传入了默认 topic,就从 NameServer 里面拉取 TBW102 的路由信息,然后再从传入的默认生产者里面获取读写队列数设置到路由信息里面。定时任务调用的就没有传入这个默认 topic,因此定时任务会直接从 NameServer 里面拉取传入的 topic 的路由信息。
获取到路由信息之后,首先就是先从本地 topicRouteTable
缓存,这个缓存就是一个本地的 topic 路由缓存。
/**
* 本地路由信息缓存
*/
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
接着通过 topicRouteDataIsChange
判断路由信息是否发生了变化,如果发生了变化,就可以修改,如果没有发生变化,再判断下路由信息是否需要更新,也就是 isNeedUpdateTopicRouteInfo
,这个方法后面会讲解。
那如果需要更新,首先就是更新 brokerAddrTable
集合,上面 2.2.1 小节我们也说了路由信息里面是有 brokerDatas
属性的,所以这里更新 brokerAddrTable 的逻辑就是直接往里面设置 broker 信息,这个 brokerAddrTable 是一个 Map 集合。
// brokerName -> (id, address),一个集群里面的 broker 的 brokerName 是一样的
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =new ConcurrentHashMap<String, HashMap<Long, String>>();
从这里的定义也可以看出,broker 主从集群共用一个 brokerName,依靠 brokerId 区分是主节点还是从节点。
回到源码,继续往下,接着就是更新 producerTable
的 topicPublishInfoTable
集合,这个集合代表生产者的订阅 topic 的路由信息,以及更新 consuemerTable
的 topicSubscribeInfoTable
集合,这个集合代表消费者的订阅信息,最后更新本地的 topicRouteTable
集合。
好了,这里就是更新 topic 路由信息的全部逻辑,可以看到的是主要更新了下面四个集合:
- topicRouteTable: topic 路由集合
- brokerAddrTable: broker 地址集合
- DefaultMQProducerImpl#topicPublishInfoTable: 生产者的 topic 路由信息
- DefaultMQPushConsumerImpl#RebalanceImpl#topicSubscribeInfoTable: 消费者下面的 topic 订阅信息
所以这里也就回收了 2.1 小节的问题,就是 broker 地址是在哪里更新的。
2.2.4 getTopicRouteInfoFromNameServer 从 NameServer 中获取指定的路由信息
/*** 从 NameServer 中拉取 topic 的路由信息* @param topic* @param timeoutMillis* @param allowTopicNotExist* @return* @throws MQClientException* @throws InterruptedException* @throws RemotingTimeoutException* @throws RemotingSendRequestException* @throws RemotingConnectException*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {// 构建获取路由信息的请求头GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();// 要获取哪个 topic 的路由信息requestHeader.setTopic(topic);// 获取请求命令对象,构建出来的请求 CODE 是 GET_ROUTEINFO_BY_TOPICRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);// 同步远程调用,这里传入的 addr 为空,意思是会随机选择一个 NameServer 来发送请求RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {// 找不到这个 topic 的路由信息if (allowTopicNotExist) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {// 成功获取,对返回消息体解码byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());
}
可以看到这里就是发送了一个同步请求,请求 Code 为 GET_ROUTEINFO_BY_TOPIC
,请求参数是 topic。
2.2.5 NameServer 处理 GET_ROUTEINFO_BY_TOPIC 请求
NameServer 会通过 getRouteInfoByTopic 方法来处理生产者和消费者的获取路由配置请求。
/*** 处理根据 topic 查询路由配置的请求* @param ctx* @param request* @return* @throws RemotingCommandException*/
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 创建响应结果final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 获取请求参数final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);// 根据 topic 获取对应的路由配置TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if (topicRouteData != null) {// 顺序消息相关的if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}// 设置返回结果byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;
}
可以看到上面是通过 pickupTopicRouteData
来获取对应 topic 的路由配置的,同时获取到配置之后如果 topic 允许顺序消息,那么设置下顺序 topic 的配置,这个后面再研究吧,现在来看下 pickupTopicRouteData
里面的逻辑,不过在进入这个方法之前还是先回顾下 TopicRouteData 里面有什么属性。
上面设置了 orderTopicConf,下面这个 pickupTopicRouteData
就是要处理剩下的三个属性。
/*** 获取 topic 的路由配置* @param topic* @return*/
public TopicRouteData pickupTopicRouteData(final String topic) {// 初始化路由配置TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;// 最重要的三个属性: queueDatas(队列信息)、brokerDatas(broker 信息)、filterServerTable(过滤服务器信息)Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {// 加读锁this.lock.readLock().lockInterruptibly();// 从 topicQueueTable 中获取这个 topic 下面的队列信息List<QueueData> queueDataList = this.topicQueueTable.get(topic);if (queueDataList != null) {// 设置队列信息topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;Iterator<QueueData> it = queueDataList.iterator();// 遍历所有的队列信息,获取这个 topic 的队列会存储到哪些 broker,简单说就是获取这个 topic 存储到哪些 brokerwhile (it.hasNext()) {QueueData qd = it.next();brokerNameSet.add(qd.getBrokerName());}// 遍历这些 brokerfor (String brokerName : brokerNameSet) {// 从 brokerAddrTable 中获取对应的 broker 信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);// 这里就是获取到 broker 信息了if (null != brokerData) {// 相当于克隆一份数据到 brokerDataClone 中BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());// 添加到 brokerDataList 集合brokerDataList.add(brokerDataClone);foundBrokerData = true;// 获取过滤服务器的地址for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {// 解锁this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {// 如果两个都找到了,就返回结果return topicRouteData;}// 如果 broker 信息或者队列信息没找到,就返回空return null;
}
上面就是 pickupTopicRouteData
方法的代码,这里面注释也写的比较清除,所以就简单说下这几个属性的来源:
- queueDatas:从
topicQueueTable
获取,这个集合存储了 topic -> 队列信息,因为一个 topic 可以存储到多个 broker 下面,所以是Map<String, List< QueueData>>
类型。 - brokerDatas:从
brokerAddrTable
获取,这个集合之前也说过了,就是存储了 brokerName -> broker 地址信息,是Map<String, BrokerData>
类型集合,BrokerData 上面 2.2.1 小节也有介绍里面的一些属性,这里不多说。还要注意的一点是代码中是获取了 queueDatas 之后,遍历这些队列,然后获取这些队列分布的 broker,再去获取 brokerDatas 信息,所以是一个前后的关系。 - filterServerTable:从
filterServerTable
中获取,这个集合存储了 broker 地址 -> 这个 broker 的过滤服务器地址的映射关系,因为涉及到 broker 地址,所以获取到 brokerDatas 之后再遍历这个 broker 集群下面的所有 broker 地址来获取过滤服务器信息。
2.2.6 topicRouteDataIsChange 判断 topic 路由信息是否发生了变化
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {if (olddata == null || nowdata == null)return true;TopicRouteData old = olddata.cloneTopicRouteData();TopicRouteData now = nowdata.cloneTopicRouteData();Collections.sort(old.getQueueDatas());Collections.sort(old.getBrokerDatas());Collections.sort(now.getQueueDatas());Collections.sort(now.getBrokerDatas());return !old.equals(now);}
判断 TopicRouteData 是否发生了变化就是判断里面的几个属性是否不同,如果不同就说明有变化,直接用 equals 方法判断就行了。
2.2.7 isNeedUpdateTopicRouteInfo 是否需要更新 topic 路由配置
是否需要更新就得看生产者和消费者里面对应的订阅 topic 集合信息是否需要更新。
private boolean isNeedUpdateTopicRouteInfo(final String topic) {boolean result = false;{Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {result = impl.isPublishTopicNeedUpdate(topic);}}}{Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext() && !result) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {result = impl.isSubscribeTopicNeedUpdate(topic);}}}return result;
}
从上面可以看到,生产者是遍历了 producerTable
,每一个生产者都去使用 isPublishTopicNeedUpdate 方法判断是否需要更新 topic 信息。而消费者则是遍历 consumerTable
集合,一个一个消费者去判断订阅的这个 topic 下面的订阅信息是否需要更新。
2.2.7.1 DefaultMQProducerImpl#isPublishTopicNeedUpdate 是否需要更新 topic 信息
@Override
public boolean isPublishTopicNeedUpdate(String topic) {TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);return null == prev || !prev.ok();
}/*** 当前 topic 是否是可以使用的,如果说 topic 下面的队列不为空,那么就说明还可以使用* @return*/
public boolean ok() {return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}
对于生产者来说 topic 配置信息不可用当然就是没办法向 topic 发送消息,而我们之前也说过如果需要发送消息,topic 下面的消息队列肯定是需要可用,所以这里的判断就是如果这个 topic 下面的消息队列为空,就是没办法发送消息,这种情况下就是不可用,需要更新 topic 信息。
2.2.7.2 DefaultMQPushConsumerImpl#isSubscribeTopicNeedUpdate 是否需要更新 topic 订阅信息
对于消费者来说,消费者组里面的消费者会通过负载均衡分配到 topic 下面的某个队列,所以如果这个消费者的负载均衡服务下面的 topicSubscribeInfoTable 集合没有找到这个 topic 对应的队列信息就说明需要更新。
/*** topic -> 消费队列*/
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =new ConcurrentHashMap<String, Set<MessageQueue>>();
不过大家要注意 MQClientInstance 是生产者和消费者独有的,也就是说消费者找不到这个 topic 的订阅信息,就说明需要更新,虽然 2.2.2 小节 updateTopicRouteInfoFromNameServer 是从生产者和消费者里面都获取了 topic 然后遍历所有 topic 去请求路由信息,但是实际上一个进程的生产者只会请求生产者的 topic 路由信息,消费者只会请求消费者的,不会串起来。因为 MQClientInstance 是生产者和消费者的私有属性,会通过 clientId 去标识,在前面的文章我们也说过了当前版本生产 clientId 的时候是加了时间戳的,所以一般一个进程里面的生产者消费者启动都会创建自己的 MQClientInstance,而这些定时任务又是 MQClientInstance 下面的。
回到 isSubscribeTopicNeedUpdate,既然有了上面的概念,那么下面就直接给出源码就行,直接判断是否在负载均衡服务的 topic 订阅信息集合中即可。
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {if (subTable.containsKey(topic)) {// 判断是否在负载均衡服务的 topic 订阅信息集合中return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);}}return false;
}
2.2.8 DefaultMQProducerImpl#updateTopicPublishInfo
当获取到 topic 路由配置之后,转成 TopicPublishInfo,然后通过 DefaultMQProducerImpl#updateTopicPublishInfo
设置到生产者的 topicPublishInfoTable
集合中。
/*** 更新生产者的 topic 路由信息* @param topic* @param info*/
@Override
public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {if (info != null && topic != null) {// 更新 topic 下面的路由信息TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);if (prev != null) {// 这里就是将之前的覆盖了log.info("updateTopicPublishInfo prev is not null, " + prev.toString());}}
}
2.2.9 DefaultMQPushConsumerImpl#updateTopicSubscribeInfo
消费者也差不多,跟上面 2.2.7.2 是匹配的,也是更新 topicSubscribeInfoTable 就好了。
@Override
public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {// 获取消费者订阅的 topicMap<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {// 如果订阅的 topic 包括传入的 topic,才更新下面的队列信息if (subTable.containsKey(topic)) {this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);}}
}
2.2.10 topicRouteData2TopicPublishInfo 将 TopicRouteData 转成 TopicPublishInfo
最后来看下 topicRouteData2TopicPublishInfo 方法,就是将 TopicRouteData 转成生产者所需的 TopicPublishInfo,注释写的比较详细了,直接看代码。
/*** 将 TopicRouteData 转化为 TopicPublishInfo,TopicPublishInfo 就是本地路由信息存储类* @param topic* @param route* @return*/
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {// TopicPublishInfo 是本地存储的 topic 路由信息TopicPublishInfo info = new TopicPublishInfo();// 将路由信息设置到 TopicPublishInfo 里面info.setTopicRouteData(route);// 如果说 orderTopicConf 不为空if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {// 按照分号进行分割,这个属性会标记 topic 在这个 broker 下面有多少个 MessageQueueString[] brokers = route.getOrderTopicConf().split(";");// 比如 broker-a:4;broker-b:4for (String broker : brokers) {String[] item = broker.split(":");// MessageQueue 数量int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {// 生成 MessageQueue,指定 MessaegQueue 的 topic、所属的 broker、编号MessageQueue mq = new MessageQueue(topic, item[0], i);// 添加到 TopicPublishInfo 里面info.getMessageQueueList().add(mq);}}// 如果设置了 orderTopicConf 这个配置,那么说明这个 topic 是一个顺序 topicinfo.setOrderTopic(true);} else {// 这里就不是顺序消息了,获取这个 topic 下面的队列消息List<QueueData> qds = route.getQueueDatas();// 按照 brokerName 排序Collections.sort(qds);for (QueueData qd : qds) {// 遍历每一个队列,如果说这个队列是可写的,也就是可以接收消息的if (PermName.isWriteable(qd.getPerm())) {BrokerData brokerData = null;// 遍历这个 topic 存储的所有 broker 信息,一个 topic 是可以存储到多个 broker 的for (BrokerData bd : route.getBrokerDatas()) {if (bd.getBrokerName().equals(qd.getBrokerName())) {// 获取下这个 QueueData 所属的 broker 信息brokerData = bd;break;}}// 没找到,继续处理下一个 QueueDataif (null == brokerData) {continue;}// 如果这个队列所属的 broker 集群里面没有主节点if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {// 继续处理下一个队列的信息,因为消息发送和消费都是以主节点为主的continue;}// 这里就是根据 writeQueueNums 属性来生成 MessageQueuefor (int i = 0; i < qd.getWriteQueueNums(); i++) {// 生成 MessageQueue,指定 MessaegQueue 的 topic、所属的 broker、编号MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);// 添加到 TopicPublishInfo 中info.getMessageQueueList().add(mq);}}}// 设置顺序 topic 属性为 falseinfo.setOrderTopic(false);}return info;
}
还是要注意下,非顺序 topic 前提下,遍历 topic 下面的队列,因为队列可以分配到不同的 brokerName 集群,所以需要遍历所有的队列,然后判断这些队列分配到的 brokerName 集群中有没有主节点,怎么判断呢?topic 路由配置里面的 brokerDatas 就存储了这个 topic 分配的 broker 信息,所以遍历 QueueData,然后从 brokerDatas 中找到 QueueData 里面的 brokerName 集群,如果没有主节点,就不处理,因为生产者写消息是写给主节点的。
如果有主节点,就 根据 QueueData#writeQueueNums 属性来生成 MessageQueue,指定这个队列的 topic、brokerName、下标,接着添加到 TopicPublishInfo#messageQueueList 属性中,生产者生产消息的时候可以直接调用 send 方法指定里面的队列去发送消息。
3. cleanOfflineBroker 清除下线的 broker
// 3.初始延时 1s,之后每隔 30s 执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 清除下线的 brokerMQClientInstance.this.cleanOfflineBroker();// 给所有 broker 发送心跳MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
这个定时任务会初始延时 1s,之后每隔 30s 执行一次,主要做两件事:
- 清除下线的 broker
- 给所有 broker 发送心跳
这个 sendHeartbeatToAllBrokerWithLock 在文章 【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3) 有说过,所以这里就不说 sendHeartbeatToAllBrokerWithLock 了,来看下第一个 cleanOfflineBroker 是如何清除下线的 broker 的。
/*** 定时清除下线的 broker*/
private void cleanOfflineBroker() {try {// 加锁if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))try {ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();while (itBrokerTable.hasNext()) {// 下面会遍历所有 brokerName 下面的所有主从节点Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();// 克隆一份,避免下面边处理,上面被修改了HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();cloneAddrTable.putAll(oneTable);Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> ee = it.next();// 遍历所有的 broker 地址String addr = ee.getValue();// 判断这个 broker 地址是否还存在 topicRouteTable 里面任意一个 topic 的路由信息中// 这个 topicRouteTable 在 startScheduledTask 方法中会每隔 30s 拉取一次 topic 路由信息来更新// 所以可以直接通过这个集合去判断 brokerAddrTable 里面的地址有没有过期if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {// 不存在就删掉it.remove();log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);}}// 如果为空,那么删掉这个 brokerName 下面的所有地址if (cloneAddrTable.isEmpty()) {itBrokerTable.remove();log.info("the broker[{}] name's host is offline, remove it", brokerName);} else {// 否则就将更新之后的集合重新设置进去updatedTable.put(brokerName, cloneAddrTable);}}// 更新完成了,如果不为空,更新 brokerAddrTableif (!updatedTable.isEmpty()) {this.brokerAddrTable.putAll(updatedTable);}} finally {this.lockNamesrv.unlock();}} catch (InterruptedException e) {log.warn("cleanOfflineBroker Exception", e);}
}
逻辑就是遍历 brokerAddrTable
里面的所有 broker 地址,这个集合在第 2 节定时任务就维护了,所以直接遍历就行,接着使用 isBrokerAddrExistInTopicRouteTable
方法判断这个 broker 是否存在于 topicRouteTable 集合中,topicRouteTable 就是 topic 路由集合,如果说 TopicRouteData#brokerDatas 里面不包括当前 broker 地址,那么就说明这个地址失效了,直接删掉。
/*** 判断 addr 是否在 topic 路由配置中* @param addr* @return*/
private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {// 遍历 topic 路由集合Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();while (it.hasNext()) {Entry<String, TopicRouteData> entry = it.next();TopicRouteData topicRouteData = entry.getValue();// 获取 topic 分布的 broker 信息List<BrokerData> bds = topicRouteData.getBrokerDatas();for (BrokerData bd : bds) {// 遍历所有路由信息, 然后一个一个判断 broker 地址是否在 topic 路由配置中if (bd.getBrokerAddrs() != null) {boolean exist = bd.getBrokerAddrs().containsValue(addr);if (exist)return true;}}}return false;
}
4. persistAllConsumerOffset 持久化消费进度
// 4.初始化延迟 10s 执行,每隔 5s 执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 持久化消费者偏移量,就是消费进度// - 如果是广播模式持久化到本地// - 如果是集群默认就推送到 brokerMQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
消费者消息拉取的时候会根据偏移量拉取,所以这里会定时持久化消费者偏移量到本地或者 broker。
/*** 持久化 consumerTable 消费者偏移量集合*/
private void persistAllConsumerOffset() {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {// 遍历所有消费者,广播模式下持久化到本地,集群模式下推送到 broker 端Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();// 依次调用每一个 MQConsumerInner#persistConsumerOffset 方法去持久化impl.persistConsumerOffset();}
}
这里就是遍历所有消费者,然后依次调用每一个 MQConsumerInner#persistConsumerOffset 方法去持久化。
/*** 持久化偏移量*/
@Override
public void persistConsumerOffset() {try {// 首先确定当前 Consumer 服务状态是正常的this.makeSureStateOK();// 获取所有的队列Set<MessageQueue> mqs = new HashSet<MessageQueue>();Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);// 持久化所有队列的偏移量到本地或者远程 brokerthis.offsetStore.persistAll(mqs);} catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);}
}
4.1 LocalFileOffsetStore#persistAll
/*** 持久化队列的消费偏移量到本地* @param mqs*/
@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;// 创建一个偏移量的序列化包装类,里面就是封装了 offsetTableOffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {// 获取偏移量AtomicLong offset = entry.getValue();// 存到 offsetSerializeWrapper 中offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}// 通过 JSON.toJSON 转成 JSON 字符串,同时将消费者偏移量持久化到本地String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {// 存储路径默认是: ${user.home}/.rocketmq_offsets/#{clientId}/#{groupName}/offsets.jsonMixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}}
}
持久化到本地的逻辑就是将消息队列的偏移量存储到本地 ${user.home}/.rocketmq_offsets/#{clientId}/#{groupName}/offsets.json
文件中。
4.2 RemoteBrokerOffsetStore#persistAll
/*** 持久化所有的 MQ 的消费偏移量到远程 broker* @param mqs 当前消费者需要消费的所有队列*/
@Override
public void persistAll(Set<MessageQueue> mqs) {// 如果队列为空,直接返回if (null == mqs || mqs.isEmpty())return;// 未上报的 MQ 集合final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {// 遍历 offsetTable,如果队列在 mqs 中,就将这个队列的偏移量信息上报偏移量到 broker// 这个偏移量会被存储到 broker 的 ConsumerOffsetManager#offsetTable 中,但是只是存到内存,但是最终会// 持久化到 ${user.home}/store/config/consumerOffset.json 文件this.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} else {// 这些就是存在 offsetTable 中但是不在 mqs 中的unusedMQ.add(mq);}}}if (!unusedMQ.isEmpty()) {// 从 offsetTable 中删除那些不需要上报的 MQfor (MessageQueue mq : unusedMQ) {this.offsetTable.remove(mq);log.info("remove unused mq, {}, {}", mq, this.groupName);}}
}
远程持久化偏移量的方法会遍历 offsetTable
中的队列,如果队列在传入的 mqs
中也存在,就调用 updateConsumeOffsetToBroker
方法上报偏移量到 broker,持久化到 ${user.home}/store/config/consumerOffset.json
文件中。
传入的 mqs 是从 RebalanceImpl 重平衡服务获取的,所以如果 offsetTable 里面的队列不在 mqs 中就代表过期了,不需要这个队列了,因此后面如果 unusedMQ 集合不为空,就会将这部分队列从 offsetTable 中删掉。
下面来接着看下 updateConsumeOffsetToBroker 是如何请求 broker 存储偏移量的。
/*** Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.* 上报消费偏移量到 broker*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,MQBrokerException, InterruptedException, MQClientException {updateConsumeOffsetToBroker(mq, offset, true);
}/*** Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.* 同步更新消费者偏移量,一旦主节点宕机,就将更新操作切换到从节点,此处需要进行优化*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException {// 从 brokerAddrTable 中根据队列里面设置的 brokerName 获取 broker 地址FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);if (null == findBrokerResult) {// 找不到就从 NameServer 拉取 topic 路由消息,同时更新本地的 brokerAddrTablethis.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());// 再次从 brokerAddrTable 中根据队列里面设置的 brokerName 获取 broker 地址findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);}// 获取到了if (findBrokerResult != null) {// 构建请求头UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();// 设置队列的 topicrequestHeader.setTopic(mq.getTopic());// 设置消费者组requestHeader.setConsumerGroup(this.groupName);// 设置队列 IDrequestHeader.setQueueId(mq.getQueueId());// 设置消费偏移量requestHeader.setCommitOffset(offset);if (isOneway) {// 单向发送this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {// 同步发送this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);}} else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}
}/*** 向 broker 发送单向更新消费偏移量的请求* @param addr* @param requestHeader* @param timeoutMillis* @throws RemotingConnectException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException* @throws InterruptedException*/
public void updateConsumerOffsetOneway(final String addr,final UpdateConsumerOffsetRequestHeader requestHeader,final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,InterruptedException {// 请求 Code 为 UPDATE_CONSUMER_OFFSETRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);// 发送单向请求,走 VIP 通道,就是异步发送,不求返回结果this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}
可以看到首先就是从 brokerAddrTable 中根据队列里面设置的 brokerName 获取主节点的 broker 地址,如果找不到就从 NameServer 拉取 topic 路由消息,同时更新本地的 brokerAddrTable。
获取到 broker 地址之后,构建请求头,在请求头中设置队列 ID、消费者组、消费偏移量、队列 topic,然后使用 VIP 通道(10909 端口)单向发送,请求 code 是 UPDATE_CONSUMER_OFFSET。
4.3 broker 处理 UPDATE_CONSUMER_OFFSET 请求
broker 会通过 updateConsumerOffset 处理 Consumer 发送的更新队列的消费偏移量的请求。
/*** 更新消费者偏移量* @param ctx* @param request* @return* @throws RemotingCommandException*/
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 创建命令行响应结果final RemotingCommand response =RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);// 获取消费者请求final UpdateConsumerOffsetRequestHeader requestHeader =(UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);// 调用 consumerOffsetManager@commitOffset 来存储偏移量this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}
可以看到里面也是掉用了 commitOffset
来处理队列偏移量。
/*** 存储 Consumer 上报过来的消费队列偏移量* @param clientHost Consumer 客户端的地址* @param group Consumer 消费者组* @param topic topic 主题* @param queueId 队列 ID* @param offset 消费偏移量*/
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,final long offset) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;// 存入偏移量this.commitOffset(clientHost, key, queueId, offset);
}/*** 提交偏移量,broker 使用 offsetTable 存储了消费者上报过来的偏移量,对应了 ${user.home}/store/config/consumerOffset.json* 这里是将偏移量存入缓存中,还没有持久化, 什么时候初始化呢? 在 broker 会有一个定时任务每隔 5s 就持久化偏移量到上面这个文件中* @param clientHost 客户端地址* @param key 缓存key* @param queueId 队列id* @param offset 消费偏移量*/
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {// 获取 topic@group 下面对应的所有队列的消费偏移量// topic@group -> (queueId, offset),一个 topic 可以被多个消费者组下面的多个消费去消费,同时每一个消费者都会分配到这个 topic// 下面的几个队列,而这个 table 不会记录队列是哪一个消费者消费的的,只会记录这个队列被消费到哪个偏移量了ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null == map) {// 如果为空,就是第一次添加map = new ConcurrentHashMap<Integer, Long>(32);map.put(queueId, offset);this.offsetTable.put(key, map);} else {// 这里就是更新偏移量逻辑Long storeOffset = map.put(queueId, offset);if (storeOffset != null && offset < storeOffset) {// 如果更新之后的偏移量比原来的还小,可能有问题,因为消费肯定是往后消费的, 这里也是打印下日志log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);}}
}
这里意思就是将生产者消费者上报的偏移量存到 broker 的 offsetTable,注意这里的偏移量是下一条将要拉取的消息在 ConsumerQueue 中的索引 index,这里更新也只是更新了 offsetTable 缓存,在 broker 会有一个定时任务每隔 5s 就持久化偏移量到 ${user.home}/store/config/consumerOffset.json
这个文件中。
{"offsetTable": {"TopicTest@consumer_group": {0: 6,1: 7,2: 9,3: 8},"SQL92Test@consumer_group": {0: 3,1: 2,2: 2,3: 3},"TopicTest@testGroupConsumer": {0: 4,1: 4,2: 6,3: 6},"%RETRY%consumer_group@consumer_group": {0: 0},"TopicMasterSlave@testGroupConsumer": {1: 0,2: 0,3: 0},"%RETRY%testGroupConsumer@testGroupConsumer": {0: 0}}
}
5. adjustThreadPool 调整消费者线程数
// 5.初始化延迟 1min 执行,每隔 1min 执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 调整消费者的线程数量MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}
}, 1, 1, TimeUnit.MINUTES);
这里是调整消费者线程数,属于动态调整,会根据还有多少消息没有拉取来调整消费者线程池数。
/*** TODO 调整消费者线程池的线程个数*/
public void adjustThreadPool() {// 计算消费者分配的队列下面还有多少消息没有消费long computeAccTotal = this.computeAccumulationTotal();// 阈值是 100000long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);if (computeAccTotal >= incThreshold) {// 如果没有消费的消息大于阈值, 增加核心线程数this.consumeMessageService.incCorePoolSize();}if (computeAccTotal < decThreshold) {// 如果没有消费的消息小于阈值, 减少核心线程数this.consumeMessageService.decCorePoolSize();}
}
但是 incCorePoolSize
和 decCorePoolSize
这两方法都是空实现,所以这个方法就不用管了。
6. 小结
好了,这篇文章我们讲解了 MQClientInstance 启动的时候启动的 5 个定时任务,最后我们简单总结下这几个定时任务,分别是:
- 每隔 2min 定时拉取 namesrv 地址。
- 每隔 30s 定时拉取最新的 topic 路由信息。
- 每隔 30s 定时清除无效的 broker,同时向所有 broker 发送心跳。
- 每隔 5s 尝试持久化消费者偏移量,就是消费进度。
- 每隔 1min 调整一次消费者的线程数。
如有错误,欢迎指出!!!