pursue wind pursue wind
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
  • 工具类

  • Java-集合框架

  • Java8

    • Java 遍历文件
    • Java8中的流操作-基本使用&性能测试
    • 使用CompletableFuture构建异步应用
      • Future 接口的局限性
      • CompletableFuture
        • 异步执行
        • 错误处理
        • 工厂方法
        • 将两个CompletableFuture建立联系
        • 响应 CompletableFuture 的 completion 事件
    • Collectors.toMap()
    • CompletableFuture
  • Java-多线程

  • Java计时新姿势√
  • Java中的BlockingQueue
  • Lambda表达式被首次调用时很慢?从JIT到类加载再到实现原理
  • 正则表达式
  • Java定时任务
  • JavaWeb

  • Java
  • Java8
pursuewind
2020-11-23
目录

使用CompletableFuture构建异步应用

# 使用CompletableFuture构建异步应用

# Future 接口的局限性

future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第 一个的结果。
  • 等待 Future 集合中的所有任务都完成。
  • 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同 一个值),并返回它的结果。
  • 通过编程方式完成一个 Future 任务的执行(即以手工设定异步操作结果的方式)。
  • 应对 Future 的完成事件(即当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)

新的CompletableFuture将使得这些成为可能。

# CompletableFuture

# 异步执行

首先,CompletableFuture实现了Future接口,因此你可以像Future那样使用它。

其次,CompletableFuture并非一定要交给线程池执行才能实现异步,你可以像下面这样实现异步运行。

public static void test1() throws Exception{
    CompletableFuture<String> completableFuture=new CompletableFuture();
    new Thread(new Runnable() {
        @Override
        public void run() {
            //模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //告诉completableFuture任务已经完成
            completableFuture.complete("result");
        }
    }).start();
    //获取任务结果,如果没有完成会一直阻塞等待
    String result=completableFuture.get();
    System.out.println("计算结果:"+result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 错误处理

如果没有意外,上面发的代码工作得很正常。但是,如果任务执行过程中产生了异常会怎样呢?

非常不幸,这种情况下你会得到一个相当糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该线程,而这会导致等待 get 方法返回结果的线程永久地被阻塞。

客户端可以使用重载版本的 get 方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。

使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了 Timeout-Exception 。不过,也因为如此,你不能指定执行任务的线程内到底发生了什么问题。

为了能获取任务线程内发生的异常,你需要使用 CompletableFuture 的completeExceptionally方法将导致CompletableFuture 内发生问题的异常抛出。这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的Exception 参数。

public static void test2() throws Exception{
    CompletableFuture<String> completableFuture=new CompletableFuture();
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //模拟执行耗时任务
                System.out.println("task doing...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                throw new RuntimeException("抛异常了");
            }catch (Exception e) {
                //告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
            }
        }
    }).start();
    //获取任务结果,如果没有完成会一直阻塞等待
    String result=completableFuture.get();
    System.out.println("计算结果:"+result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 工厂方法

前面我们通过编程自己创建 CompletableFuture 对象以及如何获取返回值,虽然看起来这些操作已经比较方便,但还有进一步提升的空间.

CompletableFuture 类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。

supplyAsync 方法接受一个生产者(Supplier)作为参数,返回一个 CompletableFuture 对象。生产者方法会交由 ForkJoinPool池中的某个执行线程( Executor )运行,但是你也可以使用 supplyAsync 方法的重载版本,传递第二个参数指定线程池执行器执行生产者方法。

public static void test3() throws Exception {
    //supplyAsync内部使用ForkJoinPool线程池执行任务
    CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
        //模拟执行耗时任务
        System.out.println("task doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return "result";
    });
    System.out.println("计算结果:"+completableFuture.get());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

allOf 工厂方法接收一个由CompletableFuture 构成的数组,数组中的所有 Completable-Future 对象执行完成之后,它返回一个 CompletableFuture 对象。这意味着,如果你需要等待多个 CompletableFuture 对象执行完毕,对 allOf 方法返回的 CompletableFuture 执行 join 操作可以等待CompletableFuture执行完成。

或者你可能希望只要 CompletableFuture 对象数组中有任何一个执行完毕就不再等待,在这种情况下,你可以使用一个类似的工厂方法 anyOf 。

该方法接收一个 CompletableFuture 对象构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的 CompletableFuture 。

public static void test4() throws Exception {
    CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
        //模拟执行耗时任务
        System.out.println("task1 doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return "result1";
    });
    CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{
        //模拟执行耗时任务
        System.out.println("task2 doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return "result2";
    });
    CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2);
    System.out.println("第一个完成的任务结果:"+anyResult.get());
    CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2);
    //阻塞等待所有任务执行完成
    allResult.join();
    System.out.println("所有任务执行完成");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 将两个CompletableFuture建立联系

通常,我们会有多个需要独立运行但又有所依赖的的任务。比如先等用于的订单处理完毕然后才发送邮件通知客户。

thenCompose 方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。你可以创建两个CompletableFutures 对象,对第一个 CompletableFuture 对象调用thenCompose ,并向其传递一个函数。当第一个CompletableFuture 执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个 CompletableFuture 的返回做输入计算出的第二个 CompletableFuture 对象。

public static void test5() throws Exception {

    CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
        //模拟执行耗时任务
        System.out.println("task1 doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return "result1";
    });

    //等第一个任务完成后,将任务结果传给参数result,执行后面的任务并返回一个代表任务的completableFuture
    CompletableFuture<String> completableFuture2= completableFuture1.thenCompose(result->CompletableFuture.supplyAsync(()->{
        //模拟执行耗时任务
        System.out.println("task2 doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return "result2";
    }));

    System.out.println(completableFuture2.get());

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

另一种比较常见的情况是,你需要将两个完 全不相干的 CompletableFuture 对象的结果整合起来,而且你也不希望等到第一个任务完全结 束才开始第二项任务。

这种情况,你应该使用 thenCombine 方法,它接收名为 BiFunction 的第二参数,这个参数 定义了当两个 CompletableFuture 对象完成计算后,结果如何合并。

 public static void test6() throws Exception {

    CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
        //模拟执行耗时任务
        System.out.println("task1 doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return 100;
    });

    //将第一个任务与第二个任务组合一起执行,都执行完成后,将两个任务的结果合并
    CompletableFuture<Integer> completableFuture2 = completableFuture1.thenCombine(
            //第二个任务
            CompletableFuture.supplyAsync(() -> {
                //模拟执行耗时任务
                System.out.println("task2 doing...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //返回结果
                return 2000;
            }),
            //合并函数
            (result1, result2) -> result1 + result2);

    System.out.println(completableFuture2.get());

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 响应 CompletableFuture 的 completion 事件

我们可以在每个CompletableFuture 上注册一个操作,该操作会在 CompletableFuture 完成执行后调用它。CompletableFuture 通过 thenAccept 方法提供了这一功能,它接收 CompletableFuture 执行完毕后的返回值做参数。

 public static void test7() throws Exception {

    CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
        //模拟执行耗时任务
        System.out.println("task1 doing...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回结果
        return 100;
    });
   
    //注册完成事件
    completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result));

    CompletableFuture<Integer> completableFuture2=
            //第二个任务
            CompletableFuture.supplyAsync(() -> {
                //模拟执行耗时任务
                System.out.println("task2 doing...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //返回结果
                return 2000;
            });

    //注册完成事件
    completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result));
    
    //将第一个任务与第二个任务组合一起执行,都执行完成后,将两个任务的结果合并
    CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2,
            //合并函数
            (result1, result2) -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return result1 + result2;
            });

    System.out.println(completableFuture3.get());

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

慢慢消化

Last Updated: 2023/01/30, 11:01:00
Java8中的流操作-基本使用&性能测试
Collectors.toMap()

← Java8中的流操作-基本使用&性能测试 Collectors.toMap()→

Theme by Vdoing | Copyright © 2019-2023 pursue-wind | 粤ICP备2022093130号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
  • 飙升榜
  • 新歌榜
  • 云音乐民谣榜
  • 美国Billboard榜
  • UK排行榜周榜
  • 网络DJ