如何解决线程池任务拒绝异常(RejectedExecutionException)

后端 潘老师 3个月前 (02-06) 150 ℃ (0) 扫码查看

在开发过程中,我们常常会遇到各种意想不到的问题。这不,测试小哥急匆匆地找上门来,告知在使用ThreadPoolExecutor执行异步任务时,测试环境出现了报错:

java.util.concurrent.RejectedExecutionException: task java.util.concurrent.FutureTask@1e19e316 rejected
from java.util.concurrent.ThreadPoolExecutor@647b9364[running, pool size = 12, active threads = 12,
queued tasks = 32, completed tasks = 44]

奇怪的是,这段代码在本地运行一切正常,可一到测试环境就抛出RejectedExecutionException异常,这究竟是怎么回事呢?

让我们深入代码内部寻找答案:

List<List<String>> partitionedIds = Lists.partition(externalUserIds, 100);
List<CompletableFuture<List<ExternalUserRecord>>> futureList = partitionedIds.stream()
   .map(batch -> CompletableFuture.supplyAsync(
        () -> externalUserRecordService.batchGetExternalUserRecord(batch), executor))
   .collect(Collectors.toList());

经过分析,发现问题的关键在于:externalUserIds数据会按照100个一组进行分批处理。比如说,如果externalUserIds包含5000条数据,就会被拆分成50组。而每一组数据都会提交一个异步任务,这就导致可能同时有50个任务提交到线程池。

要知道,线程池的队列默认最大容量是32。一旦提交的任务数量超过这个队列容量,就会抛出RejectedExecutionException异常。

那为什么本地运行正常,测试环境却出错呢?原因其实很简单,本地的数据量比较小,假设externalUserIds只有300条数据,分批后仅仅产生3组任务,远未达到队列的最大容量。而测试环境的数据量可能非常大,externalUserIds可能包含上万条数据,产生的任务数量远远超过了32,这就触发了异常。

找到了问题根源,接下来看看都有哪些解决方案。

方案1:扩大队列容量(当前已采用方案)

ThreadPoolExecutor的配置中,将queueCapacity从32提升到10000,具体代码如下:


private static final int QUEUE_CAPACITY = 10000;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
threadPoolTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
threadPoolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setThreadFactory(new CustomizableThreadFactory("excellent-mall-pool-thread-"));
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}

这种方案的优点是能够确保所有任务都不会被拒绝,都能得到执行。但缺点也很明显,大量任务排队等待执行,可能会导致任务等待时间过长。

方案2:限制并发任务数量

借助`Semaphore`来控制同时执行的任务数量,防止一次性提交过多任务,代码实现如下:


Semaphore semaphore = new Semaphore(10); 

List<CompletableFuture<List<ExternalUserRecord>>> futureList = partitionedIds.stream()
   .map(batch -> CompletableFuture.supplyAsync(() -> {
        try {
            semaphore.acquire(); 
            return externalUserRecordService.batchGetExternalUserRecord(batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        } finally {
            semaphore.release(); 
        }
    }, executor))
   .collect(Collectors.toList());

该方案的优势在于能够有效控制最大并发任务数,避免线程池因任务过多而超负荷运行。不过,如果任务总量较多,这种限制可能会对整体的执行速度产生一定影响。

方案3:分批有序执行任务

摒弃一次性提交所有任务的方式,改为按批次执行,等待前一批任务执行完毕后,再提交下一批任务,代码如下:

for (List<String> batch : partitionedIds) {
List<CompletableFuture<List<ExternalUserRecord>>> futureList = batch.stream()
   .map(ids -> CompletableFuture.supplyAsync(
        () -> externalUserRecordService.batchGetExternalUserRecord(ids), executor))
   .collect(Collectors.toList());

// 等待本批任务执行完再提交下一批
futureList.forEach(CompletableFuture::join);
}

这种方案的好处是不会让线程池瞬间承受大量任务的压力,有效降低了线程池的负担。但由于需要依次等待每一批任务完成,所以任务整体的执行时间可能会稍有延长。

总结

总结一下,本次出现线程池任务拒绝异常的根本原因是线程池队列容量不足,导致部分任务被拒绝。目前最直接的解决办法是增大queueCapacity,不过从长远来看,更推荐使用Semaphore限制并发任务数,或者采用按批次执行任务的方式,避免瞬间提交大量任务。

在实际项目中,我们可以根据具体的业务需求,权衡这三种方案的利弊,选择最合适的方案,以此保障系统的稳定性和执行效率。

今天的分享就先到这里,还有个查询耗时8s的页面等着我去优化,这又是一个值得深入研究的问题,先留个悬念,后续再和大家分享。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/13959.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】