Java Future与CompletableFuture

本文最后更新于:2024年3月18日 凌晨

Java Future与CompletableFuture

Future

  • Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:
    • get():获取结果(可能会等待)
    • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间。
    • cancel(boolean mayInterruptIfRunning):取消当前任务。
    • isDone():判断任务是否已完成。
  • 当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果,在调用get()时,如果异步任务已经完成,我们就直接获得结果,如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Task implements Callable<String> {
public String call() throws Exception {
return "Hello"();
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞。
}
}

CompletableFuture

  • 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待,从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
  • CompletableFuture可以指定异步处理流程:
    • thenAccept()处理正常结果。
    • exceptional()处理异常结果。
    • thenApplyAsync()用于串行化另一个CompletableFuture
    • anyOf()allOf()用于并行化多个CompletableFuture
  • 我们以获取股票价格为例,看看如何使用CompletableFuture:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Main {
public static void main(String[] args) throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
// 如果执行成功:
cf.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 如果执行异常:
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}

static Double fetchPrice() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}
}
  • 创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:
1
2
3
public interface Supplier<T> {
T get();
}
  • 这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)
  • 紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例,完成时,CompletableFuture会调用Consumer对象:
1
2
3
public interface Consumer<T> {
void accept(T t);
}
  • 异常时,CompletableFuture会调用Function对象:
1
2
3
public interface Function<T, R> {
R apply(T t);
}
  • 可见CompletableFuture的优点是:
    • 异步任务结束时,会自动回调某个对象的方法。
    • 异步任务出错时,会自动回调某个对象的方法。
    • 主线程设置好回调后,不再关心异步任务的执行。
  • 如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势,CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Main {
public static void main(String[] args) throws Exception {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}

static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return "601857";
}

static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
  • 除了串行执行外,多个CompletableFuture还可以并行执行,例如,我们考虑这样的场景:
  • 同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class Main {
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});

// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});

// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}

static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return "601857";
}

static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
  • 除了anyOf()可以实现"任意个CompletableFuture只要一个成功”,allOf()可以实现"所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
  • 最后我们注意CompletableFuture的命名规则:
    • xxx():表示该方法将继续在已有的线程中执行。
    • xxxAsync():表示将异步在线程池中执行。

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!