Kotlin集合的并行操作
Kotlin集合的并行操作
使用Kotlin集合的并行操作允许我们同时处理集合中的元素,利用多核处理器来提高性能。这对于计算密集型任务,如过滤、映射和数据缩减,非常有用。
在本文中,我们将讨论一些在Kotlin集合上执行并行操作的方法。
为了解释并行操作的工作原理,我们将使用以下集合:
data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)
private val people = listOf(
Person("Martin", 12),
Person("Ahmad", 42),
Person("Alina", 13),
Person("Alice", 30),
Person("Bob", 16),
Person("Charlie", 40)
)
在我们的示例中,我们将为18岁及以上的人分配成年状态(isAdult = true),对于18岁以下的人分配isAdult = false。
为了使并行操作更清晰,我们还将显示系统时间和线程名称(默认使用SLF4J记录器):
private fun Person.setAdult(){
this.isAdult = this.age >= 18
logger.info(this.toString())
}
我们期望的输出是一个年龄超过15岁的人集合,并按年龄排序:
private fun List``<Person>``.assertOver15AndSortedByAge() {
assertThat(this).containsExactly(
Person("Bob", 16, false),
Person("Alice", 30, true),
Person("Charlie", 40, true),
Person("Ahmad", 42, true)
)
}
我们将使用这个扩展函数List``<Person>``.assertOver15AndSortedByAge()来确保我们的每个解决方案都按预期工作。
2.1. 使用协程
协程可以依赖于并行操作,因为它们是非阻塞的、轻量级的、灵活的,并允许我们同时运行多个任务:
val filteredPeople = people
.map { person ->
async {
person.setAdult()
person
}
}
.awaitAll()
.filter { it.age > 15 }
.sortedBy { it.age }
filteredPeople.assertOver15AndSortedByAge()
在people.map { person -> … }中,我们使用async { … }为每个person对象创建了一个新的协程。
这允许协程与其他协程和主线程并发执行。
我们可以通过查看日志输出来看到每个操作都在不同的协程线程上运行:
13:03:44.484 [main @coroutine#1] INFO - 使用协程
13:03:44.522 [main @coroutine#2] INFO - Person(name=Martin, age=12, isAdult=false)
...
awaitAll()确保在map步骤中创建的所有异步协程都已完成。这确保了filteredPeople列表包含了所有并行处理的结果。
2.2. 使用Kotlin Flow
协程流—通常称为Kotlin Flow或简称Flow—是建立在协程之上的额外库,用于异步处理流数据。
我们可以使用flatMapMerge()来并行处理Flow中的元素:
val filteredPeople = people.asFlow()
.flatMapMerge { person ->
flow {
emit(
async {
person.setAdult()
person
}.await()
)
}
}
.filter { it.age > 15 }
.toList()
.sortedBy { it.age }
filteredPeople.assertOver15AndSortedByAge()
代码使用Flow并发处理people数组中的每个person对象:
13:03:44.706 [main @coroutine#8] INFO - 使用Kotlin Flow
...
但我们必须注意flatMapMerge()是Kotlin协程中的一个实验性特性,尚未稳定或可能在未来版本中更改。因此,为了使用它,我们必须添加一个注释:
@OptIn(ExperimentalCoroutinesApi::class)
像往常一样,我们可以将注释添加到类或函数中。
2.3. 使用RxJava或RxKotlin
RxJava是一个基于Java的反应式编程库,是反应式扩展的实现。同时,RxKotlin是RxJava的Kotlin扩展:
val observable = Observable.fromIterable(people)
.flatMap(
{
Observable.just(it)
.subscribeOn(Schedulers.computation())
.doOnNext { person -> person.setAdult()}
},
people.size // 使用maxConcurrency定义元素的数量
)
.filter { it.age > 15 }
.toList()
.map { it.sortedBy { person -> person.age } }
.blockingGet()
observable.assertOver15AndSortedByAge()
首先,我们将原始的people数组转换为Observable对象:
Observable.fromIterable(people)
然而,RxKotlin提供了一个更简洁的扩展函数作为替代:
people.toObservable()
flatMap()对由Observable发出的每个person应用转换。在这种情况下,它创建了一个新的Observable,发出相同的person对象。
然后,为了控制并行操作,强烈建议在flatMap()中显式设置maxConcurrency参数。这允许我们定义最大并发内Observable的数量,确保可预测的资源利用。
让我们在日志输出中看到每个操作在不同的线程中运行:
13:03:44.691 [main] INFO - 使用RxKotlin
...
我们可以看到每个操作的线程名称都不同。这表明操作正在并行运行。
2.4. 使用Java Stream API
在Java 8中,Stream API引入了一种强大的机制,以声明性和函数式的方式处理数据集合。
我们可以使用parallelStream(),这是Collection类型(如List、Set等)的可用方法,它从Collection的元素创建一个并行Stream:
val filteredPeople = people.parallelStream()
.map { person ->
person.setAdult()
person
}.filter { it.age > 15 }
.sorted { p1, p2 -> p1.age.compareTo(p2.age) }
.collect(Collectors.toList())
filteredPeople.assertOver15AndSortedByAge()
当我们调用parallelStream()时,Collection的元素被划分为几个子Stream实例。
每个子Stream然后在单独的线程上并发处理:
13:03:44.683 [main] INFO - 使用Stream API
...
最后,每个子Stream的结果被组合以产生Stream操作的最终结果。
2.5. 使用_ExecutorService_
现在我们将使用ExecutorService,这是Java中的一个接口,提供了一种异步执行任务(Runnable或Callable)的方式。
首先,我们必须创建一个线程池,其大小等于people元素的数量:
val executor = Executors.newFixedThreadPool(people.size)
然后我们调用map{}对people中的每个元素(person)应用一个lambda表达式。我们使用lambda表达式创建一个新的Callable对象,并将其submit()到executor:
val futures = people
.map { person ->
executor.submit(Callable {
person.setAdult()
person
}).get()
}
.filter { it.age > 15 }
.sortedBy { it.age }
futures.assertOver15AndSortedByAge()
我们可以再次检查日志,以看到使用了多个并发线程:
13:03:44.700 [main] INFO - 使用ExecutorService
...
最后,我们将通过调用shutdown()来停止线程池:
executor.shutdown()
这确保了executor释放了它持有的资源。
3. 结论
在本教程中,我们讨论了在Kotlin集合上执行并行操作的各种方法。
协程和Kotlin Flow以其富有表现力的Kotlin风格可以很好地完成这项工作。如果我们想使用第三方库,RxJava或RxKotlin也是成熟可靠的选择。另外,Java也有处理此问题的相关API,如Stream和ExecutorService。
如常,示例的源代码可在GitHub上获取。