猴年新气象

从去年11月份左右从后端正式转前端,到现在2月份,这三个月左右的时间里自我感觉是对自己而言是职业生涯中知识增长最快的几个月。 相比较偏向于成熟稳重的后端世界,前端技术不断的吸收着其他语言的特性,使得前端对于工程师的要求变得越来越高的同时,也变的更加“有趣”。

我公司目前整个应用平台,包含前端和后端,都进行了一次彻底的重做和技术的更新迭代。 对于前端而言,由于启用了大量的新技术,不光是前端初来乍到的我,而且对整个前端团队来说都是一次猛烈的学习风暴,最重要的有:

  • Gulp 替换 Grunt 做为新的构建工具
  • 使用 Babel 转译 ES6 代码
  • 使用 Angular 1.x 框架编写组件化的单页应用
  • 加入 NodeJs 层作为前端与后端之间的一个前端团队可控的中间件服务
  • 使用 Flexbox 实现更灵活的页面布局

民工徐飞老师的一篇 未来Web应用的前端技术选型畅想 对目前技术的发展有很深刻的讨论:

2015年,前端的世界发生了很多变化,这些变化快得超出我想象。在这个巨变的时代,产品的技术选型是个麻烦的事情,具体来说,有几个方面:

- 如果2-3年后新开始一个业务项目,可能会有什么样的技术方案?
- 如果现在立刻开始一个新业务项目,可能会有什么样的技术方案?
- 如果持续维护老的项目,后面可能会对它们有怎样的迁移方案?是逐步迁移,还是推倒重做?
- 在PC端项目为主体的业务体系里,如果将来某个时机出现了移动端项目,该如何去选型,并且利用之前的业务代码

我之前没有预料到的,是ES6的普及之快。
在此之前,对于新的语言特性,人们一般会等到支持的浏览器普及之后,才会大量使用,比如ES5,但由于Babel这样转译的工具出现,我们可以渐进使用,
所以,开发过程中可以完全使用ES6甚至ES7的特性编写代码,然后通过构建去达到兼容的结果。

近阶段本人的目标还是在以阅读为主。自己干货还是少了一些,等有足够料的时候再具体聊聊。


推荐深入阅读:

ES7 async/await 使用剖析

简介

async/await 特性目前做为TC39的一个提议,由微软公司提出,预计加入下一个版本的 ECMAScript 中。 在浏览器大规模支持新的ES版本前 (也许永远不会跟上脚步), 我们可以使用流行的 babeljs 转译工具提前享用最新的语言特性。

如果你想试一下 babeljs 简单的测试环境并尝试一下这个语法,不妨 clone 一下我的这个项目: babel-playground

async/await 的灵感来自于 C# (wiki), 在某些其他语言 (比如 Scala, Python) 也可以发现他的身影。 此特性依赖于新加入ES6 的 generator 和 Promise 特性。 开发者可以组合这两个特性,以类似于同步的方式写出优美的非同步的代码。

这篇文章将以 async/await 的转译过程开始介绍这个特性的实现原理,并在之后列举几个在开发中经常碰到的问题。

TL;DR

async 函数是一个返回值为 Promise 的函数。借助生成器我们可以不用回调函数嵌套的方式编写非同步代码。

async/await 转译过程

async/await 这个便利的特性背后有着一个精巧的实现原理。 我的建议是,为了更有信心的使用这个特性,我们有必要简单了解一下 async/await 是如何转译、工作的。

async/await 本身可以看作 Promise + generator 的语法糖。 它的关键在于 spawn 函数。 Reference: Informative Desugaring:

function spawn(genF, self) {
    return new Promise(function(resolve, reject) {
        var gen = genF.call(self);
        function step(next) {
            try {
                next = nextF();
            } catch(e) {
                // finished with failure, reject the promise
                reject(e);
                return;
            }
            if (next.done) {
                // finished with success, resolve the promise
                resolve(next.value);
                return;
            } else {
              // not finished, chain off the yielded promise and `step` again
              Promise.resolve(next.value).then(function(v) {
                // resolve
                step(gen.next(v));
              }, function(e) {
                // reject
                step(gen.throw(e));
              });
            }
        }
        step(gen.next());
    })
});

这个函数代码可能初看不好理解。总的来说,函数分为几个部分:

  1. 调用 ES6 生成器函数 genF,获得一个生成器对象 gen
  2. 调用 gen.next() 方法。
  3. 当生成器未执行完毕时,这次的运行结果为一个 Promise 对象。 将其结果返回给生成器,使得生成器能够拿到 Promise 的最终结果,并把新的生成器放入下一次递归中。
  4. 当生成器执行完毕时,把最后的结果放入 resovle 函数的调用中。

值得注意的是,为了防止生成器 yield 的值不是一个 Promise,上面的代码中我们把这个值放入 Promise.resolve(next.value) 中,以保持代码的连贯性。这一点在文章稍后也有提到。

实际调用例子:

spawn(function* () {
  const res0 = yield Promise.resolve(123);
  const res1 = yield Promise.resolve(res0 + 456);
  return res1; // should be 123 + 456
});

上面的代码使用 async/await 语法糖的写法:

async function myAsyncFunction() {
  const res0 = await Promise.resolve(123);
  const res1 = await Promise.resolve(res0 + 456);
  return res1; // should be 123 + 456
}

// 而不用async/await的写法是...
function myAsyncFunction2() {
  function p1 = function(res) {
    return Promise.resolve(res + 456);
  }

  Promise.resolve(123).then(value => {
    return p1(value);
  });

  // 注意,此函数的返回类型为 Promise(Promise),但 Promise 对象在调用 Promise.resolve 方法时
  // 会追随给定的 Promise 执行到 resolve/reject 为止。因此在使用上和 myAsyncFunction 是等效的。
}

上面的例子比较简单,不过还是能大概的看出使用 async/await 后代码显得更清晰易懂。

总的来讲,async/await 语法就是把 async 函数替换成一个生成器函数,并把函数内的 await 替换为 yield,再将函数作为参数传入 spawn 函数中。也就是说,隐含的转移过程为:

async function <name>?<argumentlist><body>
=>
function <name>?<argumentlist>{ return spawn(function*() <body>, this); }

async 函数的大体转移原理如上,不过实际的转译工具对于生成器也有额外的转译过程(比如用循环迭代代替递归)和运行时需要添加的依赖。 比如 babel 使用 facebook 的 regenerater runtime,这里不多做赘述。


PITFALLS!

实际项目使用中,本人由于刚上手时不熟悉 async/await,踩到很多坑。 这里列一下我总结的使用这个特性时需要注意的点:

警惕 “Race condition”

使用 async/await 的一个常见错误是把 async 函数内的调用看作是同步的。 实际情况是,在每个 await 开始到下一个 await 之前,程序的执行才是同步的。 这一点在 async 函数内部修改外部的状态时需要格外小心。比如下面的例子:


async function foo(uri) {
  // 请求某一资源
  const res = await $.ajax(uri);
  // 用res做一些其他事情
  generateView(res);
}

// 由用户操作触发的两次连续foo调用
foo(slowURI); // 返回较慢
foo(fastURI); // 返回较快

// 视图最终被渲染为 slowURI 的内容

这是我开发过程中经常碰到的一个问题: 如果第一次的调用返回比较慢,就会将视图渲染为第一次的结果,然而这显然不是我们想要的。

比较简单的解决方案是:

let lastRequestedURI = null;

async function foo(uri) {
  lastRequestedURI = uri;
  // 请求某一资源
  const res = await $.ajax(uri);
  // 当全局的请求URI跟这次的请求一致时才重新渲染视图
  if (lastRequestedURI === uri) {
    // 用res做一些其他事情
    generateView(res);
  }
}

// 由用户操作触发的两次连续foo调用
foo(slowURI); // 返回较慢
foo(fastURI); // 返回较快

// 视图最终被渲染为 fastURI 的内容

正确处理 async 函数中的异常

根据转译过程的代码看出,想要让 async 函数中的 Promise 抛出的异常正常的被捕捉,一定要在调用 Promise 的时候与 await 组合。

如下面的代码所示:

function testExceptions() {
  // 在 Node.js 中处理未捕捉的 Promise 异常
  process.on('unhandledRejection', (reason, p) => {
    console.log("Unhandled Rejection - " + reason + " reason");
  });

  async function foo() {
    throw 'some error';
  }

  // 异常未被捕获
  try {
    foo();
  } catch(e) {
    console.log("normal try/catch - " + e);
  }

  // 异常未被捕获
  (async () => {
    try {
      foo();
    } catch(e) {
      console.log("async try/catch without await - " + e);
    }
  })();

  // 异常被捕获
  (async () => {
    try {
      await foo();
    } catch(e) {
      console.log("async try/catch with await - " + e);
    }
  })();
}
// 执行结果:
// async try/catch with await - some error
// Unhandled Rejection - some error reason
// Unhandled Rejection - some error reason

谨记一点:async 函数中的 Promise 如果不是跟 await 组合,那么他的返回值还是一个 Promise。 开发过程中请谨慎对待 “游离状态” - 也就是没有外部引用的 - Promise。 虽然对 NodeJs 来说,可以通过对于 processunhandledRejection 事件进行监听来处理没有被处理的 rejected Promise。 但这种用法会失去对于 Promise 执行状态的追踪,使得代码的容错水平降低。

await 一个非 Promise 值

例子:

function bar() {
  return Math.random() * 2 > 1 ? 123; somePromise;
}

async function foo() {
  await bar();
}

在之前的转译过程解释中有提到,await 的值实际会被包裹在一个 Promise.resolve() 中。 因此上面的代码可以正常工作。

注意

Promise.resolve(somePromise) 等效于 somePromise。

Promise.resolve: Returns a Promise object that is resolved with the given value. If the value is a thenable (i.e. has a then method), the returned promise will “follow” that thenable, adopting its eventual state; otherwise the returned promise will be fulfilled with the value. Generally, if you want to know if a value is a promise or not - Promise.resolve(value) it instead and work with the return value as a promise.

类似的,下面的代码也可正常工作:

async function foo() {
  return somePromise; // 没有调用 await
}

并行(parallel)执行多个 async 函数

利用 async 函数我们可以方便的顺序执行 Promise。比如下面的例子:

// 返回一个在 milliseconds 毫秒后完成的一个 Promise
function delay(milliseconds, index) {
  return new Promise(res => {
    setTimeout(() => {
      res(`[${index}]: Res after ${milliseconds} milliseconds`);
    }, milliseconds);
  });
}

// 顺序执行
async function sequence() {
  const start = new Date();
  for (let i = 0; i < 10; i ++) {
    console.log(`${await delay(Math.random() * 1000, i)}`);
  }
  console.log(`sequence done in ${new Date() - start} ms`);
}

/* 输出
[0]: Res after 986.4941246341914 milliseconds
[1]: Res after 333.2838623318821 milliseconds
[2]: Res after 354.3416520114988 milliseconds
[3]: Res after 834.6803605090827 milliseconds
[4]: Res after 215.9734272863716 milliseconds
[5]: Res after 221.03742230683565 milliseconds
[6]: Res after 114.20689150691032 milliseconds
[7]: Res after 28.70347397401929 milliseconds
[8]: Res after 524.5535324793309 milliseconds
[9]: Res after 669.4943546317518 milliseconds
sequence done in 4318 ms
*/

然而开发中,我们经常需要并行执行多个 Promise。 比如典型的网络请求情形,一般来说我们需要同时发出多个网络请求,并在所有请求返回时进行下一步操作。 如果用上面的顺序执行方案的话,JavaScript 的非阻塞特性没有被充分利用。

如果不同的 Promise 之间并没有依赖,就可以用并行的方式执行他们。

async function parallel() {
  const delays = [];
  const start = new Date();
  for (let i = 0; i < 10; i ++) {
    delays.push(delay(Math.random() * 1000));
  }
  console.log(await Promise.all(delays));
  console.log(`parallel done in ${new Date() - start} ms`);
}

/*
[ '[0]: Res after 193.9734136685729 milliseconds',
  '[1]: Res after 323.2925720512867 milliseconds',
  '[2]: Res after 935.3614274878055 milliseconds',
  '[3]: Res after 422.59012744762003 milliseconds',
  '[4]: Res after 318.91681230627 milliseconds',
  '[5]: Res after 39.97510578483343 milliseconds',
  '[6]: Res after 616.469515254721 milliseconds',
  '[7]: Res after 696.0562060121447 milliseconds',
  '[8]: Res after 859.5389637630433 milliseconds',
  '[9]: Res after 473.90571935102344 milliseconds' ]
parallel done in 938 ms
*/

并行执行的诀窍在于 Promise.all 方法。 我们需要事先把需要并行的 Promise 放入一个数组内,然后传入这个方法。 当所有的 Promise 执行完毕后,all 会把所有 Promise 的结果按顺序放入最终结果的数组内。


参考资料

Live long and front-end, WTF?

时间过的很快,在 Marin Software / Shanghai 已经呆了有一年半了。 xp自认为经过这段时间的洗礼,已缓慢入门了当今最为复杂的行业之一的 IT 互联网。 好在 Marin 的工作压力不大,与在国内竞争异常激烈的同行们相比,我司在工作之外有充足的休息时间,使得我得以有精力控制并享受自己的生活。

每次提笔更新博客的契机都可以算作心血来潮。 这次呢是因为发生在自己身上这段时间的各式变化,想想看是有个积极的方面的变化可以说一说的。

那么,就开始进行新一轮生活里程的总结吧。

Get fit! 奔跑吧大兄弟!

过去的一年中,最值得自豪的事情是,我慢慢从一个不进行任何健身活着体育活动的大胖纸,变成了一个能跑个半马的大胖纸。 关心我的朋友们,与我聊天时会好奇地问起我健身减肥的理由, “不减肥找不到媳妇”的观点是最容易被提起,而我也乐于附和,不管怎样这都是实话 😄。 为了让回头来看这篇文章的自己能感到骄傲,我会进行一下艺术加工 :D。

作为一个年岁近三十的大胖子,肥胖史几乎伴随了我人生的整个阶段。 坦白讲,减肥的想法时常会从我脑中蹦出来,但行动却是从来没有真正付诸于实践过。 肥胖这个属性真的会给人带来很重的负面效应,比如在生理上会增加各种可怕疾病的发病率,精神上容易疲劳,甚至还会使得性格变的容易自卑。 还有什么比一个孤独的一事无成的宅男胖子更可悲的呢?

幸运的是,在一切变的更加恶化之前,一个寻常的夜晚,我选择了离开电脑前,穿上运动装,走出家门,开始了持续至今的世纪公园溜达日常。

最初,我的体力只能支撑自己快走。 世纪公园一圈正好是五公里,每天快走一圈,坚持了一段时间之后,感觉到体力慢慢在增长, 我渐渐体会到掌控自身是种怎样的感觉,这使得我鼓起勇气去挑战慢跑。

开始跑步的那段时间是对心理和身体的双重考验。 说来惭愧,我的基础弱到了极致,所跑过最远的距离是在中学体育考试的一公里跑,更别提其中有段坚持不下来,换成走才完成的。

拖着沉重双腿,挺着喘到感觉窒息的上肢,我进入了慢跑训练最初也是最难的一个阶段。

依稀记得开始慢跑时,正是寒冷的一月份的上海。 从零基础慢跑,到能坚持用半小时时间跑完五公里,我并没有注意到自己经历过了多少个孤独的夜晚。 意识到的时候,已经是鸟语花香的五月了。

达成世纪公园慢跑一圈的目标后,我站在终点回头望去,心中是满满的自豪和兴奋。 我对自己说,还不够,再继续挑战更远的距离。

这时候不能不提一下影响我跑步的另一件事了。 八月某一天,S 告诉我和 L,上海国际马拉松马上就要进行报名,有没有兴趣参加。

我想了想,按自己的强度,或许在三个月后可以试着挑战一下半程马拉松。 于是,我选择了 Nike Running 应用的5公里高强度教练项目进行长度为8周的训练。 这个训练的强度对我来说还是相当有挑战的: 每周要慢跑5次,每次都在六公里以上。而此之前,我刚刚才能勉强完成每周两三次的五公里目标。

有计划的训练给我的跑步里程带来了质的提高,而每一次的提高都能让自己获得从未体验过的开心的感觉。 渐渐的,我能坚持跑完两圈世纪公园了(也就是10公里)。

9月份初,也就是训练项目的第六周是整个训练项目强度最大的一周,在周六这天要进行16公里的长跑。 跑之前我很担心会不会在跑步途中因为各种各样的身体原因跪倒在路上。 跑完第三圈时,我心想,特么都跑完15公里了,再坚持一圈不就半马里程了么!

最后的五公里的我身体虚脱:摆动着不像是自己的双臂,腿颤抖着驱赶着已麻木的小腿,机械的踏着一步又一步; 全身被榨干的汗水已经浸湿全身运动服,又完全蒸发掉; 大脑一片空白,耳中偶尔跳出 Nike Running 的历程时间提示。

幸运的是,我艰难但安全的在2小时17分完成了21公里的半马慢跑。 站在终点,我艰难的打开手机,点了结束按键。

“Congratulations, you have reached your goal of 21 kilometers.”

对一年前的我来说,这个目标根本就是遥不可及,甚至是不可想象的。 作为一个大胖子,我从没有奢望会有一天能体会到跑步的乐趣。

感谢每晚陪伴我的播客节目们:友的聊,大内密谈,机核网,狗熊有话说。 感谢微信运动的朋友们,每天让我这个大胖子排第一真是难为你们了。

遗憾的是随后的上马资格抽签我公司的三兄弟都没有抽中。但跑步活动不是还有很多,不是吗?

回头看自己在上学时候所留下的照片,真的是惨不忍睹啊~

最后捏他一下监狱学园的两句话:

为什么我要跑步?因为路就在那里。

喜欢跑步的没有坏人。

Path to fullstack?!

一天公司领导 M 问我,有没有兴趣转入做专职的前端工作。

前端技术发展的相当迅猛。不仅需要大量的经验,还需要持续性的学习,被虐的满头包啊。

国庆时在图书馆,埋头做了个用 Polymer 库做了个聊天室,感觉收益颇丰~

译文:处理Actor系统中的故障

原文链接 The Neophyte’s Guide to Scala Part 15: Dealing With Failure in Actor Systems

系列的前一篇文章中,我向你介绍了Scala语言中处理并发的第二块基石:行动者模式,其补充了基于组合future类的并发策略。 你学习了如何定义和创造行动者,如何向行动者们发送消息,行动者如何处理所接收到的消息以至于改变行动者的内部状态,还有如何回复给发送者消息。

希望你已经对行动者模式的并发策略产生了足够的兴趣。 如果你想开发一个完整的基于行动者的应用,仅仅使用简单的产生回声的行动者是不够的。 为了这个目的,还有不少关键性的概念需要你去学习。

行动者模型意味着它能帮助你实现很高程度的错误容忍度。 此篇文章,我们会看一下在一个基于行动者模式的应用中故障是如何被处理的。 你将会看到,它与传统的层级搭建的服务器架构的错误处理方式有着根本上的不同。

处理故障的解决方式与Akka的一些核心概念紧密相连,而其中某些还是搭建起Akka的重要的元素。 因此,这篇文章还作为这些核心概念和组件的指南。


行动者的层级结构

在了解当你的行动者内出现错误的时候会发生什么事情之前,你需要了解一个行动者并行系统的重要概念: 行动者系统是按照一定层级结构组织而成。

所以这个概念代表着什么意思呢?首先,它意味着你的每一个行动者都有一个parent(以下称为父行动者),每个行动者又可以创建他的子行动者。 基本上,你可以把一个行动者系统看成一个由行动者组成的金字塔。父行动者像是在实际生活中一样照看他的子女,并在子女跌倒的时候把他们扶起来。 你将会马上看到这是如何做到的。

守卫行动者

前一篇文章中我们创建过两种行动者,分别为 Barista 咖啡师和 Customer 客户。 这回我们换一种方式创建,并关注一下这些不同的行动这类型是如何创建出来的:

import akka.actor.ActorSystem
val system = ActorSystem("Coffeehouse")
val barista = system.actorOf(Props[Barista], "Barista")
val customer = system.actorOf(Props(classOf[Customer], barista), "Customer")

如上,我们通过调用 ActorSystemactorOf 方法创建出两个行动者。

所以这两个行动者的父亲是谁呢?是 Coffeehouse 这个行动者系统吗?嗯,这个答案虽然不是很准确,但也差不多了。 行动者系统并不是一种行动者,但是它被称为一个座位所有用户创建的根级行动者的父亲的守卫行动者。 所谓的用户创建的根级行动者,也就是通过调用行动者系统的 actorOf 创建出的行动者。

你的系统中不应该有很多由守卫行动者直接创建出的行动者。 更合理的情况是,系统内只应有少量的顶级行动者,并且他们会把大部分的任务委托给自己的子女们。

行动者的路径

The hierarchical structure of an actor system becomes apparent when looking at the actor paths of the actors you create. These are basically URLs by which actors can be addressed. You can get an actor’s path by calling path on its ActorRef:

如果你实际的观察你所创造的行动者的路径,行动者系统的层级结构会变得清晰起来: 他的路径基本上可以看做是以URL的形式所代表的一条地址。 你可以通过调用 ActorRefpath 方法获取到行动者的路径。

barista.path // => akka.actor.ActorPath = akka://Coffeehouse/user/Barista
customer.path // => akka.actor.ActorPath = akka://Coffeehouse/user/Customer

一个actor的路径是由: Akka 协议 akka://, 用户的守卫行动者 Coffeehouse, 再加上行动者的名字(也就是使用 actorOf 方法时用的那个)组成的。 对于运行在其他电脑上的远程的行动者来说,你也会在地址中看到远程主机名和他的端口号。

行动者的地址可以用来查询另一个行动者。 比如,我们可以在 Customer 行动者内调用它的 ActorContextactorSelection, 传入 Barista 的相对路径,获取到它。

context.actorSelection("../Barista")

虽然看似通过查找一个行动者的路径来获取引用看起来有些用处, 但大部分时候通过构造函数的参数传入依赖的行动者的引用是更好地方式,就像我们一直以来所做的。 过于密切关注行动者依赖在系统内的位置会更容易导致bug发生,并且让你的代码变得难以重构。

一个层级结构的例子

为了展示父行动者们如何监视着他们的子行动者,还有这种结构如何保证系统的容错性,我会在接下来继续关注我们的咖啡厅。 现在我们赋予 Barista 一个子行动者,使得咖啡师能将运营咖啡厅的一些业务委托给他的子行动者执行。

如果按实际咖啡师的工作中定义 Barista 模型,我们会为他的子任务创建一堆的子行动者。 但为了保证这篇文章的专注性,我们稍微为下面的例子做了点简化。

假设 barista 有一个收银机,这个收银机可以处理交易、打印收据、计算每天的销售额等事务。 下面是我们的第一个版本:

import akka.actor._
object Register {
  sealed trait Article
  case object Espresso extends Article
  case object Cappuccino extends Article
  case class Transaction(article: Article)
}
class Register extends Actor {
  import Register._
  import Barista._
  var revenue = 0
  val prices = Map[Article, Int](Espresso -> 150, Cappuccino -> 250)
  def receive = {
    case Transaction(article) =>
      val price = prices(article)
      sender ! createReceipt(price)
      revenue += price
  }
  def createReceipt(price: Int): Receipt = Receipt(price)
}

这个行动者包含了一个不可变的价目表,还有一个代表着销售额的整数类型的变量。 当他接受了一个Transaction 消息后,他会相应的增加销售额变量的值,并返回一个可供打印的 Receipt 收据消息。

就像之前提到过的,这个 Register 理应是作为咖啡师的一个子行动者存在的,因此我们应该在咖啡师行动者内创建它,而不是通过行动者系统。 我们第一个成为人父的行动者是这样的:

object Barista {
  case object EspressoRequest
  case object ClosingTime
  case class EspressoCup(state: EspressoCup.State)
  object EspressoCup {
    sealed trait State
    case object Clean extends State
    case object Filled extends State
    case object Dirty extends State
  }
  case class Receipt(amount: Int)
}
class Barista extends Actor {
  import Barista._
  import Register._
  import EspressoCup._
  import context.dispatcher
  import akka.util.Timeout
  import akka.pattern.ask
  import akka.pattern.pipe
  import concurrent.duration._

  implicit val timeout = Timeout(4.seconds)
  val register = context.actorOf(Props[Register], "Register")
  def receive = {
    case EspressoRequest =>
      val receipt = register ? Transaction(Espresso)
      receipt.map((EspressoCup(Filled), _)).pipeTo(sender)
    case ClosingTime => context.stop(self)
  }
}

首先,我们定义了 Barista 行动者所需要处理的消息类型。 一个意式咖啡杯 EspressoCup 有一个通过 sealed trait 实现的不可变的状态。

更令人感兴趣的部分是在 Barista 类中。 为了使用Akka 的询问语句而且让他返回future变量,我们需要导入dispatcheraskpipe ,并定义一个 隐式的 timeout 值: 当咖啡师接收到一条 EspressoRequest 浓缩咖啡订单后,我们以询问的方式发送一个 Transaction 交易消息给 Register 收银机,并等待其返回一个 Receipt 收据消息。 获取得的收据随后会和一杯倒满了咖啡的的咖啡杯一起以一个元组 (EspressoCup, Receipt) 的形式输送给 EspressoRequest 的发送者(也就是顾客)。 这种委托子任务给子行动者、并把他们的任务整合或修整的策略,正是典型的基于行动者的应用的处理方式。

并且,请注意我们如何通过调用 ActorContext 而不是用 ActorSystemactorOf 方法创建子行动者。 通过这样做,我们创建的行动者成为了调用者的子行动者,避免其成为守卫行动者子女的顶级的行动者。

最后,下面是 Customer 行动者的定义。像是 Barista ,他也是顶级的、仅处于守卫行动者之下的行动者:

object Customer {
  case object CaffeineWithdrawalWarning
}
class Customer(coffeeSource: ActorRef) extends Actor with ActorLogging {
  import Customer._
  import Barista._
  import EspressoCup._
  def receive = {
    case CaffeineWithdrawalWarning => coffeeSource ! EspressoRequest
    case (EspressoCup(Filled), Receipt(amount)) =>
      log.info(s"yay, caffeine for ${self}!")
  }
}

上面的代码中,我们首次使用了 ActorLogging,它允许我们把信息打印到日志类中,而不是到控制台。

现在,如果我们创建一个行动者系统,并在其中填入一个咖啡师和两个顾客,我们就可以开心的从这两个需要解决咖啡瘾的顾客中榨取黑金了:

import Customer._
val system = ActorSystem("Coffeehouse")
val barista = system.actorOf(Props[Barista], "Barista")
val customerJohnny = system.actorOf(Props(classOf[Customer], barista), "Johnny")
val customerAlina = system.actorOf(Props(classOf[Customer], barista), "Alina")
customerJohnny ! CaffeineWithdrawalWarning
customerAlina ! CaffeineWithdrawalWarning

如果你试着运行上面的代码,你可以看到这由满意的顾客们产生的两条日志记录。


To crash or not to crash?

当然,这篇文章的主题不是关于满意的顾客,而是糟糕的事情发生的时我们应如何应对。

假设我们的收银机是个脆弱的设备 - 它的打印功能不是很靠得住。 时常的,小票纸会卡住机器,导致打印失败。 我们在收银机的伴生对象里加入一个 PaperJamException 卡纸异常:

class PaperJamException(msg: String) extends Exception(msg)

然后,我们相应的改变 createReceipt 生成收据方法:

def createReceipt(price: Int): Receipt = {
  import util.Random
  if (Random.nextBoolean())
    throw new PaperJamException("OMG, not again!")
  Receipt(price)
}

现在,当处理 Transaction 交易消息时,收银机会以50%的几率抛出一个 PaperJamException 异常。

这会怎样影响我们的行动者系统乃至整个应用呢? 幸运的是,Akka是个很强健的系统,而且不会受我们代码中的异常影响。 当异常出现时,产生异常的子行动者的父行动者会接到通知 - 还记得我们之前提到过,父行动者会监视它的子行动者们吗? 这个时候,就是由父行动者决定应采取什么样的措施来处理子行动者的异常了。

监护人的异常处理策略

当接收到子行动者产生了异常的通知时,父行动者不是在 onReceive 方法中处理子行动者的失败行为的,因为这会混淆父行动者自己的正常处理逻辑。 就是说,处理自身的正常消息的逻辑和处理子行动者失败行为的逻辑是完全分开的。

每一个行动者都可以定义一个他自己的 监护人策略。它向Akka声明了当子行动者出现某种异常出现时,父行动者应该如何处理。

基本上来说,我们会使用两种监护人策略:OneForOneStrategyAllForOneStrategy。 选择前者,意味着处理一个子行动者时只会影响到这一个子行动者,反之就会影响所有的子行动者。 使用哪种策略应由你的实际使用情况决定。

在选择使用哪种 SupervisorStrategy 策略以外,你还需要给你的行动者指明一个 Decider (PartialFunction[Throwable, Directive] 的别名)。 定义它你可以为每种异常决定一个或所有的子行动者出现异常时需要做一些什么。

基本指令

下面是可供选择的基本指令:

sealed trait Directive
case object Resume extends Directive
case object Restart extends Directive
case object Stop extends Directive
case object Escalate extends Directive
  • Resume:如果你选择了继续, 也许就意味着你认为你的子行动者过于鸡婆,觉得她们抛出的异常可以忽略。 子行动者们这时就会的继续处理异常,就像是什么也没发生过一样。

  • Restart:重启指令会使得Akka为你创建一个或者多个新的子行动者。 这样做的一个原因之一是你假设了你的子行动者们会在抛出异常时,内部的状态就已经不稳定了,而且不能够继续处理更多的信息。 通过重启行动者,你希望会使得行动者重新进入一个干净的运行状态。

  • Stop:直接杀死行动者,他们就不能被重启了。

  • Escalate: 如果你选择了升级(指把错误递交给父行动者去处理),也许这一意味着这个行动者不知道如何去处理子行动者的异常。 通过把异常传给上级,你把处理异常的决定委托给了他的上一级的父行动者,并祈祷他比你更擅长处理它。 不过这样做后,行动者自己也许会被他的上级重启,因为行动者们只知道如何重启他的子行动者,而不能直接重启隔了两代的行动者。

默认策略

你不必在每个行动者里指明一个监护人策略。 实际上,我们到目前为止都没主动的这样做过。 这意味着默认的监护人策略在起作用,像是这样:

final val defaultStrategy: SupervisorStrategy = {
  def defaultDecider: Decider = {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: Exception                    => Restart
  }
  OneForOneStrategy()(defaultDecider)
}

这意味着,除了 ActorInitializationExceptionActorKilledException,抛出其他异常的子行动者会被自动重启。

因此,当 PaperJamException 异常发生时,由于我们没有指定监护人策略,根据收银机的父行动者(也就是咖啡师)的默认策略,收银机会自动被重启。

如果你试过运行代码,你会在日志中发现一个异常的对战追踪信息,但收银机被重启的消息并没有出现在日志中。

为了验证一下到底发生了什么,我们先来学习一下行动者的生命周期。

行动者的生命周期

为了理解监护人策略的每种指令,我们需要了解一点行动者生命周期的一些知识。 基本上,可以被分解为如下: 当行动者通过 actorOf 方法创建后,行动者开始运作; 他可以在错误出现时被重启任意次; 最后当行动者被停止是,也就意味着他迎来了它的死亡。

一个行动者生命周期有多个方法可以被重载,并且了解他们的默认实现也很重要。 让我们简略的过一遍这几个方法:

  • preStart: 预开始阶段,会在行动者 start 即将启动前被调用,允许你做一些初始化逻辑。默认实现为空。
  • postStop: 后停止阶段,在 stop 停止方法被调用后被调用,允许你做一些资源清理工作。默认实现为空。
  • preRestart: 预重启阶段,会在一个崩溃的行动者 restart 即将重启前被调用。 默认实现中,此方法会停掉所有的他的子行动者,并调用 postStop 方法以清理资源。
  • postRestart: 后重启阶段,会在行动者刚刚重启完成后被调用。默认实现为调用 preStart 方法。

通过在 postRestart 方法中加入一些日志输出信息,让我们看一下我们的收银机是否真的在出现错误时被重启了。 给 Register 加入对于 ActorLogging 的继承,并加上下面的方法:

override def postRestart(reason: Throwable) {
  super.postRestart(reason)
  log.info(s"Restarted because of ${reason.getMessage}")
}

现在,如果你给两个顾客行动者发送一堆 CaffeineWithdrawalWarning 消息,你会在日志中看到几条可以确定收银机有时被重启了的信息。

行动者的死亡

很多时候,不停地重启同一个行动者在道理上说不通。 比方说,一个行动者需要与网络上的服务进行交互,但服务器有时会在很长时间内没有作出应答。 在这种情况下,让Akka在一定时间内重启行动者是个好主意;超时后,行动者就会被停掉,也就让他赢来了死亡。 这个时间限制可以通过监护人策略的构造函数配置:

import scala.concurrent.duration._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Restart
OneForOneStrategy(10, 2.minutes) {
  case _ => Restart
}

可自愈的系统?

我们的系统是否能平稳的运行,并在卡纸的时候自我修复么? 让我们修改一下后重启阶段的日志输出:

override def postRestart(reason: Throwable) {
  super.postRestart(reason)
  log.info(s"Restarted, and revenue is $revenue cents")
}

再加入一点输出信息给 receive 函数,像是这样:

def receive = {
  case Transaction(article) =>
    val price = prices(article)
    sender ! createReceipt(price)
    revenue += price
    log.info(s"Revenue incremented to $revenue cents")
}

啊哦!好像有些东西没搞对。 在日志中,你会看到收入额会逐步提高,但只要收银机卡纸重启后,销售额就被重置为0. 这是因为重启一个行动者意味着之前的实例就被完全抛弃,以一个全新的通过 actorOf() 调用生成的行动者替代。

当然,我们可以改变监护人策略,让他在 PaperJamException 异常抛出时直接继续运行。 我们可以把下面的代码加进咖啡师的定义内:

val decider: PartialFunction[Throwable, Directive] = {
  case _: PaperJamException => Resume
}
override def supervisorStrategy: SupervisorStrategy =
  OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))

现在,收银机在卡纸后不会被重启,它的状态也就不会被重置了。

错误核心模式

这是否意味着我们有了一个保持收银机状态的好的解决方案了呢?

有时候,简单的恢复行动者的运行状态是最好的解决思路。 不过假设我们真的需要重启一个收银机,因为不重启也就意味着卡住的纸不会自己消失。 我们可以通过加入一个布尔标志位来模拟一下收银机是否处于卡纸状态。 如下,将 Register 收银机行动者改为:

class Register extends Actor with ActorLogging {
  import Register._
  import Barista._
  var revenue = 0
  val prices = Map[Article, Int](Espresso -> 150, Cappuccino -> 250)
  var paperJam = false
  override def postRestart(reason: Throwable) {
    super.postRestart(reason)
    log.info(s"Restarted, and revenue is $revenue cents")
  }
  def receive = {
    case Transaction(article) =>
      val price = prices(article)
      sender ! createReceipt(price)
      revenue += price
      log.info(s"Revenue incremented to $revenue cents")
  }
  def createReceipt(price: Int): Receipt = {
    import util.Random
    if (Random.nextBoolean()) paperJam = true
    if (paperJam) throw new PaperJamException("OMG, not again!")
    Receipt(price)
  }
}

同时移除之前加到咖啡师里的监护者策略。

现在,卡纸状况会永远保持,直到我们重启了收银机行动者。 但是我们也不能简单地重启他,因为这会导致营业额的重置。

这时候就需要引入错误核心模式 (error kernel pattern) 概念了。 他的含义是,当你的行动者内包含着重要的状态的时候,应把危险的任务交给子行动者去做,这样就能避免携带状态的行动者在崩溃时会导致的问题了。 有时候,为每个类似的任务创建一个新的子行动者是有道理的,但这不是必须的。 基本上,你应该总是效仿这种模式设计行动者系统的层级结构。

这种设计模式的基本元素是保证最重要的系统状态处于行动者架构越高层越好,并且将错误尽可能的压在架构的底层。

让我们为我们的收银机行动者实现这种模式。 我们依然让收银机保持营业额状态,但将容易出错的打印收据的行为放入一个新的子行动者 ReceiptPrinter 内。 ReceiptPrinter 的定义如下:

object ReceiptPrinter {
  case class PrintJob(amount: Int)
  class PaperJamException(msg: String) extends Exception(msg)
}
class ReceiptPrinter extends Actor with ActorLogging {
  var paperJam = false
  override def postRestart(reason: Throwable) {
    super.postRestart(reason)
    log.info(s"Restarted, paper jam == $paperJam")
  }
  def receive = {
    case PrintJob(amount) => sender ! createReceipt(amount)
  }
  def createReceipt(price: Int): Receipt = {
    if (Random.nextBoolean()) paperJam = true
    if (paperJam) throw new PaperJamException("OMG, not again!")
    Receipt(price)
  }
}

再一次的,我们通过一个布尔标志位来模拟卡纸异常,并在卡纸的状态下打印收据时抛出一个异常。 抽出了收银机的打印逻辑后,我们在这里定义了一个新的消息类型 PrintJob 并处理这条消息。

这是一种比较好的处理方式,不仅是因为把危险的操作从持有重要状态的收银机行动者中抽出来,并且他让我们的代码也变得更清晰和阐述: ReceiptPrinter 只负责打印收据,Register 也变得更清晰了 - 它只负责管理营业额,并把剩下的功能委托给子行动者:

class Register extends Actor with ActorLogging {
  import akka.pattern.ask
  import akka.pattern.pipe
  import context.dispatcher
  implicit val timeout = Timeout(4.seconds)
  var revenue = 0
  val prices = Map[Article, Int](Espresso -> 150, Cappuccino -> 250)
  val printer = context.actorOf(Props[ReceiptPrinter], "Printer")
  override def postRestart(reason: Throwable) {
    super.postRestart(reason)
    log.info(s"Restarted, and revenue is $revenue cents")
  }
  def receive = {
    case Transaction(article) =>
      val price = prices(article)
      val requester = sender
      (printer ? PrintJob(price)).map((requester, _)).pipeTo(self)
    case (requester: ActorRef, receipt: Receipt) =>
      revenue += receipt.amount
      log.info(s"revenue is $revenue cents")
      requester ! receipt
  }
}

我们不会在处理每个交易消息的时候都创建一个新的 ReceiptPrinter。 同时,我们利用默认的监护人策略,在错误出现时重启收据打印机。

上面的代码中,增加营业额的逻辑部分值得讨论一下: 首先,询问 (ask) 打印机以获取一张收据,然后映射询问函数的 future 的返回值,使其变为一个含有询问结果和请求者引用(也就是发送给收银机 Transaction 交易消息的行动者)的一个元组,随后将其传送给自己。 当这个元组消息被处理时,我们才增加销售额,最终将收据发送回给交易的请求者。

这样迂回的处理销售额的方式,是因为我们需要确保在收据成功打印后才增加销售额。 其遵循了一个重要原则:从来不要在 Future 中修改一个行动者的内部状态。 它使得我们能够确保营业额是在行动者的范围内增加的,而不是在其他的线程中被处理。

将发送者赋值给一个 val 定量是必要的,其原因类似: 映射一个未来对象时所在的上下文不是行动者的上下文。因为 sender 是一个方法,在映射函数内调用它时,返回值很可能会是发送什么别的消息的行动者,而不是我们想用的那个。

现在,我们的 Register 收银机可以安全自由的重启啦,✌️!

当然,将打印收据和管理营业额业务放在一起不是什么好主意。 我们为此是为了展示错误核心模式的一个用例。 不过,实际中将这两个业务分开处理要好的多,因为它们在概念上不是在一起的。

超时设定

我们另一个需要改进的地方是对于超时的处理。 目前,当 ReceiptPrinter 中出现异常时,由于使用了 ask,这会引发 AskTimeoutException 询问超时异常,返回给 Barista 咖啡师一个没有成功完成的 Future 对象。

由于 Barista 简单的映射了未来对象成功时的结果,然后把处理后的结果传给顾客,这样就使得顾客对象接收到一个含有 AskTimeoutException 异常的 Failure 对象。

但是,Customer 并没有询问过任何事物,自然他也没期待接收到这样的消息。事实上,他目前也无法处理这样的消息。 我们以友好的方式,在出现超时异常时,发送给顾客们一条 ComebackLater 稍后再来消息。 这是一条顾客可以明白的消息,它会使得顾客会在之后的某个时间再来购买咖啡。 这样优化了之前的实现,因为我们可以使得顾客知道是否能取得想要的咖啡。

为了实现它,我们可以调用含有 AskTimeoutException 超时异常的 Failure 对象的 recover 方法,将它映射成一个 ComebackLater 消息。 Barista 咖啡师的接受部分函数修改如下:

def receive = {
  case EspressoRequest =>
    val receipt = register ? Transaction(Espresso)
    receipt.map((EspressoCup(Filled), _)).recover {
      case _: AskTimeoutException => ComebackLater
    } pipeTo(sender)
  case ClosingTime => context.system.shutdown()
}

现在,顾客行动者知道如果咖啡没有购买成功时,他们可以重复尝试多次,直到成功购买到咖啡来满足咖啡瘾。

死亡监视

为保证系统的容错性,除子行动者之外,另一个重要原则是行动者需要监视着自己重要的依赖对象。

有时,你拥有一些需要依赖于其他的非自己子行动者的行动者。 这意味着我们不能使用监护人策略来处理依赖关系。 不过,我们依然需要有另一种方式监视其他行动者的状态,并在他们出问题时得到通知。

思考一下一个需要访问数据库的行动者。 你会有其他的行动者需要这个行动者处于可用的健康状态,并在它不可用时接收到通知。 也许你想在数据库访问行动者故障时时,将系统转入维护模式;或是简单的将死掉的行动者用某种备份行动者替换掉是更好地解决方案。

不管在哪种情况下,你都需要以某种方式监视行动者逝去的悲伤消息。 这可以通过调用定义在 ActorContext 上的 watch 方法实现。 为了展示,我们将 Customer 顾客改造一下,使其监视 Barista 咖啡师。 因为顾客们对咖啡因极为上瘾,因此让他们依赖于咖啡师是合情合理的:

class Customer(coffeeSource: ActorRef) extends Actor with ActorLogging {
  import context.dispatcher

  context.watch(coffeeSource)

  def receive = {
    case CaffeineWithdrawalWarning => coffeeSource ! EspressoRequest
    case (EspressoCup(Filled), Receipt(amount)) =>
      log.info(s"yay, caffeine for ${self}!")
    case ComebackLater =>
      log.info("grumble, grumble")
      context.system.scheduler.scheduleOnce(300.millis) {
        coffeeSource ! EspressoRequest
      }
    case Terminated(barista) =>
      log.info("Oh well, let's find another coffeehouse...")
  }
}

我们在顾客的构造函数里使其开始监视 coffeeSource,并加入对于 Terminated 消息的处理 - 这是种我们可以从Akka中所监视的对象死亡时可以接收到的消息。

现在,如果我们发送一个 ClosingTime 打烊时间消息给咖啡师,咖啡师将告诉行动者上下文结束自己的生命,这时顾客就会接受到咖啡师死亡的消息了。 请读者自己尝试一下,你就会在控制台中看到顾客关于”重新找一家咖啡店”的日志信息。

你也可以以此为基础,加入更好的失败处理逻辑。


总结

这是系列的第二篇关于行动者和Akka的文章。 你已经了解了一个行动者系统内几个重要的组成部分,还有如果利用Akka提供的工具和方法使得你的系统可以变得更有容错性。

不过还有还有很多你需要了解的Akka和行动者模型的知识,但我们把它留给你,因为这超出了此系列的范畴。 在下一部分,此系列将进入总结部分。我会为你提供一些Scala语言还有Akka工具包的学习资源。

Daniel Westheide · Mar 20th, 2013