Scala 多线程编程小结
前言
多线程的执行方式有两种:并发(Concurrent)和并行(Parallel),简单来说,并发就是两个线程轮流在一个 CPU 核上执行,而并行则是两个线程分别在两个 CPU 核上运行。一般而言,程序员无法直接控制线程是并发执行还是并行执行,线程的执行一般由操作系统直接控制,当然程序运行时也可以做简单调度。所以对于一般程序员来说,只需要熟练使用相关语言的多线程编程库即可,至于是并发执行还是并行执行,可能并不是那么重要,只要能达到预期效果就行。
Shaun 目前接触的 Scala 原生多线程编程语法就两个:Future 和 Parallel Collections。其中 Future 用的的最多,并且 Parallel Collections 语法非常简单,所以主要介绍 Future,附带提一下 Parallel Collections。
ExecutionContext 篇
ExecutionContext 是 Future 的执行上下文,相当于是 Java 的线程池,Java 的线程池主要有以下两类:
- ThreadPool:所有线程共用一个任务队列,当线程空闲时,从队列中取一个任务执行。
- ForkJoinPool:每个线程各有一个任务队列,当线程空闲时,从其他线程的任务队列中取一批任务放进自己的队列中执行。
对于少量任务,这两个池子没啥区别,只是 ThreadPool 在某些情况下会死锁,比如在一个并行度为 2 (最多两个线程)的 ThreadPool 中执行两个线程,两个线程又分别提交一个子任务,并等到子任务执行完才退出,这时会触发相互等待的死锁条件,因为没有多余的空闲线程来执行子任务,而 ForkJoinPool 中每个线程产生的子任务会放在自己的任务队列中,ForkJoinPool 可以在线程耗尽时额外创建线程,也可以挂起当前任务,执行子任务,从而防止死锁。对于大量任务,ForkJoinPool 中的空闲线程会从其他线程的任务队列中一批一批的取任务执行,所以一般会更快,当然若各个任务执行时间比较均衡,则 ThreadPool 会更快。
根据线程池创建的参数不同,Executors 中提供了 5 种线程池:newSingleThreadExecutor(单线程线程池,可保证任务执行顺序),newFixedThreadPool(固定大小线程池,限制并行度),newCachedThreadPool(无限大小线程池,任务执行时间小采用),newScheduledThreadPool(同样无限大小,用来处理延时或定时任务),newWorkStealingPool(ForkJoinPool 线程池)。前四种都属于 ThreadPool,根据阿里的 Java 的编程规范,不推荐直接使用 Executors 创建线程池,不过对于计算密集型任务,一般使用 newFixedThreadPool 或 newWorkStealingPool 即可,线程数设置当前 CPU 数即可(Runtime.getRuntime.availableProcessors()),多了反而增加线程上下文切换次数,对CPU 的利用率不增反减。
Scala 提供了一个默认的 ExecutionContext:scala.concurrent.ExecutionContext.Implicits.global,其本质也是一个 ForkJoinPool,并行度默认设置为当前可用 CPU 数,当然也会根据需要(比如当前全部线程被阻塞)额外创建更多线程。一般做计算密集型任务就用默认线程池即可,特殊情况也可以自己创建 ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)),下面的代码就可以创建一个同步阻塞的 ExecutionContext:
1 | val currentThreadExecutionContext = ExecutionContext.fromExecutor( |
原因是 runnable.run() 并不会新开一个线程,而是直接在主线程上执行,和调用普通函数一样。
Future 篇
先上一个简单的 Future 并发编程 Demo:
1 | ////import scala.concurrent.ExecutionContext.Implicits.global |