CompletableFuture是jdk8推出的新特性。其出现目的是为了解决Java在异步编程、响应式编程的缺失,对标的Java框架是Reactive Streams之类的Spring Reactor、RxJava、Akka Streams等等。
异步编程和响应式编程
在计算机中,我们总是希望一个任务(请求)能够尽可能早的执行完成,于此同时我们还希望在同等配置机器下,能够执行足够多的任务(请求)。 因为计算机的三层模型下,请求的耗时基本都是在IO上,因为我们不希望线程在IO那里阻塞,希望在等待IO结束时,线程还可以接着去做其他的事情。这就是异步编程。 而在使用异步编程时,我们发现有些时候我们需要在异步结果出来之后再去执行一些其他任务。比如:数据处理或者再次发送异步请求。
JavaScript的异步处理
CallBack
JavaScript
早期是使用CallBack
来解决异步结果的处理逻辑。这种方式的缺点有:
(1)缺少规范
早期每个框架对CallBack
的结果五花八门。比如在参数位置、异常的表述方式上,都各自使用自己的。
(2)CallBack Hell
这是CallBack
最著名的缺陷,著名到有一个专门的网站和表情包。
http://callbackhell.com/
(3)不好处理异常情况
比如说:异步接口超时后,如何去处理这个错误?如何在发生错误后,给一个默认值接着处理?如何去限制整个流程的耗时情况?这些都是CallBack
模式很难去做到的。
Java
在jdk1.8
之前,业界基本都是使用Spring Reactor
、RxJava
、Akka Streams
等等来处理的。其思想跟JavaScript
的Promise
很类似,都属于Reactive Program
的一种实现。
而在jdk1.8
里,Java
就引入CompletableFuture
来作为jdk
内部对Reactive Program
的一种实现。你甚至可以认为CompletableFuture
就是Java
版本的Promise
。也是为了弥补Future在响应式编程的短板。比如说:
1 2 3 4 5 6 7 8 9
| Future<?> future = call();
...
Object result = future.get();
|
CompletableFuture
以pipline看CompletableFuture的处理流程
在CompletableFuture构建pipline时,要注意链式操作是挂载在哪个CompletableFuture对象上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class Sample {
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = new CompletableFuture<>();
future.thenApply(data -> data * 2) .exceptionally(ex -> handleExceptionAsData(ex)) .thenApply(data -> data + 1) .thenAccept(data -> System.out.println(data)); System.out.println("built the pipeline");
future.thenAccept(data -> System.out.println(data));
if (Math.random() > 0.75) { future.complete(2); } else { future.completeExceptionally(new RuntimeException("something went wrong")); }
Thread.sleep(1000); }
private static int handleExceptionAsData(Throwable ex) { System.out.println("DATA: " + ex); return -1; } }
|
哪个线程在执行代码
在CompletableFuture中,具体哪段代码是由哪些线程来执行是不确定的。通程情况下,一些异步操作代码是由JVM内部commonPool来执行的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class Sample {
public static int compute() { System.out.println("compute: " + Thread.currentThread()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }
public static CompletableFuture<Integer> create() { return CompletableFuture.supplyAsync(Sample::compute); }
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("In main" + Thread.currentThread()); CompletableFuture<Integer> future = create(); Thread.sleep(2000); future.thenAccept(data -> printIt(data));
System.out.println("In main" + Thread.currentThread()); CompletableFuture<Integer> future2 = create(); Thread.sleep(500); future2.thenAccept(data -> printIt(data));
Thread.sleep(2000); System.out.println("end"); }
private static void printIt(Integer data) { System.out.println(data + "--" + Thread.currentThread()); } }
|
自定义线程池
JVM内部的commonPool默认是当前机器CPU核数 - 1。虽然我们可以通过参数来调整该线程池的线程数,但是这个线程池在其他地方也被使用,所以并不建议去调整它。如果某些任务需要线程池中的线程更多的话,可以使用自定义的线程池。
1 2 3 4
| public static CompletableFuture<Integer> create() { ForkJoinPool pool = new ForkJoinPool(10); return CompletableFuture.supplyAsync(Sample::compute, pool); }
|
常见方法
处理异常
- pipline某步出现异常后,它会在pipline找到最近的异常处理函数进行处理。
- 异常可以转换成正常数据接着走正常数据;也可以接着走异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class Sample {
public static int compute() { throw new RuntimeException("something went wrong");
}
public static CompletableFuture<Integer> create() { return CompletableFuture.supplyAsync(Sample::compute); }
public static void main(String[] args) throws ExecutionException, InterruptedException { create().thenApply(data -> data * 2) .exceptionally(ex -> handleExceptionAsData(ex)) .thenAccept(data -> System.out.println(data)) .exceptionally(ex -> handleExceptionAsError(ex)); }
private static Void handleExceptionAsError(Throwable ex) { System.out.println("ERROR: " + ex); throw new RuntimeException("it is beyond all hope"); } private static int handleExceptionAsData(Throwable ex) { System.out.println("DATA: " + ex); return -1; } }
|
超时处理
Java8中只有通过get()来处理,Java9新增方法处理了这个问题:
completeOnTimeout
orTimeout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;
public class Sample {
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = new CompletableFuture<>();
future.thenApply(data -> data * 2) .exceptionally(ex -> handleExceptionAsData(ex)) .thenApply(data -> data + 1) .thenAccept(data -> System.out.println(data)); System.out.println("built the pipeline");
future.orTimeout(2, TimeUnit.SECONDS);
Thread.sleep(3000); }
private static int handleExceptionAsData(Throwable ex) { System.out.println("DATA: " + ex); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return -1; } }
|
get vs getNow
get
属于阻塞方法
getNow
是非阻塞方法,当未完成时,会使用默认值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class Sample {
public static int compute() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }
public static CompletableFuture<Integer> create() { return CompletableFuture.supplyAsync(Sample::compute); }
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("start"); System.out.println(create().getNow(0));
System.out.println("second"); System.out.println(create().get()); System.out.println("end"); } }
|
Combine
Combine方法可以对俩个Completable的结果进行处理。比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;
public class Sample {
public static CompletableFuture<Integer> create(int number) { return CompletableFuture.supplyAsync(() -> number);
}
public static void main(String[] args) throws ExecutionException, InterruptedException { create(2).thenCombine(create(3), (result1, result2) -> result1 + result2) .thenAccept(System.out::println); Thread.sleep(1000); } }
|
Compose
compose是指对Completable<Completable>进行处理,相当于CallBack Hell的处理。比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;
public class Sample {
public static CompletableFuture<Integer> create(int number) { return CompletableFuture.supplyAsync(() -> number); }
public static CompletableFuture<Integer> inc(int number) { return CompletableFuture.supplyAsync(() -> number + 1); }
public static void main(String[] args) throws ExecutionException, InterruptedException { create(2) .thenApply(data -> inc(data)) .thenAccept(result -> System.out.println(result)); Thread.sleep(1000);
create(2) .thenCompose(data -> inc(data)) .thenAccept(result -> System.out.println(result));
Thread.sleep(1000); }
}
|
缺点
- CompletableFuture无法响应中断操作;
- 由于不确定哪些代码是由哪些线程执行的,所以ThreadLocal使用起来会比较麻烦;最好还是使用上下文对象