译文:Actor的并发策略

原文链接 The Neophyte’s Guide to Scala Part 14: The Actor Approach to Concurrency

经过前面几篇文章的介绍,你学习到了Scala的类型系统(type system)的使用时的灵活性与编译时的安全性, 我们接下来转移到在系列中已经提到过的一个话题:Scala的并发策略。

在更早的几篇文章中,你学习过通过 Future 类型的组合做到非同步并发的方法。

这种方法已经可以非常适合地解决不同的问题。但Scala还提供了另一种方式,也就是Scala并发策略的第二个基石:行动者模式 (Actor Model)。 这是一种完全基于进程间消息传递机制的并发策略。

行动者(以下有时候称为Actor)模式并不是Scala所提出的一种新概念 - 你还可以从Erlang语言中找到Actor并发模型最好实现之一。 Scala 核心库曾经在很长一段时间内保有自己的Actor实现,但 Akka 工具箱包含了一个更好的行动者模式的实现, 并在很长一段时间被社区看做Scala的Actor模式实质的标准。 随后在 Scala 的 2.11 版本中Actor正式被移除出 Scala 核心库,Scala Actor也就彻底的被Akka工具包的Actor实现替代掉了。

此篇文章会向你介绍Akka的Actor模式;你还会学到基本的Akka工具包编程的范例。 需要注意的是,这篇文章与此系列之前的文章不同,我并不会深入讨论你所需要了解的Akka Actor细节; 其目的是提供给你Akka解决问题的思路,并作为吸引你真正去使用它的契机。


共享可变状态(Shared Mutable State)所带来的问题

目前,实现并发机制的方案的主要思路是共享可变状态 - 一个应用靠大量的表示状态的对象和线程组成,每个表示对象会被应用中不同部分的不同线程修改它们的状态。 一般来说,为了保证应用的正确运行,某个状态不会被应用不同部分的线程以错误的方式同时修改,代码中会散布者着各式各样的读写锁。 在此同时,我们又得尽量保证不会在大段代码外设锁,因为这样会让程序的运行速度大幅度的降低。

实际上更常见的是,程序员经常会在最开始编写的时候完全没考虑过并发的问题 - 他们总是在多线程需求来临之时才把代码重写成多线程结构。 这样所导致的结果是,人们写的没有考虑过并发需求的代码会非常直白,但如果把这样的代码移植成并发化后,代码会变得极难读懂。

上面的问题是由于以底层同步锁和线程所构成的代码不容易描述引发的。这样会导致人们很难以优雅的方式把问题解决: 如果你不能清晰的解释清代码到底在做什么,你可以大胆猜测代码里已经充斥着各式各样龌龊的bug, 比如竞态条件(race condition),死锁(deadlock)、或是一些捉摸不透的行为 - 甚至有一些只有在你的代码部署到了生产环境后的几个月才能注意到。

另外,性能调优一个以底层并发控件搭建的代码工程,是一个十分具有挑战的工作。


行动者模式

行动者编程模式旨在在避免上述问题的同时,让你写出可推导的、高性能的并发代码。 与目前大规模使用的共享可变状态方法不同的是,行动者模式要求你在从开始编写代码的时候就在脑中考虑到程序设计中的并发问题 - 它并不允许你在之后再把并发支持加进来。

按照Actor的思路,你的应用应由许多个轻量的实体,也就是 Actor 构成。每一个actor都负责一个小任务,因此它可以很容易的被描述。 对于更复杂的业务逻辑来说,多个actor之间会产生交互,比如任务委派、或者将消息传递给其他的协作者。

行动者系统

行动者是一种可怜的生物:他们不能靠自己存活很久。更确切地说,Akka中的每一个actor都寄生于另一个actor之中,并且每一个都是由所谓的actor系统创造的。 ActorSystem 使你可以创造和搜寻actor的同时,还提供了一大堆我们目前并不需要了解的功能

为了允许下面代码的运行,首先把下面的库依赖以及依赖解析器添加到你的基于SBT的Scala 2.10版本的项目中:

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3"

然后,我们创建一个 ActorSystem 实例。我们需要它作为actor的运行环境:

import akka.actor.ActorSystem
object Barista extends App {
  val system = ActorSystem("Barista")
  system.shutdown()
}

我们在上面实例化了一个新的 ActorSystem,并给它起了个 “Barista”(意为”咖啡师”)的名字 - 如果你之前看过我们的那篇关于制作咖啡的文章, 那你应该熟悉了如何组合多个 Future 对象。

最后,作为优秀市民,我们最终把不需要使用的行动者系统关闭掉。

定义一个行动者

你的应用中有十几个还是几百万个actor,完全取决于你的使用案例,而且对于Akka来说,几百万个是可以做的到的。 也许你会以为我们在用大数字忽悠你。关于Akka很很重要一点,一个actor与一个线程之间并没有一一对应的关系 - 假设说如果有的话,我们会很快的消耗光内存。 更恰当地说,由于actor天生的非阻塞的特性,一个线程可以执行许多个actor,而线程到底需要切换到哪一个执行是由其中哪个有消息需要处理来决定的。

为了理解一个行动者到底会做什么,最好还是先创建第一个简单的actor。 在此,我们定义一个只会接受订单并打印消息到控制台之外,别的事都不会做的 Barista

sealed trait CoffeeRequest
case object CappuccinoRequest extends CoffeeRequest
case object EspressoRequest extends CoffeeRequest

import akka.actor.Actor
class Barista extends Actor {
  def receive = {
    case CappuccinoRequest => println("I have to prepare a cappuccino!")
    case EspressoRequest => println("Let's prepare an espresso.")
  }
}

首先,我们定义了几种我们的行动者可以理解的消息 - CappuccinoRequestEspressoRequest。 通常来讲,如需传递参数,case class 会被用作actor之间消息传递的类型。 如果你不需要传递有参数的消息的话,就可以跟我们现在所做的方式一样,以 case object 来代表消息。

在任何情况下,请保证你的消息是不可更改的。不然很可怕的事情会发生。不然很可怕的事情会发生。

接下来,我们看一下 Barista 类 - 一个继承自 Actor 特征 (trait)的行动者。 Actor 在实例化时,需要实现它一个返回值为 Receive、名为 receive 的方法 (也就是 def receive: Receive)。 ReceivePartialFunction[Any, Unit] 的一种类型别名。

消息处理

然而 receive 方法的意义是什么?它的返回类型 PartialFunction[Any, Unit] 又是什么?

简而言之,一个由 receive 方法返回的部分函数(partial function)会负责处理你的消息。 当你的软件的任何部分 - 不管是不是当前的行动者 - 给你的行动者发了一条消息,Akka 总会尝试让这个行动者处理那条消息: 调用这个行动者的 receive 方法,并将消息以参数的形式传入。

产生副作用

当处理一条消息时,一个行动者能做到你任何想做的事,除了让它返回一个值。

“你说啥!?”

根据 receive 返回值部分函数的 Unit 类型可以推断,你的部分函数是有副作用的。 也许这对你的世界观会产生动摇,因为我们一直在强调以纯函数范式编程。 但对并发运算来说,很多事情在有副作用的时候才说的通: 行动者们储存着你程序的状态,他们有一些被严格控制的副作用行为自然是可以的; 行动者们所收到的每条信息都是一条一条隔离处理的,因此你不必在他们中间引入同步或锁。

未定类型

这里使用的部分函数不仅有副作用,它的参数还是一个 Any 类型的未定类型的值。 为什么我们没有在此利用我们强大的类型系统呢?

这与Akka的几个重要的设计决定相关:Akka允许你把消息转发给其他的actor、实现负载平衡、或是在不知情的情况下将任务代理给其他的actor。

实际使用中,receive 的返回值为未定类型通常不会导致问题出现。你所需要做的就如同上面的例子一样:对消息本身进行强类型化,然后对所需的不同消息类型进行模式匹配。

有时候,弱类型的行动者是可能会导致编译器无法检查出的恶心bug。 如果你已经有了强类型强迫症,而且想在你程序的每个角落使用强类型, 那你可以去看看Akka新提供的Typed Channels特性。

非同步与非阻塞

我在前面写道,Akka中的actor总会处理你发给他的消息。这一条应牢记于心:发送消息和处理消息的过程是非同步并且非阻塞的。 消息的发送者不会在消息被处理完成之前被阻塞;相反,他们会在消息发送后立刻进行自己其他的工作。 至于接受者在稍后是否返回信息 - 消息的发送者也许期待、也许完全不关心。

当行动者从应用的某个部件收到消息时,消息首先会被置于行动者自身的邮箱(类似于一个队列)中。 把消息置入行动者的邮箱是一个非阻塞操作。也就是说,发送者不必确认并等待消息是否真的进入了接受者的邮箱队列里。

调度员(dispatcher)在新的消息进入行动者邮箱时,会以非同步的方式通知相应组件。 如果行动者没有处理过相同消息的话,这个行动者就会被分配到执行上下文中的某个可用线程中; 当行动者已经处理了某个消息,调度员就把邮箱中下一个所需要处理的消息委派给这个行动者。

行动者在消息处理的时候会阻塞分配给自己的线程。虽然这样做不会阻塞消息发送者,但这也意味着过长的操作也会降低整体性能。 这是因为其他的行动者分配线程进入消息处理阶段时,被阻塞的线程是不可用的。

因此,设计 Receive 部分函数的核心原则是尽量减少每个消息的处理时长。 最重要的,尽量不要在消息处理的代码里面调用任何阻塞代码

当然,严格避免阻塞代码,可能会导致你有些行为无法实现 - 比如说目前大部分的数据库驱动仍然是会阻塞的,但你想在你基于行动者模型开发的应用中访问或者保存数据。 对于此类难题已经有相关解决方案,但我们并不会在这篇介绍性文章涉及到。

创建行动者

定义一个actor的任务已经在上面顺利完成,但如何在我们的应用里实际使用咖啡师 Barista 行动者呢? 为了做到这点,我们必须先实例化一个新的 Barista 行动者。 你也许会像往常一样,像下面一样调用它的构造函数:

val barista = new Barista // 会抛出异常

这样做会导致运行失败!Akka 发给你了一张写着 ActorInitializationException 的谢谢卡。 为了整个行动者模式运转正常,actor们必须由 ActorSystem 和它的组件来进行管理。 因此,你必须请求行动者系统来初始化一个新的actor,而不是直接调用它的构造函数:

import akka.actor.{ActorRef, Props}
val barista: ActorRef = system.actorOf(Props[Barista], "Barista")

定义在 ActorSystemactorOf 方法需要一个 Props 实例,而它能提供配置新创建行动者的方法。视需要,你还可以给你实例化的那个actor起个名字。

请注意,actorOf 返回的对象类型不是 Barista,而是 ActorRef。 行动者们从不会直接的访问其他的行动者,因此我们不必访问actor实例。 然而,行动者们或是其他组件在发送消息给其他行动者时,会取得他们的引用对象,而不是他们本身。

所以,ActorRef 就像是actor的一种代理人(proxy)。 这样会给我们带来一些方便,比如一个 ActorRef 可以被序列化,然后将它作为一个非本机的远程行动者的代理。 对获取到 ActorRef 的组件来说,actor的物理位置 - 到底是存在于同一个JVM还是远程电脑上 - 是透明的。 我们将其称之为位置透明性

请注意,ActorRef 没有类型化参数。一个 ActorRef 可以被替换成任意一个其他的 ActorRef,这就允许我们把消息发送给任意的 ActorRef 引用对象。 就像上面所提到的,这是Akka的特别设计 - 允许了你在改变行动者系统拓扑结构的同时不必对发送者进行任何修改。

发送消息

现在我们已经实例化了一个 Barista actor和引用到它的 ActorRef,然后我们就可以发消息了。 调用 ActorRef! 方法:

barista ! CappuccinoRequest
barista ! EspressoRequest
println("I ordered a cappuccino and an espresso")

调用 ! 是一个放射后不管(fire-and-forget)的操作:你告诉 Barista 你要点一杯卡布奇诺,然而并不等待咖啡师的回应, 这就是Akka中actor之间交互的最常见模式。调用此方法实际上的行为是,你让Akka把你的消息放置于接受者的邮箱队列里。 像我们所说的,消息发送不是阻塞行为,消息的接受者最终会在将来的某时刻处理你发送的消息。

由于消息机制的不同步的性质,上面代码的结果是非决定性的。 比如有可能是这样:

I have to prepare a cappuccino!
I ordered a cappuccino and an espresso
Let's prepare an espresso.

尽管我们最初发送两条消息给 Barista 的邮箱,在上面的示例中,我们自己的 println 输出插在了处理两条消息之间。

答复消息

只是把消息发送给别人是不够的。你有时候会想要答复消息的发送者,当然,仍然按非同步的方式。

为了直接让你知道如何答复发送者,我们略过一些内容直接告诉你,actor有一个能返回最后一条(也就是当前正在处理的)消息的发送者的方法:sender

但为什么actor能知道是谁发送的消息呢?答案就在 ! 方法的第二个参数,一个隐式参数列表类型:

def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

ActorRef! 方法在一个行动者内被调用时,行动者会把自己的 ActorRef 隐式的传入此方法。

我们把 Barista 的代码稍作更改,在打印到控制台之前立刻回复一个 Bill 消息给 CoffeeRequest 消息的发送者:

case class Bill(cents: Int)
case object ClosingTime
class Barista extends Actor {
  def receive = {
    case CappuccinoRequest =>
      sender ! Bill(250)
      println("I have to prepare a cappuccino!")
    case EspressoRequest =>
      sender ! Bill(200)
      println("Let's prepare an espresso.")
    case ClosingTime => context.system.shutdown()
  }
}

我们在上一段代码中加入了一条新的消息 ClosingTime。这个消息会使得 Barista 通过访问 ActorContext 来 关闭整个行动者系统。

现在,我们介绍第二个行动者,其代表了一个客户 customer

case object CaffeineWithdrawalWarning
class Customer(caffeineSource: ActorRef) extends Actor {
  def receive = {
    case CaffeineWithdrawalWarning => caffeineSource ! EspressoRequest
    case Bill(cents) => println(s"I have to pay $cents cents, or else!")
  }
}

这个行动者是一个咖啡成瘾者,他唯一能做的就是点咖啡。 我们传递一个 ActorRef 到他的构造函数中, 对这个顾客来说,他不知道这个 ActorRef 是指向了一个 Barista 还是什么,只知道这个行动者引用是他的咖啡因饮料的来源。 他只关心是否能发送 CoffeeRequest 给这个引用。

最后,为了让所有东西运转起来,我们需要创建两个行动者,并将一个 CaffeineWithdrawalWarning 消息发送给我们的顾客:

val barista = system.actorOf(Props[Barista], "Barista")
val customer = system.actorOf(Props(classOf[Customer], barista), "Customer")
customer ! CaffeineWithdrawalWarning
barista ! ClosingTime

对于 Customer,我们使用一种不同的创建 Prop 的工厂方法: 需要实例化的行动者的类型和实例化它所需要的参数一起传入到工厂方法里。 这样我们的咖啡师的 ActorRef 就可以传入到顾客的构造函数里了。

发送一条 CaffeineWithdrawalWarning 消息给顾客,会使得它发送一个 EspressoRequest 消息给咖啡师; 咖啡师在接收后,再反过来返回给顾客一个 Bill(账单) 消息。 输出会像是下面这样:

Let's prepare an espresso.
I have to pay 200 cents, or else!

首先,当咖啡师处理 EspressoRequest 消息时,它会给顾客发送一条新消息; 它在发送新消息给客户时,并不会阻塞 EspressoRequest 消息的处理(也就是往控制台打印一段字符串)。 稍后,顾客开始处理 Bill 账单信息,并把它打印到控制台。

问问题

有时候,仅仅发送消息给行动者并期待将来某个时间的回复是不够的。 最常见的情况是,我们需要在不同的组件中与行动者互动,而不是仅仅在行动者之间互动。 在行动者的世界外,其他组件是无法接收消息的。

为了对付这种情况,Akka 加入了对于 ask(询问)的支持,它提供了一个基于actor和基于future的并行实现之间进行交互的一架桥梁。

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(2.second)
implicit val ec = system.dispatcher
val f: Future[Any] = barista2 ? CappuccinoRequest
f.onSuccess {
  case Bill(cents) => println(s"Will pay $cents cents for a cappuccino")
}

首先,你需要导入一些包以获得ask 语法支持,并隐性的为 ? 方法返回的 Future 对象添加一个超时规则。 并且,你需要一个 ExecutionContext。这里,我们简单地使用了 ActorSystem 的默认调度器 - 它同时还是一个方便获取的一个 ExecutionContext

就像你看到的,返回的Future 对象的内含类型是 Any。这应该不会让你感到惊讶,毕竟它就是一个行动者发送过来的任意一条消息而已。

对于被询问的行动者来说,ask 行为上跟返回给一个消息发送者一条消息是一回事。 这就是为什么我们不必更改任何代码,就可以询问一个 Barista

被询问的行动者返回消息给询问者时,Promise 对象所属的返回的 Future 就完成了。

一般来讲,在可以使用 告知 的情况下就不要使用 询问,因为后者会消耗更多资源。 Akka 不是跟懂礼貌的人用的! 但是,总有情况是你必须使用询问的,在这种情况下请自由使用。

有状态的行动者

一个行动者也许会有自己的内部状态。 有时,应用的一大半状态是由行动者之间传递的不可变消息组成的。

一个行动者在同一时刻只会处理一条信息。由于做到了这一点,行动者理论上是可以维持并修改内部状态的。 这意味着行动者内部可能会有可变状态,但由于每条消息是在隔离开的情况下处理,同一个行动者的内部状态并不会因为并行问题而搞砸。

为了演示,我们把没有状态的 Barista 改造成携带状态的行动者。简单的让它记录订单数量:

class Barista extends Actor {
  var cappuccinoCount = 0
  var espressoCount = 0
  def receive = {
    case CappuccinoRequest =>
      sender ! Bill(250)
      cappuccinoCount += 1
      println(s"I have to prepare cappuccino #$cappuccinoCount")
    case EspressoRequest =>
      sender ! Bill(200)
      espressoCount += 1
      println(s"Let's prepare espresso #$espressoCount.")
    case ClosingTime => context.system.shutdown()
  }
}

我们引入了两个变量,cappuccinoCountespressoCount,分别记录每种咖啡的订单数。 事实上这是我们在整个系列教程里第一次使用变量 var。 尽管我们在函数式编程中尽量避免使用变量,但这是唯一一种允许行动者携带状态的方式。 因为每条消息是在被隔离开的情况下执行,上面的代码执行起来就像是在非行动者环境下使用 AtomicInteger 值。

总结

到此为止就是我们关于行动者编程模型的介绍,还有如何在Akka中使用它。 虽然我们只是粗略的体验了Akka一些表面的内容,也略过了不少重要的概念, 但我仍希望你已经有了足够多关于使用行动者模型的并行策略的领悟,并使你继续学习更多的内容。

在接下来的文章中,我会丰富我们的例子,给它加一些有意义的行为,并向你讲解Akka更多的理念,还有向你介绍在行动者系统是如何处理错误的。

Posted by Daniel Westheide Feb 27th, 2013

译文:欢迎来到未来

译者前言

xp最近迷上了Scala这门第一眼看起来像是 Java with Haskell 的语言。刚学习完 Martin Odersky 在Coursera上的关于Scala函数式编程语言的课程后,惊喜的发现作者还有一门进阶的 Principles of Reactive Programming 课程。

这一篇博客翻译自来自 Daniel Westheide 的 Scala 课程系列的第八篇 Welcome to the Future,其“生动形象”的介绍了 Scala 的 Future 的使用。所有权利由原作者 Daniel Westeide 保留。

正文译文

作为一个有远见有热情的Scala开发者,你也许已经听说过Scala的并发策略了,或者这正是Scala吸引到你的首要原因。基于Scala的语言特性支持,描述一个并发问题变得很简单,并且编写一个规范的并发程序要远比其他使用底层并发API接口的语言要容易许多。

Scala实现并发机制的基石叫做Future,另一个是Actor。这篇文章的主题是Future,我会向你以函数编程的方式介绍Future的强大之处。

为了让本篇文章的例子运行,你请确保你正在使用Scala 2.9.3版本或更高版本。本篇文章中所讨论的Future版本是在2.10.0中被正式加入Scala,并在之后向后移植到了2.93版本中。最开始的时候,Future是作为Akka并发工具包早前版本的一部分。需要注意的是,Akka中的Future API与Scala的目前的版本稍有区别。


顺序执行代码的缺点

假设你要准备一杯卡普奇诺咖啡。你可以简单的一步步执行以下步骤:

  1. 把备好的咖啡豆磨成粉
  2. 烧水
  3. 用刚才磨好的咖啡粉和热水调制一杯浓缩咖啡
  4. 打一些奶泡
  5. 将浓缩咖啡和奶泡混合,完成一杯卡普奇诺咖啡

以上步骤翻译成Scala:


import scala.util.Try

// Some type aliases, just for getting more meaningful method signatures:
type CoffeeBeans = String
type GroundCoffee = String

case class Water(temperature: Int)

type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String
// dummy implementations of the individual steps:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water = water.copy(temperature = 85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"

// some exceptions for things that might go wrong in the individual steps
// (we'll need some of them later, use the others when experimenting
// with the code):
case class GrindingException(msg: String) extends Exception(msg)

case class FrothingException(msg: String) extends Exception(msg)

case class WaterBoilingException(msg: String) extends Exception(msg)

case class BrewingException(msg: String) extends Exception(msg)

// going through these steps sequentially:
def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

这样的做法有几点优势:你会得到一系列具有很高可读性、循序渐进的指令。并且,由于规避了上下文切换,你几乎不大可能在调制卡普奇诺的过程中找不到北。

反过来看这样做的缺点:按顺序依次执行意味着在调制咖啡过程中的每个阶段,你的大脑和身体都处于阻塞状态,也就意味着在每个任务流程完成前,什么也做不了了。只有完成了前一个任务,你才可以开始比如烧热水以及剩下的步骤。

显然,宝贵资源在等待的过程中浪费掉了。为了解决这个问题,你也许会向同时启动多个步骤让它们并发执行。比方说,当你看到咖啡粉磨好而且水烧开后,你才开始冲泡浓缩咖啡,在此同时你也启动了牛奶打泡机。

软件开发与准备一杯卡普奇诺咖啡并没有什么两样。一个网络服务器有许多处理HTTP请求和生成响应的线程。你自然不想让这些宝贵的线程们在等待一条数据库查询或者调用另一个HTTP服务的时候被阻塞。为此,你转而使用非同步编程模型和非阻塞型的IO,使得服务器在处理数据库查询请求并返回结果的等待过程中,服务器线程依然可以同时处理其他的请求。

我听说你喜欢回调函数,那我就在你的回调函数里放进我的回调函数!

显然,你已经听说过回调函数的各种问题 - 这正是Node.js被许多很cool的同学所诟病的地方。Node.js还有一些其他的库大量的使用回调函数同其他服务进行交互。但遗憾的是,这种策略很容易产生在回调函数中调用回调函数所引发的一团混乱,使得代码阅读和除虫变得十分困难。

接下来,你马上就会看到Scala的Future类也允许回调函数,而且Scala还提供了一些更方便的替代方案。靠它们,你很可能会在之后抛弃回调函数。

我知道Futures,它们根本就是毫无用处吗!

你也许已经了解了其他的语言中的Future实现,比如Java。但是Java所提供的Future类并没有提供太多有用的功能:检查它们是否完成,或者等待它们完成后做一些事情。简而言之,Java的Future类几乎就是毫无用处的,也就导致没有人喜欢用他们。

如果你觉得Scala的Future也像是Java一样的实现,那就准备好大吃一惊吧。黑喂狗!


Future的语义

Scala的Future[T]类属于scala.concurrent包。Future[T] 是一种容器类型,表示为一个最终输出类型为 T 的计算过程。额,计算过程中也许会出错或者超时 - 如果Future完成时,他的结果有可能根本没有执行成功,这时候结果所包含的就是一个异常对象。

Future 是一个一次性写入的容器。当一个future执行完成后,这个容器是不可更改的。而且,Future 只提供了一个可以读取计算结果的接口。把计算结果写入一个future的任务是通过 Promise 对象完成的。因此,这两个类在API设计中有着清晰的区分。在这篇文章中,我们着重介绍前者(Future)。Promise类会在下一篇系列教程中介绍。


编写Futures

使用Scala futures有多种编写方式,接下来我们会用Future类重写之前的卡普奇诺咖啡例子来展示它们。首先,把所有函数改写成返回含有阻塞计算过程的Future对象的函数,使得这些函数可以并发执行。

import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
  println("start grinding...")
  Thread.sleep(Random.nextInt(2000))
  if (beans == "baked beans") throw GrindingException("are you joking?")
  println("finished grinding...")
  s"ground coffee of $beans"
}

def heatWater(water: Water): Future[Water] = Future {
  println("heating the water now")
  Thread.sleep(Random.nextInt(2000))
  println("hot, it's hot!")
  water.copy(temperature = 85)
}

def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
  println("milk frothing system engaged!")
  Thread.sleep(Random.nextInt(2000))
  println("shutting down milk frothing system")
  s"frothed $milk"
}

def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
  println("happy brewing :)")
  Thread.sleep(Random.nextInt(2000))
  println("it's brewed!")
  "espresso"
}

这段代码有几个地方需要进一步解释。

首先,Future类在它的伴生对象中有一个两个柯里化参数的工厂方法 – apply:

object Future {
  def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}

非同步执行的计算过程被作为 按变量名 参数传递给第一个参数。第二个参数的类型为 implicit,意味着只要我们在上下文中的某处定义了一个 implicit 值后,这个参数会自动匹配到这个值。一般来说,只要保证导入了全局执行上下文对象(import ExecutionContext.Implicits.global)就可以了。

一个 ExecutionContext 对象是一个执行future的上下文,你可以把它看做成一个类似于线程池的东西。上面说定义了一个隐含的 ExecutionContext 对象,剩下来我们只要传入第一个参数就可以了。人们通常会把第一个参数可以被花括号{}包裹而不是圆括号()。这样做得原因是,花括号使得代码看起来更像是在使用Scala的语言特性,而不是调用一个常规的方法。对于所有 Future 的API来说,所有接口都有一个 implicit 类型的 ExecutionContext 参数。

另外,在上面的例子里,我们实际上没有真的计算任何东西。为了展示目的,我们在“计算”过程里加入了一些随机时长的sleep。再加入了一些打印命令,这样你在跑上面的代码的时候可以更清晰感受一下这段代码实际运算时的非确定性和并发特征。

返回的 Future 对象内的计算过程会在 Future 对象实例化之后的某个不确定的时间,放入到由 ExecutionContext 所管理的某个线程内执行。

回调函数

当执行简单任务的时候,使用回调函数完全合适的。future中的回调函数的类型是部分函数(partial function)。你可以把一个回调函数传入到future的 onSuccess 方法内。当且仅当 Future 成功执行完成后,计算好的数值才会作为参数传入回调函数:

grind("arabica beans").onSuccess { case ground =>
  println("okay, got my ground coffee")
}

类似的,你也可以注册一个失败时调用的回调函数注册到 onFailure 方法内。这个回调函数的参数是一个 Throwable 对象,它只会在 Future 执行失败的时候会被调用到。

通常来说,最好的方式是整合两个回调函数然后把它们注册到onComplete中。这样的话,回调函数的参数类型为 Try

import scala.util.{Success, Failure}
grind("baked beans").onComplete {
  case Success(ground) => println(s"got my $ground")
  case Failure(ex) => println("This grinder needs a replacement, seriously!")
}

因为在上面的代码里,传入 grind 函数的值是”焗豆”“(而不是咖啡豆),grind 将在执行时抛出异常,导致 Future 的结果为一个 Failure 对象。

组合future对象

以嵌套的方式使用回调函数会引发各种令人头疼的问题。幸运的是,在Scala中我们完全可以不必这样做!发挥Scala future 真正实力的特性是因为future对象是可以组合的

如果你阅读过这个系列教程的前面的文章,你会注意到 Future 容器可以使用 mapflatMapfilter 或者 for 推导式(笔者:也就是说是一个完备的Monad模式)。 因此,Scala可以做到接下来所说的特性实际上不会让你惊讶才对!

留给我们现在的问题是:既然我们可以使用容器的那些操作方法,对于还没有执行完成的 Future 类来说这些方法意味着什么呢?

映射未来

你是不是总会幻想自己是一个可以改变未来事件的时间旅行者呢?作为一个Scala开发者,你真的可以办得到!假设当你的水烧开时,你想检查水温是否达到要求,你可以把你的 Future[Water] 对象映射成一个 Future[Boolean] 对象:

val temperatureOkay: Future[Boolean] = heatWater(Water(25)).map { water =>
  println("we're in the future!")
  (80 to 85).contains(water.temperature)
}

定值 temperatureOkay 被赋予了一个 Future[Boolean] 对象。这个对象在结果成功地计算完成后,会含有一个布尔值。你可以试着把 heatWater 方法实现修改一下让它抛出异常(比方说烧水器炸掉了),然后看一下这会让未来发生什么样的变化: console里面没有打印出 we're in the future!

当你把写好的函数通过 map 方法传入future对象后,这意味着这个函数将在未来,或者说是某一种未来的状态中被调用。映射函数(mapping)在你定义的 Future 对象成功完成后立刻执行;但在映射触发时的那个时间线并不是你现在所处的那一个。如果你的 Future[Water] 挂掉了,你通过 map 传入的函数将永远不会被触发,取而代之的,它会返回一个包含了 Failure 结果的一个 Future[Boolean] 对象。

保持扁平化的未来

如果一个 Future 对象的结果依赖于另外一个 Future 对象,你也许转用 flatMap 方法来避免深层嵌套的future结构。

比方说,我们假设测量温度的步骤会需要一段时间来完成,因此你也想实现一个非同步检查温度的方法。这样,你写了一个 Water 对象作为参数,Future[Boolean] 作为返回值的方法:

def temperatureOkay(water: Water): Future[Boolean] = Future {
  (80 to 85).contains(water.temperature)
}

调用 flatMap 而不是 map,使得我们得到一个 Future[Boolean] 而不是 Future[Future[Boolean]]

val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)).map {
  water => temperatureOkay(water)
}
val flatFuture: Future[Boolean] = heatWater(Water(25)).flatMap {
  water => temperatureOkay(water)
}

同样的,映射函数只有在 Future[Water] 实例在成功执行后才会被调用(在水温可以接受时)。

for 推导式

flatMap 调用之外,我们还可以使用 for 推导式来实现本质相同但阅读更清晰的代码。我们上面的例子可以重写成:

val acceptable: Future[Boolean] = for {
  heatedWater <- heatWater(Water(25))
  okay <- temperatureOkay(heatedWater)
} yield okay

如下,假设你有多个可以并发执行的计算过程,注意,以下代码中我们在 for 推导式内实例化 Future 对象:

def prepareCappuccinoSequentially(): Future[Cappuccino] = {
  for {
    ground <- grind("arabica beans")
    water <- heatWater(Water(20))
    foam <- frothMilk("milk")
    espresso <- brew(ground, water)
  } yield combine(espresso, foam)
}

这段代码读起来很棒,但由于 for 推导式是 flatMap 的另一种表现方式,这意味着 heatWater 中创建的 Future[Water] 对象只有在 Future[GroundCoffee] 成功完成后才会实例化(笔者注:这样会导致我们所创建的future计算过程仍然按照顺序执行,而不是非同步执行),你可以运行一下上面的代码并监控控制台的输出,检查一下是否是按照固定顺序执行。

因此,为了并发正确触发,请保证所有的future对象在 for 推导式之前已经被实例化:

def prepareCappuccino(): Future[Cappuccino] = {
  val groundCoffee = grind("arabica beans")
  val heatedWater = heatWater(Water(20))
  val frothedMilk = frothMilk("milk")
  for {
    ground <- groundCoffee
    water <- heatedWater
    foam <- frothedMilk
    espresso <- brew(ground, water)
  } yield combine(espresso, foam)
}

现在,我们在 for 推导式之前创建的 Future 会在创建后立即同步执行。通过观察控制台,你会发现多次运行结果的顺序是不确定的。唯一可以认定的事情是,”happy brewing”总是在输出的最后一行出现,因为只有它是在我们的 for 推导式之内所创建出来的。也就是说,只有当三个for外面的future完全执行完成后,combine 才会在最后执行。

失败的投影

你已经知道了 Future[T] 是偏向于成功的(success-biased)。基于计算过程正确完成这个假设,这使得允许你可以自由使用 mapflatMapfilter 以及其他 Future 的相关接口。有时候,你想对计算结果中可能失败的地方以优美的函数式编程的方式进行一些特别的处理。通过调用Future[T]failed方法,你可以得到一个失败结果的投影,也就是 Future[Throwable],然后通过调用它的 map 方法传入例如只接受失败结果的函数。


前景

你已经看到了未来,而且他看起来还不赖!事实上你完全可以把它看做成另一种普通的容器类型。通过组合,以函数编程的方式,使用 Future 会是一段良好的体验。

编写并发执行的阻塞代码很简单,只需把需要执行的阻塞代码通过future的工厂方法创建新的 Future 对象。但是,最好从开始就不要把代码设计成会有阻塞的代码。为了实现它,我们必须通过使用 Promise 类来完成一个 Future 对象。实际使用future对象的方式会在此系列的下一部分中介绍。

New path ahead!

莫名其妙的转行到Online Advertising行业,前两年的工作经验积累在目前的岗位上几乎完全用不上了。 之前所做主要与图像/视频算法的实现有关系,侧重于基于显卡的GPGPU异构计算;取而代之,工作重心转移到了企业级Java应用。

于是,关于OpenCV/OpenCL的内容,可能就要无限期的暂停了。接下来,也许随着经验的积累,我会写点跟目前更有紧密联系的内容。

学习IOS7开发ING

最近这段时间我开始从零单排学起IOS开发。学习过程中,着实感觉到苹果公司以及社区的学习资料对于初学者的友好。 甚至于苹果公司自己提供的技术文档都散发着苹果公司自身的美学气息。这是我在学习其他编程语言或技术所没感受到的。 因为长期以来,程序员(尤其是开源社区)的风格基本就是自带文档少的可怜或者资料残缺过时根本没用处。基本上来说,对陌生的技术/开源源码,基本上是不能指望开发者的文档的,只能靠自己阅读代码去理解。

在这里,为了方便后来者,我整理了一下我在IOS开发过程中参阅学习的资料:

  1. 斯坦福IOS7在线课程 via Itunes U。我的最初和主要的学习资料。这门课程内容十分丰富,每节1小时的1080P全程课堂视频,完整的课堂笔记和作业,极为专业的教授,而且免注册。推荐认真学习。

  2. 马上着手开发 iOS 应用程序 入门级的IOS学习资料,但内容跨度很大,覆盖了几乎完整的IOS应用的开发流程。

  3. iOS 用户界面指南规范 规范内容并不枯燥乏味,反而由于大量图片/视频实例,大大增强了文档的可读性。从这里可以学习到很多苹果公司(Jonathan Ive)对于IOS7方方面面设计细节,让人折服于苹果对于用户界面和体验的深刻体会,可谓苹果粉的圣经,推荐阅读。

  4. 使用 Objective-C 编程 为了编写IOS程序,学习Objective-C是必不可少的。

  5. 以后再加。。。(xp你不觉得这篇很水吗?