RocketMQ Source-Level Internals: An Overview of Message Consumption

Overview

(figure: message pull and message consumption as two separate flows)

As the figure shows, pull message and consume message are actually two separate processes, though this is transparent to the user.

(figure: the three kinds of offsets)

Note what these three offsets mean: the physical offset is the global offset into the CommitLog.
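
For reference, each ConsumeQueue index entry is a fixed 20 bytes (an 8-byte CommitLog physical offset, a 4-byte message size, and an 8-byte tag hashcode), so locating the entry for a given queue offset is plain arithmetic; a minimal sketch:

// Each ConsumeQueue index entry is fixed-size: 20 bytes.
// (8 B CommitLog physical offset, 4 B message size, 8 B tag hashcode)
class ConsumeQueueMath {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Byte position of the index entry for a given queue offset (0, 1, 2, ...).
    static long indexPosition(long queueOffset) {
        return queueOffset * CQ_STORE_UNIT_SIZE;
    }
}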


Dispatch

(figure: each topic queue bound to one PullRequest and one ProcessQueue)

As the figure above shows, each queue of a topic is bound to exactly one PullRequest object, and each PullRequest object is in turn bound to exactly one red-black-tree queue, the ProcessQueue.

Each dashed box represents a Java class; boxes of the same color represent calls made on the same thread; the blue box in the upper-right corner represents the callback thread.

  • When the RebalanceService thread performs a rebalance, it allocates a dedicated PullRequest for every queue owned by each push consumer (pull consumers are not handled by the PullMessageService thread), then calls PullMessageService#executePullRequestImmediately() to drop every PullRequest into pullRequestQueue, the PullMessageService thread's blocking queue
  • The PullMessageService thread's own run() method keeps taking PullRequests from pullRequestQueue and, using the offset recorded in each PullRequest, pulls messages from the broker into the local red-black tree

Each consumer client has one PullMessageService thread, which is responsible for pulling messages for multiple topics.

As long as the broker has plenty of messages, the PullMessageService thread effectively never idles: each completed pull immediately re-enqueues its PullRequest, forming a continuous pull loop.

The PullMessageService thread runs a while loop that performs three steps:

  • take a PullRequest from pullRequestQueue
  • after obtaining the PullRequest, apply some flow-control checks
  • send an asynchronous pull request to the broker (passing the broker a callback)

When the pull request sent to the broker has fetched messages, the broker sends a response back; the client's Netty IO thread receives it and hands it off to a thread in the NettyClientPublicExecutor pool, and that pool thread invokes the callback mentioned above to write the messages into the TreeMap.

This is the typical shape of an asynchronous call: a callback or listener is passed in, and in the end some other IO thread invokes that callback or listener.
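
The sketch below is a generic illustration of this callback hand-off, not RocketMQ code; the class and method names are made up for the example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of the async-callback pattern described above.
interface Callback<T> {
    void onSuccess(T result);
    void onException(Throwable e);
}

class AsyncClient {
    // Stands in for the Netty IO / public-executor threads.
    private final ExecutorService ioPool = Executors.newSingleThreadExecutor();

    // The caller returns immediately after submitting; the callback is
    // invoked later on a different (IO) thread.
    void sendRequest(final String request, final Callback<String> callback) {
        ioPool.submit(() -> {
            try {
                String response = "response-for-" + request; // pretend network round trip
                callback.onSuccess(response);
            } catch (Throwable e) {
                callback.onException(e);
            }
        });
    }
}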

Two scenarios wake up the doRebalance thread (a simplified sketch of this wait/wakeup idiom follows the list):

  • it wakes up on its own after waitForRunning(20s)
  • the broker wakes the doRebalance thread
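
A simplified sketch of that idiom, assuming a plain CountDownLatch where RocketMQ's ServiceThread actually uses a resettable CountDownLatch2:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the ServiceThread wait/wakeup idiom.
class RebalanceLoop implements Runnable {
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Either 20 s elapse, or wakeup() fires the latch early.
                waitPoint.await(20, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
            }
            waitPoint = new CountDownLatch(1); // re-arm for the next round
            doRebalance();
        }
    }

    // Called e.g. when the broker notifies that the consumer set changed.
    void wakeup() {
        waitPoint.countDown();
    }

    private void doRebalance() { /* recompute queue allocation */ }
}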

Pull & Push

Rebalance

A rebalance is triggered whenever the number of queues in a topic changes or the number of consumers in a consumer group changes.

(figures: examples of allocating a topic's queues across the consumers of a group)

As you can see, the more queues a topic has, the greater its potential parallelism.

(figures: the rebalance allocation logic, which sorts all queues and all consumers before assigning)

RocketMQ's rebalance: every consumer executes the logic above, sorting all the queues and all the consumers, so that every consumer computes from the same view.

Kafka's rebalance, by contrast, is executed on a single machine; Kafka needs ZooKeeper's help to elect the leader that performs it.
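
A condensed sketch of RocketMQ's averaging strategy, modeled on the client's AllocateMessageQueueAveragely (signatures simplified to strings). Because the computation is deterministic over identically sorted inputs, every consumer independently arrives at the same assignment:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Condensed sketch of AllocateMessageQueueAveragely.
class AverageAllocator {
    List<String> allocate(String currentCid, List<String> allCids, List<String> allQueues) {
        List<String> cids = new ArrayList<>(allCids);
        List<String> queues = new ArrayList<>(allQueues);
        Collections.sort(cids);    // identical view on every consumer
        Collections.sort(queues);

        int index = cids.indexOf(currentCid);
        int mod = queues.size() % cids.size();
        int averageSize = queues.size() <= cids.size() ? 1
            : (mod > 0 && index < mod ? queues.size() / cids.size() + 1
                                      : queues.size() / cids.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize
                                                  : index * averageSize + mod;
        int range = Math.min(averageSize, queues.size() - startIndex);

        List<String> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add(queues.get((startIndex + i) % queues.size()));
        }
        return result;
    }
}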

Pull Message

PullMessageService thread implementation

public class PullMessageService extends ServiceThread {
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    private void pullMessage(final PullRequest pullRequest) {
        // One consumer machine is represented by one MQClientInstance.
        // A single MQClientInstance may hold many different consumers,
        // each subscribed under a different consumer group.
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            // Note: only DefaultMQPushConsumerImpl is handled here; pull consumers are not.
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
}

One consumer machine is represented by one MQClientInstance

public class MQClientInstance {
    private final ClientConfig clientConfig;
    private final String clientId;
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable =
        new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable =
        new ConcurrentHashMap<String, MQConsumerInner>();
}

The consumerTable of one MQClientInstance may hold many different consumers subscribed under different consumer groups. Each consumer corresponds to its own DefaultMQPushConsumerImpl instance, and each DefaultMQPushConsumerImpl instance starts, through its own start() method, a set of services that belong to it alone.

For example, each DefaultMQPushConsumerImpl instance starts its own dedicated "ConsumeMessageThread_" consumption pool with both coreSize and maxSize set to 20; a sketch of that pool follows.
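
A sketch of how such a pool is built, modeled on ConsumeMessageConcurrentlyService (the 20/20 sizing reflects the defaults described above; exact defaults vary by client version):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-consumer consumption pool.
class ConsumePoolSketch {
    static ExecutorService newConsumePool() {
        final AtomicInteger idx = new AtomicInteger();
        return new ThreadPoolExecutor(
            20, 20,                               // consumeThreadMin / consumeThreadMax
            1000 * 60, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),  // consumeRequestQueue
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "ConsumeMessageThread_" + idx.incrementAndGet());
                }
            });
    }
}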

From the cast above one might conclude that MQClientInstance's consumerTable never holds a pull consumer. Strictly speaking, a pull consumer registers itself into consumerTable as well; the cast is safe only because pull consumers never submit PullRequests to PullMessageService, so this code path only ever sees push consumers.

PullRequest

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue; // local red-black-tree cache queue
    private long nextOffset;
    private boolean lockedFirst = false;
}

A PullRequest is bound to one particular queue consumed by one consumer group. If two consumers from two different consumer groups on the same machine subscribe to the same queue of the same topic, there will be two distinct PullRequests, and both get dropped into the same PullMessageService's blocking queue, pullRequestQueue.

(figure: the end-to-end pull flow between the consumer and the broker)

The PullMessage on the left side is the PullMessageService thread.

Once the left-side thread has executed channel.writeAndFlush() to send the PullRequest from the consumer into the recv_queue of that channel on the broker side, it can return immediately and move on to its next task; it does not keep waiting for the broker's response to that PullRequest. The benefit is that the thread stays extremely lightweight.

The pink bars represent business thread pools. The RocketMQ broker defines many key/value pairs in which the key is the code of a concrete business operation and the value is the thread pool for that code, thereby isolating the different kinds of work from one another.

Later, a Netty IO thread (some thread in the worker group) receives a READ event on one of the channels under its selector and calls ChannelInboundHandler#channelRead(), reading the serialized bytes of the PullRequest the consumer sent out of that channel's kernel recv_queue. The Netty IO thread then hands the PullRequest to the business thread pool dedicated to searching the CommitLog (after the hand-off, the IO thread is free to do its next task). A thread in that business pool uses the offset in the PullRequest to fetch the current batch of 32 messages via the ConsumeQueue and then the CommitLog; once the fetch succeeds, that same business thread calls channel.writeAndFlush() to wrap the 32 messages into a pullResult and send it back to the consumer.

When channelRead() on the Netty thread (NettyClientHandler) receives the pullResult sent by the broker, it drops it into the NettyClientPublicExecutor_ thread pool; the threads of this pool are in fact the ones that execute callbacks and asynchronous listeners.

A NettyClientPublicExecutor thread executing the callback is thus the first thread on the client to hold the messages:
1. a NettyClientPublicExecutor thread deserializes the contents of the pullResult into a batch of messages (typically 32);
2. it puts these messages into the red-black tree;
3. it then drops the 32 messages, now in the tree, into the thread pool dedicated to consumption.

(figure: reporting consumption progress to the broker)

Consumption progress is reported at two moments (a sketch of the timer side follows the list):

  • a timer reports the progress in the local offsetTable every 5 s
  • every time the client sends an asynchronous PullRequest to the broker, it also carries the local progress value along
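
The timer side looks roughly like this; a sketch modeled on MQClientInstance's scheduled task, with method names simplified (persistConsumerOffsetInterval defaults to 5000 ms):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the periodic progress-reporting task.
class OffsetReporter {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start(final long persistConsumerOffsetIntervalMs) { // default 5000 ms
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                persistAllConsumerOffset(); // push the local offsetTable to the broker
            }
        }, 1000 * 10, persistConsumerOffsetIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void persistAllConsumerOffset() { /* e.g. RemoteBrokerOffsetStore.persistAll(...) */ }
}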

(figure: the consumer's local offset cache)

On the consumer side there is a local cache of type ConcurrentMap<MessageQueue, AtomicLong>. Whichever queue the current consumption thread is consuming, after it finishes a message (whether consumption succeeded or failed) it updates that MessageQueue's AtomicLong to the red-black tree's firstKey, i.e. the smallest message offset still in the tree. This value is the consumption progress the consumer keeps locally.

The 32 messages are first dropped into the red-black-tree queue, then wrapped one by one into 32 Runnable ConsumeRequests and dropped into the client's consumption thread pool. The pool's threads 1, 2, 3 ... 20 consume the 32 messages concurrently, calling back the consumeMessage() method of the ConsumeMessageListener registered by the programmer to run the real consumption logic. Whether consumeMessage() succeeds, fails, or throws, threads 1, 2, 3 ... 20 each remove the message they are currently consuming from the red-black-tree queue and at the same time update the consumer's local progress cache. Note that since the pool consumes concurrently, messages may be removed from the tree concurrently as well, so the tree's remove method must take a lock; a condensed sketch follows.
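
A condensed sketch of that removal path, simplified from ProcessQueue#removeMessage (field types reduced for brevity):

import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Consumption threads call this concurrently, hence the write lock around
// the TreeMap; the returned value becomes the new local progress.
class ProcessQueueSketch {
    private final TreeMap<Long, Object> msgTreeMap = new TreeMap<>();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private long queueOffsetMax = 0;

    long removeMessage(List<Long> queueOffsets) {
        long result = -1;
        lockTreeMap.writeLock().lock();
        try {
            if (!msgTreeMap.isEmpty()) {
                result = queueOffsetMax + 1;          // everything consumed so far
                for (Long offset : queueOffsets) {
                    msgTreeMap.remove(offset);
                }
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();   // smallest offset still in flight
                }
            }
        } finally {
            lockTreeMap.writeLock().unlock();
        }
        return result;                                // stored into the local offset cache
    }
}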

The only difference is that when consumeMessage() fails, retryTimes is incremented by 1 and the message is sent back to the broker for a retry round, whereas on success the message is not sent back. Either way, though, the offset of that message has been passed: a message at that offset will never be consumed again. Even when a message previously sent back for retry is pulled back to the client again, it no longer sits at its original offset; by then its offset is certainly larger than the original one.
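
For completeness, this is how a listener triggers that retry path; the API below is the standard client listener interface, and the give-up threshold is purely illustrative:

import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

class RetryAwareListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            if (msg.getReconsumeTimes() >= 3) {
                // illustrative cutoff: stop retrying after 3 attempts,
                // e.g. hand the message to a dead-letter handler instead
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }
        try {
            // ... real consumption logic ...
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // RECONSUME_LATER sends the batch back to the broker; each message
            // comes back later with reconsumeTimes incremented and a new offset.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}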

(figures: CommitLog reads served from the hot region of the page cache)

That chunk of page cache stays in memory, and for a stretch of time all reads hit this hot region, so read performance is well guaranteed. This is one of the reasons RocketMQ is so efficient: heavy use of the page cache.

(figure: the off-heap memory write buffer)

Off-heap memory was the original author's number-one performance-tuning tool at his company; JVM GC tuning only ranked second.

This off-heap memory pool has one drawback, though: when the JVM crashes, data is lost. Under a normally running JVM no messages are lost, and latency is very flat with no spikes; it handled hundreds of thousands of TPS during Alibaba's Double 11.

(figure: pull-side flow control)

Message pull flow control

When the local red-black tree holds more than 1000 messages, pulling pauses; when the tree holds more than 100 MB of messages, pulling also pauses; and when the span between the smallest and largest message offsets in the tree exceeds 2000, pulling is throttled too. These thresholds can be tuned on the consumer, as in the sketch below.
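
A minimal configuration sketch using the standard consumer setters (the values shown are the defaults that the thresholds above describe; the group name is illustrative):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

class FlowControlConfig {
    static DefaultMQPushConsumer newThrottledConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_group");
        consumer.setPullThresholdForQueue(1000);      // max cached message count per queue
        consumer.setPullThresholdSizeForQueue(100);   // max cached message size per queue, MiB
        consumer.setConsumeConcurrentlyMaxSpan(2000); // max offset span in the tree
        return consumer;
    }
}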

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

A colleague once called some company's TCP method inside the registered listener, and as a result exactly one of the 16 queues stopped being consumed. In a situation like that, the first suspect should be a remote call blocking inside the listener; in this case it was an HttpClient call with no timeout configured, so it hung forever, which kept the client printing flow-control logs such as "the cached message count exceeds the threshold {}, so do flow control". A sketch of the fix follows.
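
The lesson: every remote call made inside a message listener needs explicit timeouts. A sketch using Apache HttpClient 4.x (the timeout values are illustrative):

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

class TimeoutHttpClient {
    static CloseableHttpClient newClient() {
        RequestConfig config = RequestConfig.custom()
            .setConnectTimeout(3_000)           // TCP connect timeout, ms
            .setSocketTimeout(5_000)            // read timeout, ms
            .setConnectionRequestTimeout(1_000) // wait for a pooled connection, ms
            .build();
        return HttpClients.custom().setDefaultRequestConfig(config).build();
    }
}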

Client Side

package org.apache.rocketmq.client.impl.consumer;

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    // heavy use of a scheduled thread pool for delayed scheduling
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    public void executeTaskLater(final Runnable r, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }

    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // take a pullRequest from the blocking queue
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }

    @Override
    public void shutdown(boolean interrupt) {
        super.shutdown(interrupt);
        ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
    }

    @Override
    public String getServiceName() {
        return PullMessageService.class.getSimpleName();
    }
}

Every queue of a topic has exactly one corresponding PullRequest

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;
}

nextOffset is the position in the messageQueue from which the next pull starts; here nextOffset is the messageQueue's queue offset (0, 1, 2, ...).

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void pullMessage(final PullRequest pullRequest) {
        /*
         * Every pullRequest carries its own red-black tree; whenever a pullRequest
         * brings messages back from the broker, it drops them into that tree.
         */
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
                this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

        /*
         * Flow control: at most 1000 messages may pile up in the consumer's
         * red-black-tree queue. When this check fails, the method returns
         * directly without ever calling the broker, relieving broker pressure.
         */
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(),
                    processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        /*
         * The total size piled up in the processing queue exceeds 100 MB.
         */
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(),
                    processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (!this.consumeOrderly) {
            /*
             * Concurrent (non-ordered) consumption: even if fewer than 1000 messages
             * are cached, pulling is throttled when the span between the largest and
             * smallest offsets in the red-black tree exceeds 2000.
             */
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            /*
             * Here we receive the pullResult the broker sent back to the client.
             * After channelRead() gets the pullResult from the broker, it drops it
             * into the NettyClientPublicExecutor pool; those threads are the ones
             * that run the callback. Depending on success or failure, the callback
             * thread then does the following:
             * 1. deserialize the contents of the pullResult into messages (typically 32);
             * 2. put the batch into the red-black tree;
             * 3. drop the batch into the thread pool dedicated to consumption.
             */
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    /*
                     * 1. The client sent a pull request; FOUND means the broker found messages.
                     *
                     * What the broker actually sends back is a binary stream, yet inside the
                     * listener the programmer sees List<Msg>; that conversion happens here,
                     * in processPullResult.
                     */
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        /*
                         * 1. Take the message response returned by the broker and deserialize it first.
                         */
                        case FOUND:
                            /*
                             * getNextBeginOffset is the offset the next round should pull from;
                             * pulling then continues with another request.
                             */
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                /*
                                 * Every queue of a topic is bound to exactly one pullRequest object.
                                 *
                                 * Every pullRequest carries a red-black tree; once it has messages,
                                 * it drops them into its own tree. What a processQueue really holds
                                 * is a TreeMap.
                                 *
                                 * Drop the messages pulled from the broker (32 per batch by default)
                                 * into the processQueue.
                                 */
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

                                /*
                                 * Hand the 32 pulled messages to the consumption thread pool.
                                 */
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                /*
                                 * Put the pullRequest, now updated with the new nextOffset,
                                 * back into pullRequestQueue.
                                 */
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );

        try {
            /*
             * The queue id and nextOffset are passed to the broker here; on the broker
             * side, PullMessageProcessor#processRequest() receives the pull request.
             */
            this.pullAPIWrapper.pullKernelImpl(
                /*
                 * Tell the broker which MessageQueue of which topic to fetch from.
                 */
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                /*
                 * Tell the broker which index entry (queue offset) of that queue to
                 * fetch, e.g. entry 1 or entry 2. With the index entry in hand, the
                 * broker gets the global physical offset plus the message size.
                 */
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                /*
                 * Consumption progress is also reported here, i.e. every pull doubles
                 * as a progress report.
                 *
                 * In other words, progress is reported through two channels:
                 * 1. here, while pulling messages;
                 * 2. every persistConsumerOffsetInterval = 1000 * 5 ms, the progress is
                 *    refreshed in the offsetTable of the broker's ConsumerOffsetManager.
                 */
                commitOffsetValue,
                /*
                 * The maximum long-polling timeout: the broker must not hold the
                 * consumer's pull request indefinitely, otherwise, when it holds too
                 * many requests, broker memory would not cope (a timeout is an
                 * essential safety net for the system).
                 */
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                /* send the pull request to the broker, passing along the pullCallback */
                pullCallback);
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
}

Message decoding and client-side second-pass filtering

public class PullAPIWrapper {
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            /*
             * decodes: deserialization.
             *
             * Decode the binary stream that the broker sent over the network back
             * into the individual messages.
             */
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            /*
             * Second-pass (client-side) filtering.
             *
             * Unlike the broker, which compares tag hashcodes, the client compares
             * the tag strings themselves.
             */
            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        // compare by the tag itself
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(traFlag)) {
                    msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                }
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
}

Structure of the pull result

public class PullResult {
    private final PullStatus pullStatus;
    private final long nextBeginOffset;
    private final long minOffset;
    private final long maxOffset;
    private List<MessageExt> msgFoundList;
}

Server Side

package org.apache.rocketmq.remoting.netty;

public abstract class NettyRemotingAbstract {
    /**
     * Entry of incoming command processing.
     *
     * Both NettyRemotingClient and NettyRemotingServer extend NettyRemotingAbstract,
     * and both call this processMessageReceived() method.
     */
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
}

Where network requests are routed and dispatched

public abstract class NettyRemotingAbstract {
    /**
     * Process incoming request command issued by remote peer.
     *
     * @param ctx channel handler context.
     * @param cmd request command.
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        /*
                         * Depending on the request type (requestCode), a different subclass
                         * of NettyRequestProcessor is chosen to handle the request.
                         *
                         * For a pull request, for example, PullMessageProcessor's
                         * processRequest() handles it; ultimately the ExecutorService of the
                         * Pair<NettyRequestProcessor, ExecutorService> actually runs it.
                         *
                         * This also means PullMessageProcessor#processRequest() lives inside
                         * a thread pool, i.e. it is called by multiple threads, so its
                         * internal containers may face thread-safety issues.
                         */
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {
                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                /*
                 * After receiving the client's pull request,
                 * NettyRemotingAbstract#processRequestCommand() re-wraps the request
                 * into a runnable and drops it into the pullMessageExecutor pool.
                 *
                 * That is, the Netty thread does not process the pull request itself
                 * after receiving it; it hands the request to a business thread pool
                 * so that the Netty thread stays lightweight.
                 *
                 * These are the command/thread-pool pairs registered in
                 * BrokerController#registerProcessor().
                 */
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

    /**
     * Process response from remote peer to the previous issued requests.
     *
     * @param ctx channel handler context.
     * @param cmd response command instance.
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
}

Requests are forwarded according to their request-type code

(figure: the mapping from request codes to XxxxxProcessor handlers)

Since these XxxxxProcessor instances are used here, there must be a place where they are registered; that place is BrokerController's initialization logic.


public class BrokerController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
    private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
    private final BrokerConfig brokerConfig;
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final MessageStoreConfig messageStoreConfig;
    /*
     * Manages the consumption progress of topic queues.
     */
    private final ConsumerOffsetManager consumerOffsetManager;
    private final ConsumerManager consumerManager;
    private final ConsumerFilterManager consumerFilterManager;
    private final ProducerManager producerManager;
    private final ClientHousekeepingService clientHousekeepingService;
    private final PullMessageProcessor pullMessageProcessor;
    private final PullRequestHoldService pullRequestHoldService;
    private final MessageArrivingListener messageArrivingListener;
    private final Broker2Client broker2Client;
    private final SubscriptionGroupManager subscriptionGroupManager;
    private final ConsumerIdsChangeListener consumerIdsChangeListener;
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
    private final BrokerOuterAPI brokerOuterAPI;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryImpl("BrokerControllerScheduledThread"));
    private final SlaveSynchronize slaveSynchronize;
    /*
     * The per-business blocking queues, matching the isolated business thread pools below.
     */
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    private final BlockingQueue<Runnable> pullThreadPoolQueue;
    private final BlockingQueue<Runnable> replyThreadPoolQueue;
    private final BlockingQueue<Runnable> queryThreadPoolQueue;
    private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
    private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
    private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
    private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
    private final FilterServerManager filterServerManager;
    private final BrokerStatsManager brokerStatsManager;
    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private MessageStore messageStore;
    private RemotingServer remotingServer;
    private RemotingServer fastRemotingServer;
    private TopicConfigManager topicConfigManager;
    /*
     * The isolated business thread pools.
     */
    private ExecutorService sendMessageExecutor;
    private ExecutorService pullMessageExecutor;
    private ExecutorService replyMessageExecutor;
    private ExecutorService queryMessageExecutor;
    private ExecutorService adminBrokerExecutor;
    private ExecutorService clientManageExecutor;
    private ExecutorService heartbeatExecutor;
    private ExecutorService consumerManageExecutor;
    private ExecutorService endTransactionExecutor;
    private boolean updateMasterHAServerAddrPeriodically = false;
    private BrokerStats brokerStats;
    private InetSocketAddress storeHost;
    private BrokerFastFailure brokerFastFailure;
    private Configuration configuration;
    private FileWatchService fileWatchService;
    private TransactionalMessageCheckService transactionalMessageCheckService;
    private TransactionalMessageService transactionalMessageService;
    private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
    private Future<?> slaveSyncFuture;
    private Map<Class, AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();

    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        /*
         * autoCreateTopicEnable, step 1: during broker startup the TopicConfigManager
         * is built. Its constructor first checks whether automatic topic creation is
         * enabled and, if so, adds the routing info of the default topic to
         * topicConfigTable.
         *
         * The routing info held in the broker's topic config manager is, on the one
         * hand, reported to the Nameserver via heartbeats and, on the other hand,
         * persisted periodically on the broker under
         * ${ROCKET_HOME}/store/config/topics.json, so routing info is not lost when
         * the broker is restarted.
         */
        this.topicConfigManager = new TopicConfigManager(this);
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = new PullRequestHoldService(this);
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        this.consumerFilterManager = new ConsumerFilterManager(this);
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.filterServerManager = new FilterServerManager(this);
        this.slaveSynchronize = new SlaveSynchronize(this);

        /*
         * Initialize the per-business blocking queues.
         */
        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

        /*
         * Initialize the fast-failure component, passing in the brokerController itself.
         */
        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }
}

The broker's initialization logic: BrokerController#initialize()


public class BrokerController {
    public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load();

        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
                    this.messageArrivingListener, this.brokerConfig);
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                        .getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                // load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
                    brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }

        result = result && this.messageStore.load();

        if (result) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

            /*
             * Presumably this pool calls SendMessageProcessor's processRequest() from
             * multiple threads to persist single or batched messages.
             *
             * When a producer sends a write request, the broker first puts it into a
             * queue (sendThreadPoolQueue), default capacity 10000. A dedicated thread
             * pool (sendMessageExecutor) takes tasks from the queue and executes the
             * writes; to keep messages processed in order, this pool defaults to a
             * single thread.
             */
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_"));

            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
                    new ThreadFactoryImpl("AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));

            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));

            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60, TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_"));

            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
                    new ThreadFactoryImpl("ConsumerManageThread_"));

            /*
             * Register the various processors.
             */
            this.registerProcessor();

            final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null
                        && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
                } else {
                    // periodically print the gap between master and slave
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }

            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;

                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }

                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }

            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }
}

The processors actually registered

public class BrokerController {
    /**
     * At startup, the broker registers a processor for each kind of command.
     */
    public void registerProcessor() {
        /*
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /*
         * Different RequestCodes map to different actions.
         *
         * Here, the following actions are all handled by the pair
         * (sendProcessor, this.sendMessageExecutor).
         */
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

        /*
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /*
         * ReplyMessageProcessor
         */
        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

        /*
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

        /*
         * ClientManageProcessor
         */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

        /*
         * ConsumerManageProcessor
         *
         * Handles clients coming to the broker to query and update consumption progress.
         */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        /*
         * EndTransactionProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

        /*
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }
}
