Shaun's Space

Shaun's Space

马上订阅 Shaun's Space RSS 更新: https://cniter.github.io/atom.xml

Scala 多线程编程小结

2021年10月10日 09:56

前言

  多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。

  Shaun 目前接触的 Scala 原生多线程编程语法就两个:Future 和 Parallel Collections。其中 Future 用的的最多,并且 Parallel Collections 语法非常简单,所以主要介绍 Future,附带提一下 Parallel Collections。

ExecutionContext 篇

  ExecutionContext 是 Future 的执行上下文,相当于是 Java 的线程池,Java 的线程池主要有以下两类:

  • ThreadPool:所有线程共用一个任务队列,当线程空闲时,从队列中取一个任务执行。
  • ForkJoinPool:每个线程各有一个任务队列,当线程空闲时,从其他线程的任务队列中取一批任务放进自己的队列中执行。

  对于少量任务,这两个池子没啥区别,只是 ThreadPool 在某些情况下会死锁,比如在一个并行度为 2 (最多两个线程)的 ThreadPool 中执行两个线程,两个线程又分别提交一个子任务,并等到子任务执行完才退出,这时会触发相互等待的死锁条件,因为没有多余的空闲线程来执行子任务,而 ForkJoinPool 中每个线程产生的子任务会放在自己的任务队列中,ForkJoinPool 可以在线程耗尽时额外创建线程,也可以挂起当前任务,执行子任务,从而防止死锁。对于大量任务,ForkJoinPool 中的空闲线程会从其他线程的任务队列中一批一批的取任务执行,所以一般会更快,当然若各个任务执行时间比较均衡,则 ThreadPool 会更快。

  根据线程池创建的参数不同,Executors 中提供了 5 种线程池:newSingleThreadExecutor(单线程线程池,可保证任务执行顺序),newFixedThreadPool(固定大小线程池,限制并行度),newCachedThreadPool(无限大小线程池,任务执行时间小采用),newScheduledThreadPool(同样无限大小,用来处理延时或定时任务),newWorkStealingPool(ForkJoinPool 线程池)。前四种都属于 ThreadPool,根据阿里的 Java 的编程规范,不推荐直接使用 Executors 创建线程池,不过对于计算密集型任务,一般使用 newFixedThreadPool 或 newWorkStealingPool 即可,线程数设置当前 CPU 数即可(Runtime.getRuntime.availableProcessors()),多了反而增加线程上下文切换次数,对CPU 的利用率不增反减。

  Scala 提供了一个默认的 ExecutionContext:scala.concurrent.ExecutionContext.Implicits.global,其本质也是一个 ForkJoinPool,并行度默认设置为当前可用 CPU 数,当然也会根据需要(比如当前全部线程被阻塞)额外创建更多线程。一般做计算密集型任务就用默认线程池即可,特殊情况也可以自己创建 ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)),下面的代码就可以创建一个同步阻塞的 ExecutionContext:

1
2
3
4
5
val currentThreadExecutionContext = ExecutionContext.fromExecutor(
new Executor {
// Do not do this!
def execute(runnable: Runnable) { runnable.run() }
})

原因是 runnable.run() 并不会新开一个线程,而是直接在主线程上执行,和调用普通函数一样。

Future 篇

  先上一个简单的 Future 并发编程 Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
////import scala.concurrent.ExecutionContext.Implicits.global
//val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
val pool = Executors.newWorkStealingPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)

val futures = Array.range(0, 10000).map(i => Future {
println(i)
Thread.sleep(100)
i
})

val futureSequence = Future.sequence(futures)
futureSequence.onComplete({
case Success(results) => {
println(results.mkString("Array(", ", ", ")"))
println(s"Success")

ec.shutdown()
pool.shutdownNow()
}
case Failure(e) => println(s"Error processing...

剩余内容已隐藏

查看完整文章以阅读更多