异步编程的几种实现方式

异步编程是实现程序并发、依赖解耦的一种方式。异步的本质是开启一个新的线程来执行。

异步:即整个业务中的某些逻辑交给不同的线程异步处理,不用等待异步线程的结果就可返回。

同步:即一条线程干到底,在没结束没返回之前不能干其它事情,必须在所有逻辑处理完才能返回。

请求线程分离

Servlet 和基于 Servlet 的容器(例如Tomcat)默认处理请求和处理业务是同一条线程,即有请求进来后,分配一条线程接收并处理请求,并继续执行业务处理,直到所有处理结束才完成响应,线程才会释放。这使得 Servlet 对业务方法的调用变成一种阻塞调用,效率较低。

Servlet容器基本上都有线程池来维护请求线程,会有最大线程数限制。例如,Spring Boot默认嵌入的 Tomcat 默认最大线程数是 200。

业务处理会有复杂业务和简单业务,当并发量高时可能会因为复杂业务长时间占用线程而耗完容器线程池中的所有线程,继而导致有简单业务的请求进来也无法快速处理,影响系统性能,产生性能瓶颈。

需要将请求线程业务处理线程分离。步骤大概如下:

  1. 请求线程接收请求完成后,交给异步线程处理业务。
  2. 异步线程处理业务完成后,回调容器的线程完成响应。

请求线程与业务处理线程分离,请求线程接收完请求后就交给业务线程处理业务,请求线程不会阻塞可以继续接收其它请求,业务线程处理即使耗时长些也不影响请求的接收。

假如正在处理业务的线程数量超过请求线程池的数量,这时进来一个简单的请求仍可以快速响应。

通常处理业务的线程也会由线程池来维护,业务处理线程池会远远大于接收请求的线程池。

Servlet 3.0

线程分离实现方式

Servlet3.0 AsyncContext

Servlet 3.0 引入了异步处理,在Servlet 3.1中又引入了非阻塞 IO 来进一步增强异步处理的性能。

HttpServletRequest 对象提供了一个AsyncContext对象,该对象构成了异步处理的上下文。可以从 AsyncContext对象中获取 Request 和 Response 对象。

可以将 AsyncContext 从当前线程传给异步线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。如此,通过将请求从一个线程传给另一个线程处理的过程便构成了Servlet 3.0中的异步处理。

实现示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@RestController
@RequestMapping("/asyncServlet")
public class AsyncServletController {
/**
* @param servletRequest 请求
* @desc 异步测试
*/
@GetMapping("/asyncContext2")
public void asyncContext2(HttpServletRequest servletRequest) {
System.out.println("接收请求线程开始:" + Thread.currentThread().getName());
AsyncContext asyncContext = servletRequest.startAsync();
new Thread(() -> {
System.out.println("异步线程启动:" + Thread.currentThread().getName());
PrintWriter writer = null;
try {
// Thread.sleep(8000);
ServletResponse response = asyncContext.getResponse();
response.setContentType("application/json;charset=UTF-8");
writer = response.getWriter();
HashMap<String, Object> map = new HashMap<>();
map.put("id", 100L);
map.put("message", "Hello World!");
writer.println(JSON.toJsonString(map));
asyncContext.complete();
System.out.println("异步线程结束");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
System.out.println("接收请求线程结束");
}

/**
* @param request 请求
* @desc 异步测试
*/
@RequestMapping(value = "/asyncContext1", method = RequestMethod.GET)
public void asyncContext1(HttpServletRequest request) {
System.out.println("接收请求线程开始:" + Thread.currentThread().getName());
AsyncContext asyncContext = request.startAsync();
new Thread(new Work(asyncContext)).start();
System.out.println("接收请求线程结束");
}
}

/**
* @desc 任务
*/
public class Work implements Runnable {

private AsyncContext asyncContext;

public Work(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
}

@Override
public void run() {
System.out.println("异步线程启动:" + Thread.currentThread().getName());
try {
ServletResponse response = asyncContext.getResponse();
response.setContentType("application/json;charset=UTF-8");
PrintWriter writer = response.getWriter();
HashMap<String, Object> map = new HashMap<>();
map.put("id", 100L);
map.put("message","Hello World!");
writer.println(JSON.toJsonString(map));
} catch (IOException e) {
e.printStackTrace();
}
asyncContext.complete();
System.out.println("异步线程结束");
}
}

Spring Boot Flux Web

https://geek-docs.com/spring-boot/spring-boot-tutorials/webflux.html

线程异步处理方式

Thread

通过手动创建线程的方式实现,可以继承 Thread 类 ,或实现 Runnable 接口,或实现 Callable 接口。

Thread 类也是继承 Runnable 接口,Thread 和 Runnable 没有返回值。Callable 可以有返回值。

继承 Thread

继承 Thread 类,重写 run()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CreateThreadTest {
public static void main(String[] args) {
System.out.println("主线程启动: " + Thread.currentThread().getName());
MessageThread thread = new MessageThread();
thread.start();
System.out.println("主线程结束: " + Thread.currentThread().getName());
}
}

class MessageThread extends Thread {
@Override
public void run() {
System.out.println("异步线程启动: " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("异步线程结束: " + Thread.currentThread().getName());
}
}

结果如下:

1
2
3
4
主线程启动: main
主线程结束: main
异步线程启动: Thread-0
异步线程结束: Thread-0

示例中的异步线程休眠了 3 秒,从执行结果可以看到主线程结束后异步线程才结束。

实现 Runnable

直接 new 创建,因 run 方法是个函数接口,支持传入匿名方法,匿名方法里也是重写 run() 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ImplementRunnable {

public static void main(String[] args) {
System.out.println("主线程启动: " + Thread.currentThread().getName());
Thread thread = new Thread(new MailThread());
thread.start();
System.out.println("异步线程结束");

}
}

class MailThread implements Runnable{

@Override
public void run() {
System.out.println("异步程启动: " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("异步线程结束: " + Thread.currentThread().getName());
}
}

run()方法是个函数式接口,可直接给 Thread 对象传入匿名方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
System.out.println("主线程启动: " + Thread.currentThread().getName());
// 匿名方法
new Thread(() -> {
System.out.println("异步程启动: " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("异步线程结束");
}).start();
System.out.println("主线程结束");
}

实现 Callable

实现 Callable 接口,重写 call() 方法,实现类交给 FutureTask 执行。Callable 可以返回结果或抛出异常,Future 拿到异步计算结果并提供了取消任务、判断任务中断、判断任务是否已取消方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ImplementCallable {

public static void main(String[] args) throws Exception {
System.out.println("主线程启动: " + Thread.currentThread().getName());
ReportThread reportThread = new ReportThread();
FutureTask<String> futureTask = new FutureTask<>(reportThread);
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("主线程拿到异步线程结果:" + futureTask.get());
System.out.println("主线程结束");
}
}


class ReportThread implements Callable<String> {

@Override
public String call() throws Exception {
System.out.println("异步程启动: " + Thread.currentThread().getName());
Thread.sleep(3000);
return "Result Data";
}
}

响应结果:

1
2
3
4
主线程启动: main
异步程启动: Thread-0
主线程拿到异步线程结果:Result Data
主线程结束

JDK 提供的线程池 ThreadPoolExecutor 类提交 Callable 任务也是使用了 FutureTask 进行包装。

1
2
3
4
5
6
7
8
9
10
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

ThreadPoolExecutor

还可以通过线程池创建线程,线程池可以维护多个线程,避免在需要处理任务时创建和销毁线程的代价。

异步执行任务不需要创建多个线程,一般使用线程池创建单线程就可以了。

JDK 提供了快捷创建线程池的工具类 Executors,核心是使用 ThreadPoolExecutor 创建线程池。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

ExecutorService 提交任务的 submit 方法入参是 Callable 或 Runnable 类型的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 提交一个指定返回值类型的任务,并返回一个待处理结果的 Future.
* Future 的 get 方法将在任务成功后返回任务的结果
* 如果想阻塞任务直到获取处理结果,可以使用 result = exec.submit(aCallable).get();
*/
<T> Future<T> submit(Callable<T> task);

/**
* 提交 Runnable 任务以供执行,并返回表示该任务的 Future。
* Future 的 get 方法将在成功完成后返回给定的结果.
* task – 提交的任务
* result – 返回的结果,直接在任务执行期间改变对象
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交 Runnable 任务以供执行,并返回代表该任务的 Future。
* Future 的 get 方法将在成功完成后返回 null
*/
Future<?> submit(Runnable task);

示例一:<T> Future<T> submit(Callable<T> task)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
try {
System.out.println("主线程启动: " + Thread.currentThread().getName());
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() ->{
System.out.println("异步线程启动: " + Thread.currentThread().getName());
Thread.sleep(3000);
System.out.println("异步线程结束,返回结果");
return "hello";
});
// 阻塞等待子线程执行完毕,获取指定类型的结果
String result = future.get();
System.out.println("主线程拿到异步线程的结果: " + result);
System.out.println("主线程结束。");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

执行结果:

1
2
3
4
5
主线程启动: main
异步张程启动: pool-1-thread-1
异步线程结束,返回结果
主线程拿到异步线程的结果: hello
主线程结束。

示例二:<T> Future<T> submit(Runnable task, T result)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {

try {
System.out.println("主线程启动: " + Thread.currentThread().getName());
ExecutorService executorService = Executors.newSingleThreadExecutor();

Message message = new Message();
Runnable runnable = () -> {
System.out.println("异步线程启动: " + Thread.currentThread().getName());
message.setMsg("Hello");
message.setResult("success");
System.out.println("异步线程结束");
};
// 支持在任务期间改变传入的对象
Future<Message> future = executorService.submit(runnable, message);
// 等待子线程执行完毕
Message result = future.get();
System.out.println("主线程拿到异步线程的结果: " + result);
System.out.println("主线程结束。");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

}

执行结果:

1
2
3
4
5
主线程启动: main
异步线程启动: pool-1-thread-1
异步线程结束
主线程拿到异步线程的结果: Message(msg=Hello, result=success)
主线程结束。

http://www.javaxks.com/?p=10396

FutureTask

CompletableFuture

https://mp.weixin.qq.com/s/AcxPO-8Y8o1-grYxoOjp1g

https://blog.csdn.net/qq_31086797/article/details/108455482

https://blog.csdn.net/xx666zz/article/details/84944045

https://cloud.tencent.com/developer/article/1506341

https://wenku.baidu.com/view/ea8823eda900b52acfc789eb172ded630b1c9823.html

Spring ApplicationEvent

消息队列

Java开发中常用的异步请求都在这里

https://mp.weixin.qq.com/s/Y4601qycxIo268cTJ6ZKNQ

JAVA语言异步非阻塞设计模式

https://mp.weixin.qq.com/s/mULsyv8PqPgycqgB7HJ1Og

异步编程

在Java中使用异步编程

https://mp.weixin.qq.com/s/xj0_Sxn9kiKpjyFuut8tOw

Java 异步编程:Loom 项目介绍

https://mp.weixin.qq.com/s/dh1LGID4H89_uvfKK9lvSQ

说说Java异步调用的几种方式

https://mp.weixin.qq.com/s/G1OfN60jPA2RVZsJwsXifw

https://mp.weixin.qq.com/s/4wNhbjPzI_vipmS1_GWaRw

https://mp.weixin.qq.com/s/2lHGjjiWlGqOPZ3VXWmLXg

https://mp.weixin.qq.com/s/FZIt56aEMMfTe3U5LOzalQ

https://mp.weixin.qq.com/s/M3jPWobewicIll2fThF9HQ

https://mp.weixin.qq.com/s/16m4aWg68ceF_vZLxwaxuA

https://mp.weixin.qq.com/s/OXPDYQmc_3EtptymAfVrhQ

https://mp.weixin.qq.com/s/q6BfOINeqgm5nffrHu4kQA

https://mp.weixin.qq.com/s/TvHY2i1FX1zS_WHdCvK-wA

https://mp.weixin.qq.com/s/W4nzjUzJCCxYcDo8ebRzCw

https://mp.weixin.qq.com/s/kh-4pMcjoAHgqE6oUEbyUQ

https://mp.weixin.qq.com/s/AVMN8jyqMXWhE9bRs_e0aQ

https://mp.weixin.qq.com/s/dgojwb0z5OMCL99QGh9k5g

https://mp.weixin.qq.com/s/OEjdxCCXnlpRqFEOlHQY8Q

https://mp.weixin.qq.com/s/iuBKdyi_oah_ki21vWHCjw

https://mp.weixin.qq.com/s/ioRdR5YDn2Ej5sYJsDXOHw

https://mp.weixin.qq.com/s/I4WqFb0rm44HbzmYqWRBUg

https://mp.weixin.qq.com/s/glRhnMF9-3IXq-28vngmTQ

https://mp.weixin.qq.com/s/OTRJUXKfcjMg2YRf4W94pg

https://mp.weixin.qq.com/s/qwKyF2eM34cPXrLmvETOIA

https://mp.weixin.qq.com/s/56zsehP7OPu4z4YBfAOzQw

https://blog.csdn.net/qq_37338761/article/details/89251663

https://blog.csdn.net/qq_844085127/article/details/108872756

https://baijiahao.baidu.com/s?id=1666820818587296272&wfr=spider&for=pc

相关参考

  1. JAVA的Callable多线程
作者

光星

发布于

2022-07-30

更新于

2022-11-04

许可协议

评论