Java Future与CompletableFuture
本文最后更新于:2024年3月18日 凌晨
Java Future与CompletableFuture
Future
Future<V>
接口表示一个未来可能会返回的结果,它定义的方法有:
get()
:获取结果(可能会等待)
get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间。
cancel(boolean mayInterruptIfRunning)
:取消当前任务。
isDone()
:判断任务是否已完成。
- 当我们提交一个
Callable
任务后,我们会同时获得一个Future
对象,然后,我们在主线程某个时刻调用Future
对象的get()
方法,就可以获得异步执行的结果,在调用get()
时,如果异步任务已经完成,我们就直接获得结果,如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class Task implements Callable<String> { public String call() throws Exception { return "Hello"(); }
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(4); Callable<String> task = new Task(); Future<String> future = executor.submit(task); String result = future.get(); } }
|
CompletableFuture
- 使用
Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待,从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture
可以指定异步处理流程:
thenAccept()
处理正常结果。
exceptional()
处理异常结果。
thenApplyAsync()
用于串行化另一个CompletableFuture
anyOf()
和allOf()
用于并行化多个CompletableFuture
- 我们以获取股票价格为例,看看如何使用
CompletableFuture
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Main { public static void main(String[] args) throws Exception { CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice); cf.thenAccept((result) -> { System.out.println("price: " + result); }); cf.exceptionally((e) -> { e.printStackTrace(); return null; }); Thread.sleep(200); }
static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } }
|
- 创建一个
CompletableFuture
是通过CompletableFuture.supplyAsync()
实现的,它需要一个实现了Supplier
接口的对象:
1 2 3
| public interface Supplier<T> { T get(); }
|
- 这里我们用lambda语法简化了一下,直接传入
Main::fetchPrice
,因为Main.fetchPrice()
静态方法的签名符合Supplier
接口的定义(除了方法名外)
- 紧接着,
CompletableFuture
已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture
完成时和异常时需要回调的实例,完成时,CompletableFuture
会调用Consumer
对象:
1 2 3
| public interface Consumer<T> { void accept(T t); }
|
- 异常时,
CompletableFuture
会调用Function
对象:
1 2 3
| public interface Function<T, R> { R apply(T t); }
|
- 可见
CompletableFuture
的优点是:
- 异步任务结束时,会自动回调某个对象的方法。
- 异步任务出错时,会自动回调某个对象的方法。
- 主线程设置好回调后,不再关心异步任务的执行。
- 如果只是实现了异步回调机制,我们还看不出
CompletableFuture
相比Future
的优势,CompletableFuture
更强大的功能是,多个CompletableFuture
可以串行执行,例如,定义两个CompletableFuture
,第一个CompletableFuture
根据证券名称查询证券代码,第二个CompletableFuture
根据证券代码查询证券价格,这两个CompletableFuture
实现串行操作如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Main { public static void main(String[] args) throws Exception { CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油"); }); CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> { return fetchPrice(code); }); cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); Thread.sleep(2000); }
static String queryCode(String name) { try { Thread.sleep(100); } catch (InterruptedException e) { } return "601857"; }
static Double fetchPrice(String code) { try { Thread.sleep(100); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
|
- 除了串行执行外,多个
CompletableFuture
还可以并行执行,例如,我们考虑这样的场景:
- 同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class Main { public static void main(String[] args) throws Exception { CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://finance.sina.com.cn/code/"); }); CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://money.163.com/code/"); });
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://finance.sina.com.cn/price/"); }); CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://money.163.com/price/"); });
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); Thread.sleep(200); }
static String queryCode(String name, String url) { System.out.println("query code from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return "601857"; }
static Double fetchPrice(String code, String url) { System.out.println("query price from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
|
- 除了
anyOf()
可以实现"任意个CompletableFuture
只要一个成功”,allOf()
可以实现"所有CompletableFuture
都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
- 最后我们注意
CompletableFuture
的命名规则:
xxx()
:表示该方法将继续在已有的线程中执行。
xxxAsync()
:表示将异步在线程池中执行。