如何使用CompletableFuture进行大数据量并发处理
moboyou 2025-06-08 18:49 2 浏览
面试官:有这样的一个需求,一批几百万的用户数据,需调用第三方的接口给用户发送消息。如何在一分钟内快速给这批用户发送完消息。
在处理这种要求在一分钟内向大量用户发送消息的场景中,可以使用以下方法结合CompletableFuture来实现高并发处理:
- 分批发送:将大批量的用户数据分成多个小批次进行并发发送。这样可以减少单批次发送的负载和提高处理效率。可以通过设置每批次的大小来控制并发度。
- 异步并发处理:使用CompletableFuture来实现异步并发处理。将每个小批次的发送任务包装成一个CompletableFuture,然后使用CompletableFuture的方法来进行并发处理和等待任务完成。
下面是一个简单的代码示例,展示了如何使用CompletableFuture来进行分批并发发送消息的处理:
// 假设已经准备好了用户数据列表 userList
// 将用户数据分成多个小批次
List<List<User>> batches = new ArrayList<>();
int batchSize = 1000; // 每批次的大小
for (int i = 0; i < userList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, userList.size());
List<User> batch = userList.subList(i, endIndex);
batches.add(batch);
}
// 使用CompletableFuture进行分批并发发送消息
List<CompletableFuture<Void>> sendFutures = new ArrayList<>();
for (List<User> batch : batches) {
CompletableFuture<Void> sendFuture = CompletableFuture.runAsync(() -> {
// 处理当前批次的发送任务
for (User user : batch) {
// 发送消息给用户
sendMessage(user);
}
});
sendFutures.add(sendFuture);
}
// 等待所有发送任务完成
CompletableFuture.allOf(sendFutures.toArray(new CompletableFuture[0])).join();
将用户数据列表userList分成多个小批次,每个批次的大小为batchSize。然后,使用CompletableFuture将每个小批次的发送任务进行异步并发处理。最后,使用CompletableFuture.allOf()等待所有发送任务完成。
在实际生产环境中,需要根据具体情况进行调优和优化,例如控制分批的大小、并发度、合理的线程池配置等,以确保系统可以承受高并发压力。
在解决高并发问题中经常会使用到CompletableFuture。CompletableFuture在解决高并发问题时提供了一些常用的方法和技巧。
- 异步并行执行任务:
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行异步任务
});
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
上述代码创建了10个CompletableFuture实例,每个CompletableFuture都执行一个异步任务。通过CompletableFuture.allOf()方法等待所有任务完成。
- 等待任意任务完成:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 执行异步任务 1
return "Result 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 执行异步任务 2
return "Result 2";
});
CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(future1, future2);
String result = (String) firstCompleted.get(); // 获取第一个完成的任务的结果
上述代码创建了两个CompletableFuture实例,并使用CompletableFuture.anyOf()方法等待任意一个任务完成。
- 异常处理和默认值:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 执行异步任务
if (someCondition) {
throw new RuntimeException("Error");
}
return 42;
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
// 异常处理
return 0;
});
int result = handledFuture.get(); // 获取处理后的结果,如果有异常则返回默认值
上述代码使用
CompletableFuture.exceptionally()方法处理异步任务的异常,并返回一个默认值。
- 任务组合:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
// 执行异步任务 1
return 42;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 执行异步任务 2
return "Result";
});
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
// 结果组合
return result1 + result2;
});
String result = combinedFuture.get(); // 获取组合后的结果
CompletableFuture.thenCombine()方法将两个任务的结果进行组合。
CompletableFuture常用方法:
thenApply():把前面任务的执行结果,交给后面的Function
thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回
and集合关系
thenCombine():合并任务,有返回值
thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)
or聚合关系
applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)
并行执行
allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
结果处理
whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作
exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成
CompletableFuture源码分析:
CompletableFuture是Java 8引入的一个用于实现异步编程的工具类。它基于Future接口,并扩展了更多的功能,例如可以将多个CompletableFuture组合在一起进行串行或并行操作,并且提供了异常处理、超时等特性。
CompletableFuture的底层源码相对复杂,主要的设计和实现原理:
1. 内部状态:CompletableFuture类中包含了一个内部的AtomicReference字段,用于保存CompletableFuture的结果或异常。这个字段使用了一种CAS(Compare And Swap)原子操作来保证线程安全。
2. 异步执行:CompletableFuture支持异步执行任务,它使用了ForkJoinPool来调度任务的执行。异步任务可以通过supplyAsync()方法创建,也可以通过完成一个CompletableFuture来触发执行。
3. 执行链:CompletableFuture支持通过一系列的方法调用构建执行链。每个方法都会返回一个新的CompletableFuture对象,用于表示中间结果或最终结果。这种链式调用的设计让代码更加简洁和可读。
4. CompletionStage接口:CompletableFuture实现了CompletionStage接口,该接口定义了一系列以then开头的方法,用于设置任务完成后的回调操作。这使得CompletableFuture可以方便地进行任务的组合和串行/并行执行。
5. 异常处理:CompletableFuture提供了exceptionally()和handle()方法来处理任务执行过程中的异常。exceptionally()方法用于处理异常并返回一个默认值,而handle()方法可以处理异常并返回一个新的结果。
6. 组合操作:CompletableFuture支持多个CompletableFuture的组合操作,包括thenApply()、thenCompose()、thenCombine()等。这些方法可以实现串行执行、并行执行以及多个任务之间的依赖关系。
相关推荐
- Spring Boot3 中多线程技术的使用指南
-
在当今互联网应用场景下,高并发、大数据量处理已成为常态。用户对应用的响应速度和处理能力要求越来越高。以一个电商平台的订单处理系统为例,在促销活动期间,短时间内会涌入大量订单请求,如果采用单线程理,所有...
- 多线程会带来的一些问题
-
前言前面我们已经知道了,在使用多线程会给我们带来一些性能上的提升。但一个东西的出现总是会存在优缺点。1、多线程会在线程安全问题什么是线程安全?在深入理解JVM这本书中有句话可以很简单的去理解“如果一个...
- 经典面试题:SpringBoot 应用可以同时并发处理多少请求
-
前言hello,大家好,,最近逛帖子看到一个面试题:SpringBoot应用可以同时并发处理多少请求?看到这个问题大多数朋友也许都会回答200,这样你也许第二天就会收到如下拒信:你可能会很难疑惑,...
- Java并发包(java.util.concurrent)探秘
-
Java并发包(java.util.concurrent)探秘在当今高并发、高负载的应用场景下,Java的并发包(java.util.concurrent)成为了开发者手中的神器。它不仅简化了并发编程...
- 操作系统-多线程编程-并发编程机制
-
十四、多线程编程POSIX标准中定义的现程,属性,操作方法被广泛认可和遵循。最贴近POSIX标准的线程实现,NPTL(NativePOSIXThreadsLibrary)线程可以看作进程的一个...
- Java 多线程:让你的程序像开挂一样干活!
-
你有没有想过,当你的Java程序在处理大量任务时,能不能像哪吒三头六臂一样,同时做好几件事?答案就在Java多线程!它能让你的程序瞬间“开挂”,大大提高效率。别被“多线程”这个听起来高大...
- 记一次Synchronized使用不合理,导致的多线程下线程阻塞问题排查
-
在为客户进行性能诊断调优时,碰到了一个Synchronized关键字使用不合理导致多线程下线程阻塞的情况。用文字记录下了问题的整个发现-排查-分析-优化过程,排查过程中使用了我司商业化产品——XLan...
- Disruptor—2.并发编程相关简介
-
大纲1.并发类容器2.volatile关键字与内存分析3.Atomic系列类与UnSafe类4.JUC常用工具类5.AQS各种锁与架构核心6.线程池的最佳使用指南1.并发类容器(1)Concurren...
- 实战项目:手把手带你实现一个高并发内存池
-
项目介绍1.这个项目做的是什么?当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-CachingMalloc,即线程缓存的ma...
- JAVA多线程详解(超详细)
-
一、线程简介1、进程、线程程序:开发写的代码称之为程序。程序就是一堆代码,一组数据和指令集,是一个静态的概念。进程(Process):将程序运行起来,我们称之为进程。进程是执行程序的一次执行过程,它...
- ConcurrentModificationException 并发修改异常的真相与破解之道
-
在Kotlin开发中,ConcurrentModificationException(并发修改异常)是让开发者头疼的“老熟人”。无论是单线程遍历时修改集合,还是多线程并发操作集合,这个异常都可能突然出...
- Java并发编程核心技巧:让程序飞速奔跑的秘密武器
-
Java并发编程核心技巧:让程序飞速奔跑的秘密武器在这个信息化的时代,我们的应用程序需要处理越来越多的数据,同时还要保证响应速度和稳定性。Java作为一门广泛应用于企业级开发的语言,其强大的并发编程能...
- 如何使用CompletableFuture进行大数据量并发处理
-
面试官:有这样的一个需求,一批几百万的用户数据,需调用第三方的接口给用户发送消息。如何在一分钟内快速给这批用户发送完消息。在处理这种要求在一分钟内向大量用户发送消息的场景中,可以使用以下方法结合Com...
- 线程安全集合 --- Concurrent
-
引言最近看一些代码的时候,发现有人用System.Collections.Concurrent下的BlockingCollection很便利的实现了生产者-消费者模式,这是之前没有注意到的...
- Java并发包(Concurrent)详解:让你的程序跑得更快更稳
-
Java并发包(Concurrent)详解:让你的程序跑得更快更稳提到Java并发包(Concurrent),我们就像是进入了武侠世界的“少林武当”,这里高手云集,各种工具类和框架应有尽有,它们就像武...
- 一周热门
- 最近发表
- 标签列表
-
- curseforge官网网址 (16)
- 外键约束 oracle (36)
- oracle的row number (32)
- 唯一索引 oracle (34)
- oracle in 表变量 (28)
- oracle导出dmp导出 (28)
- oracle 数据导出导入 (16)
- oracle两个表 (20)
- 启动oracle的监听服务 (13)
- oracle 数据库 字符集 (20)
- powerdesigner oracle (13)
- oracle修改端口 (15)
- 左连接 oracle (15)
- oracle 转义字符 (14)
- oracle安装补丁 (19)
- matlab归一化 (16)
- matlab脚本 (14)
- matlab阶跃函数 (14)
- 三次样条插值matlab (14)
- 共轭梯度法matlab (16)
- matlab化简多项式 (20)
- 在线客服网页源码 (14)
- 多线程的创建方式 (29)
- 多线程 python (30)
- java多线程并发处理 (32)