wangjie_fourth

may the force be with you

0%

线程中数据传递问题

在系统编码中,有时候我们需要在不同的情况下,从当前线程获取某些值,这就需要在特定时候为线程设置值。比如说:

  • 在当前线程设置值
  • 在当前线程新创建的线程设置值
  • 在当前线程所复用线程池中的某个线程设置值

线程在java中就是用Thread类来表示的。其中有俩个属性threadLocalsinheritableThreadLocals是用来存储该线程从其他地方获取,分别是由ThreadLocalInheritableThreadLocal类来维护。这个俩个参数分别表示当前线程为自己设置的值、
从父线程中设置的值。以下是一个Thread类对俩个属性的表示图:

然后再看一看,一个Thread创建过程中跟上下文参数相关的事:

1
2
3
4
5
6
7
8
9
10
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略部分代码
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

// 省略部分代码
}

ThreadLocal

ThreadLocal是表示当前线程本地的变量,通常用于在某个线程的业务生命周期中设置数据、获取数据,比如说:后端系统常用的UserContext

原理

ThreadLocal实现的原理就是通过获取当前线程,然后获取该线程本身的threadLocals变量,后面的操作都是针对这个变量来操作的。
1、get源码

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

2、set源码

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

ThreadLocal的缺点

只能在当前线程生效,如果你在当前线程新创建的一个线程,新的线程就无法获取父线程设置的值。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadLocalDefectDemo {
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static void main(String[] args) {
threadLocal.set("threadLocal做不到的事");

// 子线程
new Thread(() -> {
// 输出为null
System.out.println(threadLocal.get());
}).start();
}
}

拿不到值的原因就是,ThreadLocal是与Thread相绑定的,新创建的线程所绑定的ThreadLocal是获取不到父线程绑定的ThreadLocal,而且在创建新线程时,也不会去复制父线程的threadLocals

InheritableThreadLocal

InheritableThreadLocalThreadLocal的子类,它表示的是会传递给子线程的数据。

原理

InheritableThreadLocal的原理就是让新创建的线程复制父线程的inheritableThreadLocals到新线程的inheritableThreadLocals中,然后覆盖ThreadLocal三个方法,让线程在获取值时,是从当前线程的inheritableThreadLocals获取数据。

InheritableThreadLocal的缺点

InheritableThreadLocal虽然可以可以解决父子线程获取值的问题。但它无法在线程池复用中,获取父线程的数据。原因就是inheritableThreadLocals是仅在创建线程中才去复制数据,而线程池中的线程是已经创建好的,也就没有从父线程复制数据的过程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class InheritableThreadLocalDefectDemo {
private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();

public static void main(String[] args) throws ExecutionException, InterruptedException {
inheritableThreadLocal.set("threadLocal做不到的事");

// 子线程
new Thread(() -> {
// 输出为"threadLocal做不到的事"
System.out.println(inheritableThreadLocal.get());
}).start();

// 线程池复用
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("ProcessMessage-pool-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 2000,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

Future<?> submit = executorService.submit(() -> {
// 线程池中新创建的线程是可以显示数据
System.out.println(inheritableThreadLocal.get());

// 假设某个线程并没有设置,在后期复用数据时,是不是会显示数据
inheritableThreadLocal.set(null);
});
Object o = submit.get();

executorService.submit(() -> {
// 复用的新线程无法获取数据
System.out.println(inheritableThreadLocal.get());
});
Object o1 = submit.get();

executorService.shutdown();
}
}

TransmittableThreadLocal

TransmittableThreadLocal是阿里巴巴开源出来的第三方库,专门用来解决线程复用的问题。

原理

TransmittableThreadLocal是在线程池提交任务之前将父线程的数据拷贝一份,然后在线程执行前,将这份拷贝数据复制到该线程的inheritableThreadLocals中。
具体流程图如下:

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TransmittableThreadLocalDemo {
private static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();

public static void main(String[] args) throws ExecutionException, InterruptedException {
transmittableThreadLocal.set("全都能看见");

// 线程池复用
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("ProcessMessage-pool-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 2000,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(executorService);

Future<?> submit = ttlExecutorService.submit(() -> {
// 线程池中新创建的线程是可以显示数据
System.out.println(transmittableThreadLocal.get());

// 假设某个线程并没有设置,在后期复用数据时,是不是会显示数据
transmittableThreadLocal.set(null);
});
Object o = submit.get();

transmittableThreadLocal.set("变一下吧");
ttlExecutorService.submit(() -> {
// 复用的线程获取数据
System.out.println(transmittableThreadLocal.get());
});
Object o1 = submit.get();

ttlExecutorService.shutdown();

}
}
## 对Spring的ThreadPoolTaskExecutor改造

其他问题

数据拷贝问题

InheritableThreadLocalTransmittableThreadLocal在拷贝父线程数据时,默认都是采用浅拷贝的方式。比如说:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class InheritableThreadLocalDefectDataDemo {
private static InheritableThreadLocal<UserInfo> infoInheritableDataThreadLocal = new InheritableThreadLocal<>();

public static void main(String[] args) throws ExecutionException, InterruptedException {
UserInfo userInfo = UserInfo.builder()
.normalMessage("normalMessage")
.userDetailInfo(UserDetailInfo.builder().innerMessage("innerMessage").build())
.build();
infoInheritableDataThreadLocal.set(userInfo);

// 线程池复用
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("ProcessMessage-pool-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 2000,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

Future<?> submit = executorService.submit(() -> {

System.out.println(infoInheritableDataThreadLocal.get().getNormalMessage());
System.out.println(infoInheritableDataThreadLocal.get().getUserDetailInfo().getInnerMessage());
// 其他线程修改数据会影响到其他线程
infoInheritableDataThreadLocal.get().setUserDetailInfo(null);
});
Object o = submit.get();

// 父线程拿不到数据
System.out.println(infoInheritableDataThreadLocal.get().getUserDetailInfo());

executorService.shutdown();
}
}

如果你希望采用深拷贝来拷贝数据,就得自己写一个类继承InheritableThreadLocal并重写childValue方法。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class InheritableThreadLocalDefectDataDemo {
private static InheritableThreadLocal<UserInfo> infoInheritableDataThreadLocal = new InheritableThreadLocal<>();
private static MyInheritableThreadLocal<UserInfo> myInheritableThreadLocal = new MyInheritableThreadLocal<>();


public static void main(String[] args) throws ExecutionException, InterruptedException {
UserInfo userInfo = UserInfo.builder()
.normalMessage("normalMessage")
.userDetailInfo(UserDetailInfo.builder().innerMessage("innerMessage").build())
.build();
UserInfo userInfo2 = UserInfo.builder()
.normalMessage("normalMessage")
.userDetailInfo(UserDetailInfo.builder().innerMessage("innerMessage").build())
.build();
infoInheritableDataThreadLocal.set(userInfo2);
myInheritableThreadLocal.set(userInfo);

// 线程池复用
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("ProcessMessage-pool-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 2000,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

Future<?> submit = executorService.submit(() -> {

System.out.println(infoInheritableDataThreadLocal.get().getNormalMessage());
System.out.println(infoInheritableDataThreadLocal.get().getUserDetailInfo().getInnerMessage());
System.out.println(myInheritableThreadLocal.get().getUserDetailInfo().getInnerMessage());
// 其他线程修改数据会影响到其他线程
infoInheritableDataThreadLocal.get().setUserDetailInfo(null);

myInheritableThreadLocal.get().setUserDetailInfo(null);
});
Object o = submit.get();

// 父线程拿不到数据
System.out.println(infoInheritableDataThreadLocal.get().getUserDetailInfo());

System.out.println(myInheritableThreadLocal.get().getUserDetailInfo());
executorService.shutdown();
}
}

https://github.com/alibaba/transmittable-thread-local
https://mp.weixin.qq.com/s/a6IGrOtn1mi0r05355L5Ng
https://blog.csdn.net/prestigeding/article/details/54945658
文中代码:https://github.com/wangjie-fourth/CodeDemo/tree/main/ttl