百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

如何使用CompletableFuture进行大数据量并发处理

moboyou 2025-06-08 18:49 56 浏览

面试官:有这样的一个需求,一批几百万的用户数据,需调用第三方的接口给用户发送消息。如何在一分钟内快速给这批用户发送完消息。

在处理这种要求在一分钟内向大量用户发送消息的场景中,可以使用以下方法结合CompletableFuture来实现高并发处理:

  1. 分批发送:将大批量的用户数据分成多个小批次进行并发发送。这样可以减少单批次发送的负载和提高处理效率。可以通过设置每批次的大小来控制并发度。
  2. 异步并发处理:使用CompletableFuture来实现异步并发处理。将每个小批次的发送任务包装成一个CompletableFuture,然后使用CompletableFuture的方法来进行并发处理和等待任务完成。

下面是一个简单的代码示例,展示了如何使用CompletableFuture来进行分批并发发送消息的处理:

// 假设已经准备好了用户数据列表 userList

// 将用户数据分成多个小批次
List<List<User>> batches = new ArrayList<>();
int batchSize = 1000; // 每批次的大小
for (int i = 0; i < userList.size(); i += batchSize) {
    int endIndex = Math.min(i + batchSize, userList.size());
    List<User> batch = userList.subList(i, endIndex);
    batches.add(batch);
}

// 使用CompletableFuture进行分批并发发送消息
List<CompletableFuture<Void>> sendFutures = new ArrayList<>();
for (List<User> batch : batches) {
    CompletableFuture<Void> sendFuture = CompletableFuture.runAsync(() -> {
        // 处理当前批次的发送任务
        for (User user : batch) {
            // 发送消息给用户
            sendMessage(user);
        }
    });
    sendFutures.add(sendFuture);
}

// 等待所有发送任务完成
CompletableFuture.allOf(sendFutures.toArray(new CompletableFuture[0])).join();

将用户数据列表userList分成多个小批次,每个批次的大小为batchSize。然后,使用CompletableFuture将每个小批次的发送任务进行异步并发处理。最后,使用CompletableFuture.allOf()等待所有发送任务完成。

在实际生产环境中,需要根据具体情况进行调优和优化,例如控制分批的大小、并发度、合理的线程池配置等,以确保系统可以承受高并发压力。

在解决高并发问题中经常会使用到CompletableFuture。CompletableFuture在解决高并发问题时提供了一些常用的方法和技巧。

  1. 异步并行执行任务:
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 执行异步任务
    });
    futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

上述代码创建了10个CompletableFuture实例,每个CompletableFuture都执行一个异步任务。通过CompletableFuture.allOf()方法等待所有任务完成。

  1. 等待任意任务完成:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 执行异步任务 1
    return "Result 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 执行异步任务 2
    return "Result 2";
});
CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(future1, future2);
String result = (String) firstCompleted.get(); // 获取第一个完成的任务的结果

上述代码创建了两个CompletableFuture实例,并使用CompletableFuture.anyOf()方法等待任意一个任务完成。

  1. 异常处理和默认值:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    // 执行异步任务
    if (someCondition) {
        throw new RuntimeException("Error");
    }
    return 42;
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
    // 异常处理
    return 0;
});
int result = handledFuture.get(); // 获取处理后的结果,如果有异常则返回默认值

上述代码使用
CompletableFuture.exceptionally()
方法处理异步任务的异常,并返回一个默认值。

  1. 任务组合:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    // 执行异步任务 1
    return 42;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 执行异步任务 2
    return "Result";
});
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
    // 结果组合
    return result1 + result2;
});
String result = combinedFuture.get(); // 获取组合后的结果


CompletableFuture.thenCombine()
方法将两个任务的结果进行组合。

CompletableFuture常用方法:

thenApply():把前面任务的执行结果,交给后面的Function

thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

and集合关系

thenCombine():合并任务,有返回值

thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值

runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)

or聚合关系

applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值

acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值

runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)

并行执行

allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture

anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture

结果处理

whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作

exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成

CompletableFuture源码分析:

CompletableFuture是Java 8引入的一个用于实现异步编程的工具类。它基于Future接口,并扩展了更多的功能,例如可以将多个CompletableFuture组合在一起进行串行或并行操作,并且提供了异常处理、超时等特性。

CompletableFuture的底层源码相对复杂,主要的设计和实现原理:

1. 内部状态:CompletableFuture类中包含了一个内部的AtomicReference字段,用于保存CompletableFuture的结果或异常。这个字段使用了一种CAS(Compare And Swap)原子操作来保证线程安全。

2. 异步执行:CompletableFuture支持异步执行任务,它使用了ForkJoinPool来调度任务的执行。异步任务可以通过supplyAsync()方法创建,也可以通过完成一个CompletableFuture来触发执行。

3. 执行链:CompletableFuture支持通过一系列的方法调用构建执行链。每个方法都会返回一个新的CompletableFuture对象,用于表示中间结果或最终结果。这种链式调用的设计让代码更加简洁和可读。

4. CompletionStage接口:CompletableFuture实现了CompletionStage接口,该接口定义了一系列以then开头的方法,用于设置任务完成后的回调操作。这使得CompletableFuture可以方便地进行任务的组合和串行/并行执行。

5. 异常处理:CompletableFuture提供了exceptionally()和handle()方法来处理任务执行过程中的异常。exceptionally()方法用于处理异常并返回一个默认值,而handle()方法可以处理异常并返回一个新的结果。

6. 组合操作:CompletableFuture支持多个CompletableFuture的组合操作,包括thenApply()、thenCompose()、thenCombine()等。这些方法可以实现串行执行、并行执行以及多个任务之间的依赖关系。

#如何处理线程并发问题?# #大数据量、高并发业务怎么优化?#

相关推荐

Excel技巧:SHEETSNA函数一键提取所有工作表名称批量生产目录

首先介绍一下此函数:SHEETSNAME函数用于获取工作表的名称,有三个可选参数。语法:=SHEETSNAME([参照区域],[结果方向],[工作表范围])(参照区域,可选。给出参照,只返回参照单元格...

Excel HOUR函数:“小时”提取器_excel+hour函数提取器怎么用

一、函数概述HOUR函数是Excel中用于提取时间值小时部分的日期时间函数,返回0(12:00AM)到23(11:00PM)之间的整数。该函数在时间数据分析、考勤统计、日程安排等场景中应用广泛。语...

Filter+Search信息管理不再难|多条件|模糊查找|Excel函数应用

原创版权所有介绍一个信息管理系统,要求可以实现:多条件、模糊查找,手动输入的内容能去空格。先看效果,如下图动画演示这样的一个效果要怎样实现呢?本文所用函数有Filter和Search。先用filter...

FILTER函数介绍及经典用法12:FILTER+切片器的应用

EXCEL函数技巧:FILTER经典用法12。FILTER+切片器制作筛选按钮。FILTER的函数的经典用法12是用FILTER的函数和切片器制作一个筛选按钮。像左边的原始数据,右边想要制作一...

office办公应用网站推荐_office办公软件大全

以下是针对Office办公应用(Word/Excel/PPT等)的免费学习网站推荐,涵盖官方教程、综合平台及垂直领域资源,适合不同学习需求:一、官方权威资源1.微软Office官方培训...

WPS/Excel职场办公最常用的60个函数大全(含卡片),效率翻倍!

办公最常用的60个函数大全:从入门到精通,效率翻倍!在职场中,WPS/Excel几乎是每个人都离不开的工具,而函数则是其灵魂。掌握常用的函数,不仅能大幅提升工作效率,还能让你在数据处理、报表分析、自动...

收藏|查找神器Xlookup全集|一篇就够|Excel函数|图解教程

原创版权所有全程图解,方便阅读,内容比较多,请先收藏!Xlookup是Vlookup的升级函数,解决了Vlookup的所有缺点,可以完全取代Vlookup,学完本文后你将可以应对所有的查找难题,内容...

批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数

批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数在电商运营、物流对账等工作中,经常需要统计快递“揽收到签收”的耗时——比如判断某快递公司是否符合“3天内送达”的服务承...

Excel函数公式教程(490个实例详解)

Excel函数公式教程(490个实例详解)管理层的财务人员为什么那么厉害?就是因为他们精通excel技能!财务人员在日常工作中,经常会用到Excel财务函数公式,比如财务报表分析、工资核算、库存管理等...

Excel(WPS表格)Tocol函数应用技巧案例解读,建议收藏备用!

工作中,经常需要从多个单元格区域中提取唯一值,如体育赛事报名信息中提取唯一的参赛者信息等,此时如果复制粘贴然后去重,效率就会很低。如果能合理利用Tocol函数,将会极大地提高工作效率。一、功能及语法结...

Excel中的SCAN函数公式,把计算过程理清,你就会了

Excel新版本里面,除了出现非常好用的xlookup,Filter公式之外,还更新一批自定义函数,可以像写代码一样写公式其中SCAN函数公式,也非常强大,它是一个循环函数,今天来了解这个函数公式的计...

Excel(WPS表格)中多列去重就用Tocol+Unique组合函数,简单高效

在数据的分析和处理中,“去重”一直是绕不开的话题,如果单列去重,可以使用Unique函数完成,如果多列去重,如下图:从数据信息中可以看到,每位参赛者参加了多项运动,如果想知道去重后的参赛者有多少人,该...

Excel(WPS表格)函数Groupby,聚合统计,快速提高效率!

在前期的内容中,我们讲了很多的统计函数,如Sum系列、Average系列、Count系列、Rank系列等等……但如果用一个函数实现类似数据透视表的功能,就必须用Groupby函数,按指定字段进行聚合汇...

Excel新版本,IFS函数公式,太强大了!

我们举一个工作实例,现在需要计算业务员的奖励数据,右边是公司的奖励标准:在新版本的函数公式出来之前,我们需要使用IF函数公式来解决1、IF函数公式IF函数公式由三个参数组成,IF(判断条件,对的时候返...

Excel不用函数公式数据透视表,1秒完成多列项目汇总统计

如何将这里的多组数据进行汇总统计?每组数据当中一列是不同菜品,另一列就是该菜品的销售数量。如何进行汇总统计得到所有的菜品销售数量的求和、技术、平均、最大、最小值等数据?不用函数公式和数据透视表,一秒就...