wangjie_fourth

may the force be with you

0%

CompletableFuture

CompletableFuture是jdk8推出的新特性。其出现目的是为了解决Java在异步编程、响应式编程的缺失,对标的Java框架是Reactive Streams之类的Spring Reactor、RxJava、Akka Streams等等。

异步编程和响应式编程

在计算机中,我们总是希望一个任务(请求)能够尽可能早的执行完成,于此同时我们还希望在同等配置机器下,能够执行足够多的任务(请求)。 因为计算机的三层模型下,请求的耗时基本都是在IO上,因为我们不希望线程在IO那里阻塞,希望在等待IO结束时,线程还可以接着去做其他的事情。这就是异步编程。 而在使用异步编程时,我们发现有些时候我们需要在异步结果出来之后再去执行一些其他任务。比如:数据处理或者再次发送异步请求。

JavaScript的异步处理

CallBack

JavaScript早期是使用CallBack来解决异步结果的处理逻辑。这种方式的缺点有:
(1)缺少规范
早期每个框架对CallBack的结果五花八门。比如在参数位置、异常的表述方式上,都各自使用自己的。
(2)CallBack Hell
这是CallBack最著名的缺陷,著名到有一个专门的网站和表情包。

http://callbackhell.com/
(3)不好处理异常情况
比如说:异步接口超时后,如何去处理这个错误?如何在发生错误后,给一个默认值接着处理?如何去限制整个流程的耗时情况?这些都是CallBack模式很难去做到的。

Java

jdk1.8之前,业界基本都是使用Spring ReactorRxJavaAkka Streams等等来处理的。其思想跟JavaScriptPromise很类似,都属于Reactive Program的一种实现。
而在jdk1.8里,Java就引入CompletableFuture来作为jdk内部对Reactive Program的一种实现。你甚至可以认为CompletableFuture就是Java版本的Promise。也是为了弥补Future在响应式编程的短板。比如说:

1
2
3
4
5
6
7
8
9
Future<?> future = call();

// do other thing
...

// it will block
Object result = future.get();
// 对result进行处理,或者再次调用其他的Future;这部分处理仍然要强制交给主线程来处理
// 在理想的情况下,应该是主线程或者其他线程,谁有空谁来执行

CompletableFuture

以pipline看CompletableFuture的处理流程

在CompletableFuture构建pipline时,要注意链式操作是挂载在哪个CompletableFuture对象上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @author jwang55
*/
public class Sample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = new CompletableFuture<>();

// 构建pipline
future.thenApply(data -> data * 2)
.exceptionally(ex -> handleExceptionAsData(ex))
.thenApply(data -> data + 1)
.thenAccept(data -> System.out.println(data)); // 5
System.out.println("built the pipeline");

// 注意对哪个CompletableFuture做后续处理。这里的值是:2
future.thenAccept(data -> System.out.println(data));

// 不能俩个都complete
if (Math.random() > 0.75) {
// 传递正常数据
future.complete(2);
} else {
// 传递异常数据
future.completeExceptionally(new RuntimeException("something went wrong")); // 0
}

Thread.sleep(1000);
}

private static int handleExceptionAsData(Throwable ex) {
System.out.println("DATA: " + ex);
// 将异常抓换成正常数据
return -1;
}
}

哪个线程在执行代码

在CompletableFuture中,具体哪段代码是由哪些线程来执行是不确定的。通程情况下,一些异步操作代码是由JVM内部commonPool来执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @author jwang55
*/
public class Sample {

public static int compute() {
System.out.println("compute: " + Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}

public static CompletableFuture<Integer> create() {
return CompletableFuture.supplyAsync(Sample::compute);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("In main" + Thread.currentThread());
CompletableFuture<Integer> future = create();
Thread.sleep(2000);
// 主线程执行;因为主线程已经空闲了,并且future前部分也已经执行完了,没有必要交给其他线程执行
future.thenAccept(data -> printIt(data));

System.out.println("In main" + Thread.currentThread());
CompletableFuture<Integer> future2 = create();
Thread.sleep(500);
// 其它线程执行;因为主线程已经空闲了,但future前部分还没执行完,主线程没必要在这里阻塞,将这部分任务交给其他线程
future2.thenAccept(data -> printIt(data));

Thread.sleep(2000);
System.out.println("end");
}

private static void printIt(Integer data) {
System.out.println(data + "--" + Thread.currentThread());
}
}

自定义线程池

JVM内部的commonPool默认是当前机器CPU核数 - 1。虽然我们可以通过参数来调整该线程池的线程数,但是这个线程池在其他地方也被使用,所以并不建议去调整它。如果某些任务需要线程池中的线程更多的话,可以使用自定义的线程池。

1
2
3
4
public static CompletableFuture<Integer> create() {
ForkJoinPool pool = new ForkJoinPool(10);
return CompletableFuture.supplyAsync(Sample::compute, pool);
}

常见方法

处理异常

  • pipline某步出现异常后,它会在pipline找到最近的异常处理函数进行处理。
  • 异常可以转换成正常数据接着走正常数据;也可以接着走异常处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @author jwang55
*/
public class Sample {

public static int compute() {
throw new RuntimeException("something went wrong");
// return 2;
}

public static CompletableFuture<Integer> create() {
return CompletableFuture.supplyAsync(Sample::compute);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
create().thenApply(data -> data * 2)
.exceptionally(ex -> handleExceptionAsData(ex)) // 将异常抓换成正常数据
.thenAccept(data -> System.out.println(data))
.exceptionally(ex -> handleExceptionAsError(ex)); // 将异常当作异常继续传递
}

private static Void handleExceptionAsError(Throwable ex) {
System.out.println("ERROR: " + ex);
// 将异常当作异常继续传递
throw new RuntimeException("it is beyond all hope");
}
private static int handleExceptionAsData(Throwable ex) {
System.out.println("DATA: " + ex);
// 将异常抓换成正常数据
return -1;
}
}

超时处理

Java8中只有通过get()来处理,Java9新增方法处理了这个问题:

  • completeOnTimeout
  • orTimeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* @author jwang55
*/
public class Sample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = new CompletableFuture<>();

// note: 这里只能一次性创建好pipline,否则后续只会以最后一次创建的pipline为准
future.thenApply(data -> data * 2)
.exceptionally(ex -> handleExceptionAsData(ex))
.thenApply(data -> data + 1)
.thenAccept(data -> System.out.println(data));
System.out.println("built the pipeline");

// 整个pipeline执行耗时超过2秒后,将进入异常channel
future.orTimeout(2, TimeUnit.SECONDS);
// future.completeOnTimeout(0, 2, TimeUnit.SECONDS);

Thread.sleep(3000);
}

private static int handleExceptionAsData(Throwable ex) {
System.out.println("DATA: " + ex);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 将异常抓换成正常数据
return -1;
}
}

get vs getNow

  • get属于阻塞方法
  • getNow是非阻塞方法,当未完成时,会使用默认值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @author jwang55
*/
public class Sample {

public static int compute() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}

public static CompletableFuture<Integer> create() {
return CompletableFuture.supplyAsync(Sample::compute);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start");
System.out.println(create().getNow(0)); // non blocking, 有值就给,没有就给默认值

System.out.println("second");
System.out.println(create().get()); // blocking
System.out.println("end");
}
}

Combine

Combine方法可以对俩个Completable的结果进行处理。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* @author jwang55
*/
public class Sample {

public static CompletableFuture<Integer> create(int number) {
return CompletableFuture.supplyAsync(() -> number);

}

public static void main(String[] args) throws ExecutionException, InterruptedException {
create(2).thenCombine(create(3), (result1, result2) -> result1 + result2)
.thenAccept(System.out::println);
Thread.sleep(1000);
}
}

Compose

compose是指对Completable<Completable>进行处理,相当于CallBack Hell的处理。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* @author jwang55
*/
public class Sample {

public static CompletableFuture<Integer> create(int number) {
return CompletableFuture.supplyAsync(() -> number);
}

public static CompletableFuture<Integer> inc(int number) {
return CompletableFuture.supplyAsync(() -> number + 1);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
create(2)
.thenApply(data -> inc(data))
.thenAccept(result -> System.out.println(result));
Thread.sleep(1000);

create(2)
.thenCompose(data -> inc(data))
.thenAccept(result -> System.out.println(result));

Thread.sleep(1000);
}

}

缺点

  • CompletableFuture无法响应中断操作;
  • 由于不确定哪些代码是由哪些线程执行的,所以ThreadLocal使用起来会比较麻烦;最好还是使用上下文对象