本篇文章主要是寻求Java线程中协作和等待相关的API在协程里边的等价物,不过看完之后你会发现,协作和等待这种在线程中比较复杂的操作到了协程里边一点难度都没有。
线程与协程的协作与等待
协程,天生就是并行的,无论是同等级的协程,还是父子协程,亦或是互相之间没有关系的协程,如果我们希望协程之间互相等待,希望在协程执行的过程中可以停住,等待别的协程执行完毕,我们该如何操作?有两个函数,一个是Job的join()
函数:
js 代码解读复制代码val job1 = scope.launch {
println("Job 1 started")
delay(2000)
println("Job 1 done")
}
scope.launch {
println("Job 2 started")
delay(1000)
job1.join()
println("Job 2 done")
}
preJob2需要等preJob1执行完之后才能继续执行,如果使用的是async函数,还可以用返回的deferred对象的await函数,他不仅等待协程,还能拿到协程里返回的结果:
js 代码解读复制代码val deferred = scope.async {
println("Deferred started")
delay(3000)
println("Deferred done")
"rengwuxian"
}
scope.launch {
println("Job 2 started")
delay(1000)
println("Deferred result: ${deferred.await()}")
println("Job 2 done")
}
其实线程对于这种互相等待的情况也有对应的解决方案,比如Thread也有一个join()函数。另外线程还有Future和CompletableFuture,然后传统的Java线程系统里边,还有一个叫CountDownLatch的东西,用于个线程等待多个线程。
Thread:join
Thread
的 join
方法是 Java 中用于让一个线程等待另一个线程完成的方法。使用 join
可以确保一个线程在另一个线程执行完毕后再继续执行。
示例:
假设我们有三个线程,每个线程执行一定的任务,我们希望主线程等待这三个线程都执行完毕后再继续执行。
js 代码解读复制代码public class ThreadJoinExample {
public static void main(String[] args) {
// 创建三个线程
Thread thread1 = new Thread(new Task("Task 1"));
Thread thread2 = new Thread(new Task("Task 2"));
Thread thread3 = new Thread(new Task("Task 3"));
// 启动线程
thread1.start();
thread2.start();
thread3.start();
try {
// 主线程等待 thread1 执行完毕
thread1.join();
System.out.println("Thread 1 has finished.");
// 主线程等待 thread2 执行完毕
thread2.join();
System.out.println("Thread 2 has finished.");
// 主线程等待 thread3 执行完毕
thread3.join();
System.out.println("Thread 3 has finished.");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 所有线程执行完毕,主线程继续执行
System.out.println("All tasks are completed. Main thread resumes.");
}
static class Task implements Runnable {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
try {
// 模拟任务执行
System.out.println(name + " is running...");
Thread.sleep(2000); // 模拟任务耗时
System.out.println(name + " has finished.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 输出:
Task 1 is running...
Task 2 is running...
Task 3 is running...
Task 1 has finished.
Thread 1 has finished.
Task 2 has finished.
Thread 2 has finished.
Task 3 has finished.
Thread 3 has finished.
All tasks are completed. Main thread resumes.
解释:
- thread1.join();
主线程调用join()
方法,等待thread1
线程执行完毕。如果thread1
还没有结束,主线程会阻塞在这里,直到thread1
结束。 - 线程执行顺序:
每个线程在启动后都会执行它的run()
方法。由于主线程调用了join()
,它会等待thread1
完成,然后等待thread2
,最后等待thread3
。只有在所有线程都完成后,主线程才会继续执行。 - 模拟任务执行:
每个线程使用Thread.sleep(2000)
来模拟任务的执行,表示线程在执行任务时会耗时两秒。
Future
Future
的实现机制
Future
接口通常与 ExecutorService
一起使用。任务提交到 ExecutorService
后,它会在一个线程池中异步执行,Future
对象提供了获取执行结果的方法。
主要方法:
get()
方法:这是Future
的核心方法,用于获取任务的结果。它是一个阻塞方法,如果任务没有完成,get()
会阻塞当前线程直到任务完成并返回结果。这是通过内部的同步原语(如wait()
和notify()
)来实现的。
js 代码解读复制代码Future<Integer> future = executorService.submit(() -> {
// 执行任务
return 42;
});
// 当前线程会在此处阻塞,直到任务执行完毕并返回结果
Integer result = future.get();
CompletableFuture
CompletableFuture
是 Future
的扩展,它不仅提供了等待任务结果的功能,还支持更复杂的异步任务组合和处理机制。
实现机制:
- 非阻塞获取结果:与
Future
不同,CompletableFuture
提供了一些非阻塞的机制来处理任务结果。它通过回调函数(例如thenApply
,thenAccept
)在任务完成后自动执行后续操作,而无需显式等待。 - 内部实现:
CompletableFuture
的异步操作是在ForkJoinPool
或其他线程池中执行的。CompletableFuture
通过CompletableFuture
的内部状态 (UNSAFE
、AtomicReferenceFieldUpdater
) 来管理任务的完成状态,并使用回调链在任务完成时自动触发后续操作。
示例代码:
js 代码解读复制代码CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 执行任务
return 42;
});
// 注册回调,在任务完成后处理结果
future.thenApply(result -> result * 2)
.thenAccept(finalResult -> System.out.println("Final result: " + finalResult));
// 无需显式调用 get() 方法,任务完成后回调自动执行
注意事项:
- 线程池配置:当使用
CompletableFuture
或ExecutorService
时,要确保线程池配置合理。如果线程池中的线程数不足,可能会导致任务无法及时执行,从而导致长时间等待甚至死锁。 - 避免死锁:在设计任务依赖时,需避免多个任务之间的循环依赖,否则可能导致线程相互等待而引发死锁。
CountDownLatch
允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。比如我们有一个线程需要等待另外三个线程的结果,可以这么写:
js 代码解读复制代码import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
// 初始化CountDownLatch,计数为3
CountDownLatch latch = new CountDownLatch(3);
// 启动三个线程
Thread worker1 = new Thread(new Worker("Worker 1", latch));
Thread worker2 = new Thread(new Worker("Worker 2", latch));
Thread worker3 = new Thread(new Worker("Worker 3", latch));
worker1.start();
worker2.start();
worker3.start();
try {
// 主线程等待所有工作线程完成
System.out.println("Main thread waiting for workers to finish...");
latch.await(); // 等待计数归零
System.out.println("All workers are done. Main thread resumes.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Worker implements Runnable {
private final String name;
private final CountDownLatch latch;
public Worker(String name, CountDownLatch latch) {
this.name = name;
this.latch = latch;
}
@Override
public void run() {
try {
// 模拟任务执行
System.out.println(name + " is working...");
Thread.sleep(2000); // 模拟工作任务耗时
System.out.println(name + " has finished its work.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 完成工作,计数减一
latch.countDown();
}
}
}
}
// 输出结果:
Main thread waiting for workers to finish...
Worker 1 is working...
Worker 2 is working...
Worker 3 is working...
Worker 1 has finished its work.
Worker 2 has finished its work.
Worker 3 has finished its work.
All workers are done. Main thread resumes.
解释:
- CountDownLatch latch = new CountDownLatch(3);
初始化一个CountDownLatch
实例,并设置计数值为 3,表示主线程需要等待三个工作线程的完成。 - latch.await();
主线程调用await()
方法,将进入等待状态,直到计数值减为 0。 - latch.countDown();
每个工作线程在完成各自的任务后,调用countDown()
方法使计数值减一。当三个线程都调用完countDown()
后,计数值变为 0,主线程将被唤醒继续执行。 - 模拟任务执行:
线程使用Thread.sleep(2000)
模拟任务的执行过程,表示工作任务的耗时。
线程等待机制总结
- 阻塞等待:
Future.get()
方法会阻塞当前线程,直到另一个线程完成任务。这是通过线程间的同步原语(如wait()
和notify()
)实现的。 - 非阻塞回调:
CompletableFuture
提供了更灵活的方式来处理异步任务的结果,使用回调链来处理任务完成后的操作,而无需显式阻塞线程。它通过ForkJoinPool
或自定义的线程池执行任务,并在任务完成时自动触发回调。
我们在协程里边如何实现一个协程等待多个协程的结果呢?
同样的,为了实现CountDownLatch,我们可以使用Job的join函数来做,例如我们需要实现一个协程等待另外两个协程,可以这么做:
js 代码解读复制代码val preJob1 = launch {
delay(1000)
}
val preJob2 = launch {
delay(2000)
}
launch {
preJob1.join()
preJob2.join()
println("Pre jobs are finished.")
}
实际上,线程里边也可以这么做,但是线程本身的结构化管理比较麻烦,所以在正式项目中很少这么写。而协程可以结构化的取消,所以它的join比线程的join更加实用。另外,用async和await也能实现:
js 代码解读复制代码import kotlinx.coroutines.*
fun main() = runBlocking {
// 启动三个并发任务
val deferred1 = async { task1() }
val deferred2 = async { task2() }
val deferred3 = async { task3() }
// 等待所有任务完成并获取结果
val result1 = deferred1.await()
val result2 = deferred2.await()
val result3 = deferred3.await()
// 打印结果
println("Result 1: $result1")
println("Result 2: $result2")
println("Result 3: $result3")
}
suspend fun task1(): Int {
delay(1000) // 模拟耗时任务
return 10
}
suspend fun task2(): Int {
delay(2000) // 模拟耗时任务
return 20
}
suspend fun task3(): Int {
delay(1500) // 模拟耗时任务
return 30
}
// 输出:程序会等待所有任务完成后,依次打印结果。由于 `task2` 最耗时,因此它会最后完成。
Result 1: 10
Result 2: 20
Result 3: 30
解释:
runBlocking
:这是一个阻塞协程构建器,用于启动一个新的协程并阻塞当前线程,直到协程执行完毕。在实际应用中,你可能会在某个协程上下文中使用,而不是直接在main
函数中使用。async { ... }
:这是一个协程构建器,它会启动一个新的协程并返回一个Deferred
对象,这个对象类似于Future
,它表示一个将来会提供结果的值。await()
:Deferred
对象的一个方法,用于挂起当前协程,直到Deferred
对象的任务完成并返回结果。await()
是非阻塞的,如果任务已经完成,它会立即返回结果。task1
,task2
,task3
:这是三个模拟的耗时任务。它们使用delay()
函数来模拟耗时操作,并最终返回一个结果。
执行流程
- 三个任务 (
task1
,task2
,task3
) 会并发执行。 async
启动的协程立即返回Deferred
对象,而不等待任务完成。- 主协程调用
await()
挂起自己,直到对应的Deferred
完成并返回结果。 - 所有
await()
调用完成后,主协程继续执行并打印结果。
另外,用Channel也能实现类似的效果
如何用Channel来实现类似于CountDownLatch的效果呢?CountDownLatch并没有指定线程,而是等待固定次数到0之后开始。使用Channel实现是,需要将容量设置成你的计数的总数,设置计数的总数,是为了让计数的时候不要卡住协程,设置好之后,计数的方式就是往Channel里边发送数据,也就是调用它的send函数,而等待计数用读数据的方式就行,也就是调用recieve函数,不过需要手动循环一下,来确保等到足够的次数, 因为它本质上并不是CountDownLatch,而是用它可以实现CountDownLatch的功能,所以需要一些手动工作,需要手动设置一下等待的次数。用Channel来实现是在形式上和CountDownLatch最接近的等价物。
示例:
js 代码解读复制代码 // 容量设置成你的计数的总数,设置计数的总数,是为了让计数的时候不要卡住协程,
// 设置好之后,计数的方式就是往Channel里边发送数据,也就是调用它的send函数,
// 而等待计数用读数据的方式就行,也就是调用recieve函数,
// 不过需要手动循环一下,来确保等到足够的次数。
val channel = Channel<Unit>(2)
launch {
repeat(2) {
channel.receive()
}
}
launch {
delay(1000)
channel.send(Unit)
}
launch {
delay(2000)
channel.send(Unit)
}
到这里你可能发现了,传统线程中的协作与等待,到了协程里边变得异常容易。
select():先到先得
允许在多个挂起函数中选择第一个成功完成的操作,并返回结果。select
是非阻塞的,它会在多个异步操作中进行选择,当其中一个操作完成时,select
会立即返回该操作的结果,并取消其他操作。
select
的基本用法
示例1:
js 代码解读复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
fun main() = runBlocking {
val channel1 = Channel<Int>()
val channel2 = Channel<Int>()
launch {
delay(100) // 模拟一些工作
channel1.send(1) // 发送消息到 channel1
}
launch {
delay(200) // 模拟一些工作
channel2.send(2) // 发送消息到 channel2
}
val result = select<String> {
channel1.onReceive { value ->
"Received from channel1: $value"
}
channel2.onReceive { value ->
"Received from channel2: $value"
}
}
println(result)
}
// 由于 `channel1` 在 `channel2` 之前发送数据,因此输出将是:
Received from channel1: 1
代码解释:
-
select
的使用:select
块允许你在多个异步操作中选择第一个完成的操作。- 每个选择项是通过调用诸如
onReceive
、onSend
等方法来定义的,这些方法表示你感兴趣的挂起操作。
-
channel1.onReceive
和channel2.onReceive
:- 在
select
块中,你定义了两个onReceive
操作,分别从channel1
和channel2
中接收数据。 select
将监听这两个操作,当其中一个操作完成时,它将返回结果,并且取消其他挂起的操作。
- 在
-
launch
和delay
:- 两个协程分别发送数据到
channel1
和channel2
,但发送的时间不同(一个在 100 毫秒后,另一个在 200 毫秒后)。 - 因为
select
会选择第一个完成的操作,所以最终select
会选择从channel1
收到的数据。
- 两个协程分别发送数据到
-
取消未完成的操作:
select
块执行完后,所有未完成的挂起操作将被自动取消,这确保了系统资源的高效利用。
onJoin是仅限于在select代码块调用的函数,会监听job的结束。无论job以任何方式结束(正常结束或者被取消),onJoin的大括号就会被执行,做的工作也是类似于调用一下join函数,然后监听这个join()执行完毕的工作。但实际上, 它并没有调用join(), 它是直接监听的job的结束,并且大括号的返回值回作为select的返回值来返回。实际上整个select做的事情就是监听job1的结束以及设置一个回调,在job1执行结束之后触发这个回调。然后把回调的返回值作为自己的返回值来返回,这么写的作用是什么呢?作用就在于当你调用两个onJoin的时候,它只会执行最先结束的那个jon的监听回调。
示例2:
js 代码解读复制代码scope.launch {
val result = select {
// 仅限于在select代码块调用的函数,会监听job的结束。无论job以任何方式结束(正常结束或者被取消),
// onJoin的大括号就会被执行,做的工作也是类似于调用一下join函数,然后监听这个join()执行完毕的工作。
// 但实际上, 它并没有调用join(), 它是直接监听的job的结束,并且大括号的返回值回作为select的返回值来返回。
// 实际上整个select做的事情就是监听job1的结束以及设置一个回调,
// 在job1执行结束之后触发这个回调。然后把回调的返回值作为自己的返回值来返回,
// 这么写的作用是什么呢?作用就在于当你调用两个onJoin的时候,它只会执行最先结束的那个jon的监听回调。
job1.onJoin { // 这里也是挂起函数环境。可以直接在里边调用挂起函数。
delay(2000)
1
}
job2.onJoin {
2
}
println("Result: $result")
// 输出:
job1 done
job2 done
Result: 1
我们前边说欧join和await,他们是协程之间互相等待,但他们是等待到每一个协程都返回,而select则是选择最快的那个返回。另外,不只是join()有个对应的onJoin,deferred的await有个对应的onAwait, 它的回调有一个参数,这个参数就是await的返回值:
js 代码解读复制代码deferred.await()
deferred.onAwait {
it // 就是deferred.await()的返回值。
}
类似的操作对Channel也可以,Channel里边,在select中使用的还有一个onSend, 效果是发送一条数据给channel,并且,如果他是几个被监听的工作中最先完成的,它的回调就会被执行。发送时需要填一个参数,这个参数就是你要发送的数据。
js 代码解读复制代码channel.onSend("haha") {
}
同样的,recieve和recieveCatching也有对应的onRecieve和onReceiveCatching,功能都是类似的,在做这件事的同时,监听它的完成情况,并设置一个回调。
js 代码解读复制代码channel.onReceive {
}
channel.onReceiveCatching {
}
需要注意的是:channel 的 onSend()、onReceive()、onReceiveCatching() 只能选其一,不能同时应用
还有个特殊的onTimeOut, 不需要前缀,可以直接调用,作为select整体函数的超时回调。他是让我们设置一个超时时间,如果在这个超时时间内其他监听工作都没有结束,onJoin监听的工作没有结束,onSend监听的数据没有发送完成等等。那么onTimeOut的回调就会被触发,并且负责提供select的返回值。
js 代码解读复制代码onTimeout(1.seconds) {
}
select
的应用场景
- 竞争请求: 使用
select
在多个异步请求中选择第一个响应的请求。 - 超时处理: 与
withTimeoutOrNull
结合使用,select
可以用于处理超时逻辑。 - 多路复用: 通过
select
,可以在多个通道之间进行多路复用,处理第一个收到的数据。
select
的限制
select
是非阻塞的,尽管如此,需要注意设计避免出现死锁或资源浪费的情况,尤其是在多个并发任务之间使用select
时。- 由于
select
会自动取消其他未完成的操作,需要确保在需要的情况下显式处理取消的操作。
完整的示例代码:
js 代码解读复制代码package com.rengwuxian.coursecoroutines._5_collaboration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
val job1 = launch {
delay(1000)
println("job1 done")
}
val job2 = launch {
delay(2000)
println("job2 done")
}
val deferred = async {
delay(500)
"rengwuxian"
}
val channel = Channel<String>()
scope.launch {
val result = select {
// 仅限于在select代码块调用的函数,会监听job的结束。无论job以任何方式结束(正常结束或者被取消),
// onJoin的大括号就会被执行,做的工作也是类似于调用一下join函数,然后监听这个join()执行完毕的工作。
// 但实际上, 它并没有调用join(), 它是直接监听的job的结束,并且大括号的返回值回作为select的返回值来返回。
// 实际上整个select做的事情就是监听job1的结束以及设置一个回调,
// 在job1执行结束之后触发这个回调。然后把回调的返回值作为自己的返回值来返回,
// 这么写的作用是什么呢?作用就在于当你调用两个onJoin的时候,它只会执行最先结束的那个jon的监听回调。
job1.onJoin { // 这里也是挂起函数环境。可以直接在里边调用挂起函数。
delay(2000)
1
}
job2.onJoin {
2
}
deferred.await()
deferred.onAwait {
it
}
channel.onSend("haha") {
}
/* channel 的 onSend()、onReceive()、onReceiveCatching() 只能选其一,不能同时应用
channel.onReceive {
}
channel.onReceiveCatching {
}
*/
onTimeout(1.seconds) {
}
}
println("Result: $result")
}
delay(10000)
}
Java线程世界中没有和select完全对等的api,这是协程单独加的。用来方便做协程之间的协作与等待,同时等待多个,只取其中最快的那个。
评论记录:
回复评论: