共计 1862 个字符,预计需要花费 5 分钟才能阅读完成。
Future
-
scala.concurrent.Future
异步执行代码块import java.time._ import scala.concurrent._ import ExecutionContext.Implicits.global // 全局线程池 Future {Thread.sleep(10000) println(s"This is the future at ${LocalTime.now}") } println(s"This is the present at ${LocalTime.now}")
-
监听结果(阻塞)
import scala.concurrent.duration._ val f = Future {Thread.sleep(10000); 42 } val result = Await.result(f, 10.seconds) // 阻塞 10s val f = Future {...} Await.ready(f, 10.seconds) val Some(t): Option[Try[T]] = f.value t match {case Success(v) => println(s"The answer is $v") case Failure(ex) => println(ex.getMessage) }
ready()
- 到达等待时间无结果时,会抛出异常
TimeoutException
- 任务抛出的异常时,result() 会再次抛出异常,ready() 可获取结果
-
回调
val f = Future {Thread.sleep(10000) if (random() < 0.5) throw new Exception 42 } f.onComplete {case Success(v) => println(s"The answer is $v") case Failure(ex) => println(ex.getMessage) }
-
问题:1. 回调地狱;2. 执行顺序无法预知
val future1 = Future {getData1() } val future2 = Future {getData2() } future1 onComplete {case Success(n1) => future2 onComplete {case Success(n2) => { val n = n1 + n2 println(s"Result: $n") } case Failure(ex) => ... } case Failure(ex) => ... }
将 Future 看作集合
// val 会立即执行,def 调用时执行 val future1 = Future {getData1() } val future2 = Future {getData2() } // 都获取到结果时,才会进行计算 val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2
-
Promise
- 与 Java 8 中的
CompletableFuture
类似 -
Future 只读,在任务完成时隐式设置结果值;Promise 类似,但结果值可显式设置
// Future def computeAnswer(arg: String) = Future {val n = workHard(arg) n } // Promise def computeAnswer(arg: String) = {val p = Promise[Int]() Future {val n = workHard(arg) // 显式设置结果 p.success(n) workOnSomethingElse()} // 立即返回该 Promise 对应的 Future p.future } // 多个任务对应一个 Promise val p = Promise[Int]() Future {var n = workHard(arg) // 若 Promise 未完成则接受结果并返回 true;否则忽略结果并返回 false p.trySuccess(n) } Future {var n = workSmart(arg) p.trySuccess(n) }
- 与 Java 8 中的
-
执行上下文
- 默认执行在全局的
fork-join
线程池(默认大小为核数),适用于计算密集型任务 -
对于阻塞型 /IO 密集型的任务,可使用 Java 的
Executors
// 隐式声明,或者使用 Future.apply 显式声明 val pool = Executors.newCachedThreadPool() implicit val ec = ExecutionContext.fromExecutor(pool) val f = Future { val url = ... blocking {val contents = Source.fromURL(url).mkString ... } }
- 默认执行在全局的
正文完