百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

RocketMQ中的线程池是如何创建的?

moboyou 2025-06-05 16:49 3 浏览

前言

大家好,我是小郭,今天主要来和大家聊一聊RocketMQ中的线程池是如何创建的,如何设置线程池数量,同时也可以从中去学习到一些线程池的实践和需要注意的一些细节。

RocketMQ在哪些地方使用到了线程池?

在RocketMQ中存在了大量的对线程池的使用,从消息的生产到投递Broker中,到最后的消息消费每一个环节中都大量使用到线程池的地方,下面我们拿出几个不同类型的线程池来看一看。

在 NameServer的路由注册和剔除中,多次使用到了定时线程池

定时线程池

private final ScheduledExecutorService scheduledExecutorService =
	Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
		"NSScheduledThread"));
复制代码
// 定时任务 每10s扫描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	
	@Override
	public void run() {
		NamesrvController.this.routeInfoManager.scanNotActiveBroker();
	}
}, 5, 10, TimeUnit.SECONDS);

//定时任务,每隔30s向集群中所有NameServer发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	
	@Override
	public void run() {
		try {
			BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
		} catch (Throwable e) {
			log.error("registerBrokerAll Exception", e);
		}
	}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
复制代码

线程池newFixedThreadPool

FixedThreadPool常用于创建一个固定大小的线程池,

它的特点就是核心线程数量与最大线程数量一致,采用无界的阻塞队列 LinkedBlockingQueue,并且没有设置队列的大小默认是Integer.MAX_VALUE,适用于负载较重的场景

private ExecutorService remotingExecutor;

this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 用来设置接收到消息后的处理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
复制代码

消息发送初始化默认异步发送者线程池

核心线程数与最大线程数设置均为 Runtime.getRuntime().availableProcessors() ,可用的计算资源

阻塞队列设置为一个初始化50000长度的阻塞队列

keepAliveTime设置60s,超过则时间空闲的线程将被终止

private final ExecutorService defaultAsyncSenderExecutor;

private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);

this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
	Runtime.getRuntime().availableProcessors(),
	Runtime.getRuntime().availableProcessors(),
	1000 * 60,
	TimeUnit.MILLISECONDS,
	this.asyncSenderThreadPoolQueue,
	new ThreadFactory() {
		private AtomicInteger threadIndex = new AtomicInteger(0);

		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
		}
	});
复制代码

消费端拉取消息线程池

我们重点来看一下消费端的线程池是如何创建,它可以说是整个RocketMQ中最关键的一个线程池

为了提高消费速度,我们通常有两种方式来提高消费并行度

  1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度
  2. 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

如何创建?

在消息监听的时候,利用线程池进行不断的拉取消息

提交消费请求,消息提交到内部的线程池

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
复制代码

参数设置

创建内部线程池,核心参数核心线程数和最大线程数,主要是根据配置来进行设置

设置线程池名称以 ConsumeMessageThread_ 开头的,利于排查问题

阻塞队列是一个无界的阻塞队列LinkedBlockingQueue

private final BlockingQueue<Runnable> consumeRequestQueue;

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();


this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));
复制代码

通过RocketMQ的源码,我们看到 consumeExecutor 线程池的创建也是非常简单的

如果想要修改线程池参数,需要注意什么?

根据线程池的原理我们知道,只有阻塞队列为满的情况下,不会创建临时线程

所以线程池内部持有的队列为一个无界队列,导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量

什么时候需要修改?

在正常的业务场景中,启动应用之后,我们就不会再修改消费者线程数,但有可能突发业务高峰导致消息堆积,这时候我们就需要调整单个 Consumer 的消费并行线程数。

如何修改线程数?

  1. 修改线程池后,重新启动消费者,缺点是参数不易评估,随着业务的并发提升,需要频繁的重启服务来更改线程数,这势必会带来一定的造成影响。
  2. 官方也为我们提供了修改线程数的方法,当更新的线程数大于0且小于 Short.MAX_VALUE 且小于最大线程数,则更新核心线程数。

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略

@Override
public void updateCorePoolSize(int corePoolSize) {
    if (corePoolSize > 0
        && corePoolSize <= Short.MAX_VALUE
        && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}
复制代码

这两种方式都存在一定的痛点

  1. 线程数量随着业务的变动,需要修改代码
  2. 在springBoot和SpringCloud Stream下,对线程池参数变更不是很友好
  3. 不能通过管理界面,直接动态修改线程池参数

针对上面的痛点问题,我们可以考虑封装线程池动态参数调整,首先肯定原来代码是毫无侵入性的,

同时通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。

相关推荐

python多线程实现查找目录下有没有相同哈希值的文件

python多线程实现查找目录下有没有相同的文件,列出哈希值相同的文件importosimporthashlibfromconcurrent.futuresimportThreadPoo...

Java、Go 和 Python 多线程性能对比

大家好,我是难瓜。今天分享多线程下这三门语言的表现。简介在计算机中,线程是可以由处理器独立执行的小指令序列。多线程在一个进程中是可能的,其中它们共享资源,例如指令和上下文。发现在运行多线程进程时效率最...

干货分享丨Python多线程之_thread与threading模块

在Python程序中,多线程的应用程序会创建一个函数,来执行需要重复执行多次的程序代码,然后创建一个线程执行该函数。一个线程是一个应用程序单元,用于在后台并行执行多个耗时的动作。在多线程的应用程序中,...

一文带您了解Python中的并发:异步(Asyncio)和多线程(Thread)

Python以其简单性和多样性而闻名,是一种适用于广泛应用领域的编程语言。在处理多个任务并发时,Python提供了两种主要方法:Asyncio用于异步编程,Multithreading用于管理多个...

解锁Python并发编程:多线程和多进程的神秘面纱揭晓

欢迎来到我们的系列博客《Python全景系列》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语法。无论你是编程新手,还是有一定基础的开发...

Python多线程-基础篇

一、多线程相关概念1.并发和并行的区别并发和并行是即相似又有区别的两个概念,并行是指两个或者多个事件在同一时刻同时执行,而并发是指两个或多个事件通过时间片轮流被执行。从计算机工作原理的角度出发,“并发...

PYTHON多线程实现web服务器httpserver实例

PYTHON多线程实现web服务器importhttp.serverimportsocketserverimportthreading#服务器监听的端口PORT=8000#...

如何编写Python漏洞验证脚本(单线程和多线程)

我们实战经常会遇到以下几个问题:1、遇到一个利用步骤十分繁琐的漏洞,中间错一步就无法利用2、挖到一个通用漏洞,想要批量刷洞小赚一波,但手动去测试每个网站工作量太大这个时候编写一个poc脚本将会减轻...

Python 多线程高频面试题,直接把这些答案“甩在”面试官脸上

点赞、收藏、加关注,下次找我不迷路不管你是刚入行的新手,还是有一定经验的开发者,掌握多线程的核心问题,都能让你在面试中脱颖而出。今天咱就来盘一盘5个高频的Python多线程面试题,用通俗易懂...

python多进程和多线程的使用和对比

介绍多线程和多进程是常见的并发编程模型,它们被广泛应用于各种类型的应用程序中。在本文中,我将就Python多线程和多进程进行详细的对比。首先,让我们来看一下Python多线程。多线程是一种并发编程模型...

24-3-Python多线程-线程队列-queue模块

3-1-概念queue模块提供了多线程编程中的队列实现,队列是线程安全的数据结构,能在多线程环境下安全地进行数据交换。3-2-queue的队列类型Queue(先进先出队列)、LifoQueue(后进...

玩蛇(Python) - 并发编程之多线程

一、线程简介线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Pytho...

Python多进程与多线程应用场景对比

在Python中,多进程(Multiprocessing)和多线程(Multithreading)的选择取决于任务类型(I/O密集型vsCPU密集型)、Python的GIL限制以及并...

Python多线程,守护线程和非守护线程,线程的join方法,代码案例

守护线程和非守护线程守护线程&&非守护线程守护线程,是和主线程一起结束的线程,叫守护线程,非守护线程,主线程的结束不影响该线程的执行,主线程结束非守护线程不会立刻结束,也叫用户线程。Python的守护...

Python3中最常用的5种线程锁你会用吗

前言本章节将继续围绕threading模块讲解,基本上是纯理论偏多。对于日常开发者来讲很少会使用到本章节的内容,但是对框架作者等是必备知识,同时也是高频的面试常见问题。私信小编01即可获取大量Pyth...