[Finagle](https://github.com/twitter/finagle) is Twitter's RPC system.
[This blog post](https://blog.twitter.com/2011/finagle-a-protocol-agnostic-rpc-system) explains its motivations and core design principles; the [finagle README](https://github.com/twitter/finagle/blob/master/README.md) contains more detailed documentation. Finagle aims to make it easy to build robust clients and servers.
## Finagle-Friendly REPL
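We're about to discuss code that is not part of standard Scala. If you like to learn with the REPL, you may wonder how to get a Scala REPL that knows about Finagle and its dependencies.

If you have the Finagle source code in a directory named `finagle`, you can get a console with:

    $ cd finagle
    $ ./sbt "project finagle-http" console
    ...build output...
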
## Futures
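Finagle uses `com.twitter.util.Future`[1](http://twitter.github.io/scala_school/zh_cn/finagle.html#fn1) to encode delayed operations. A Future is a handle for a value that is not yet available. Finagle uses Futures as the return values of its asynchronous APIs. A synchronous API waits for a result before returning; an asynchronous API does not. For example, an HTTP request to some service on the internet might not return for half a second, and you don't want your program to block while it waits. A "slow" API can return a `Future` right away and then "fill in" its value once the result is available.

    val myFuture = MySlowService(request) // returns right away
    // ...do other things...
    val serviceResult = myFuture.get() // blocks until service "fills in" myFuture

Rather than blocking, you can register a callback that runs once the value is available:

    val future = dispatch(req) // returns immediately, but future is "empty"
    future onSuccess { reply => // when the future gets "filled", use its value
      println(reply)
    }
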
The REPL session below creates an already-resolved Future, a failed Future, and an unresolved `Promise`, calling `get` on each:

    scala> import com.twitter.util.{Future,Promise}
    import com.twitter.util.{Future, Promise}

    scala> val f6 = Future.value(6) // create already-resolved future
    f6: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@c63a8af

    scala> f6.get()
    res0: Int = 6

    scala> val fex = Future.exception(new Exception) // create resolved sad future
    fex: com.twitter.util.Future[Nothing] = com.twitter.util.ConstFuture@38ddab20

    scala> fex.get()
    ... stack trace ...

    scala> val pr7 = new Promise[Int] // create unresolved future
    pr7: com.twitter.util.Promise[Int] = Promise@1994943491(...)

    scala> pr7.get()
    ...console hangs, waiting for future to resolve...
    Execution interrupted by signal.

    scala> pr7.setValue(7)

    scala> pr7.get()
    res1: Int = 7

When you use Futures in real code, you normally don't call `get`; you use callbacks instead. `get` is just handy for tinkering in the REPL.
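As a minimal sketch of the callback style, the snippet below registers `onSuccess` and `onFailure` handlers on a `Promise` and then fills it in. It assumes `util-core` is on the classpath (as in the Finagle REPL above); the names `pending`, `seen`, and `failure` are made up for illustration. Newer releases of `util-core` may not offer `Future.get()`; there, `Await.result` is the usual way to block, so the sketch uses it.

```scala
import com.twitter.util.{Await, Promise}

// An unresolved future that we fill in by hand.
val pending = new Promise[Int]

// Record what the callbacks saw; a real program would act on the value instead.
var seen: Option[Int] = None
var failure: Option[Throwable] = None

// onSuccess runs if the future is filled with a value, onFailure if it fails.
val withCallbacks = pending onSuccess { v => seen = Some(v) } onFailure { e => failure = Some(e) }

// Filling the promise triggers the success callback.
pending.setValue(7)

// Blocking is still possible when you really need it, e.g. in the REPL or a test.
val viaAwait: Int = Await.result(pending)
```

Only one of the two callbacks runs for a given Future, which is why `failure` stays empty here.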
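### Sequential composition

Futures have combinators [similar to those in the collections APIs](http://twitter.github.io/scala_school/zh_cn/collections.html#combinators) (for example `map` and `flatMap`). Recall that a collection combinator lets you express things like "I have a List of integers and a `square` function: map that over the list to get a list of the squares." This is neat; you can put the combinator together with another function to effectively compose a new function. The Future-oriented combinators let you say "I have a Future of a hypothetical integer and a `square` function: map that over the Future to get a Future of the hypothetical integer's square."

> `def Future[A].flatMap[B](f: A => Future[B]): Future[B]`

`flatMap` sequences two Futures. That is, it takes a Future and an asynchronous function and returns another Future. The method signature tells the story: given the successful value of a Future, the function `f` provides the next `Future`. `flatMap` calls `f` automatically if and when the input `Future` completes successfully. The `Future` that represents the whole operation completes only when both Futures have completed. If either `Future` fails, the resulting `Future` fails too. This implicit interleaving of errors lets us handle errors only where they are semantically significant. `flatMap` is the standard name for a combinator with these semantics.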
If you have a Future and want to apply an asynchronous API to its value, use `flatMap`. For example, suppose you have a `Future[User]` and need a `Future[Boolean]` indicating whether the user has been banned. There is an asynchronous `isBanned` API that determines whether a user is banned. You can use `flatMap`:

    scala> import com.twitter.util.{Future,Promise}
    import com.twitter.util.{Future, Promise}

    scala> class User(n: String) { val name = n }
    defined class User

    scala> def isBanned(u: User) = { Future.value(false) }
    isBanned: (u: User)com.twitter.util.Future[Boolean]

    scala> val pru = new Promise[User]
    pru: com.twitter.util.Promise[User] = Promise@897588993(...)

    scala> val futBan = pru flatMap isBanned // apply isBanned to future
    futBan: com.twitter.util.Future[Boolean] = Promise@1733189548(...)

    scala> futBan.get()
    ...REPL hangs, futBan not resolved yet...
    Execution interrupted by signal.

    scala> pru.setValue(new User("prudence"))

    scala> futBan.get()
    res45: Boolean = false

Similarly, to apply a *synchronous* function to the value of a Future, use `map`. For example, suppose you have a `Future[RawCredentials]` and a synchronous `normalize` function that converts `RawCredentials` to `Credentials`:

    scala> class RawCredentials(u: String, pw: String) {
         |   val username = u
         |   val password = pw
         | }
    defined class RawCredentials

    scala> class Credentials(u: String, pw: String) {
         |   val username = u
         |   val password = pw
         | }
    defined class Credentials

    scala> def normalize(raw: RawCredentials) = {
         |   new Credentials(raw.username.toLowerCase(), raw.password)
         | }
    normalize: (raw: RawCredentials)Credentials

    scala> val praw = new Promise[RawCredentials]
    praw: com.twitter.util.Promise[RawCredentials] = Promise@1341283926(...)

    scala> val fcred = praw map normalize // apply normalize to future
    fcred: com.twitter.util.Future[Credentials] = Promise@1309582018(...)

    scala> fcred.get()
    ...REPL hangs, fcred doesn't have a value yet...
    Execution interrupted by signal.

    scala> praw.setValue(new RawCredentials("Florence", "nightingale"))

    scala> fcred.get().username
    res48: String = florence

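Scala's for-comprehension is syntactic shorthand for `flatMap`. Suppose you want to authenticate a login request with one asynchronous API and then check whether the user is banned with another (this transcript assumes a `LoginRequest` class with a `username` field and a `request` value already in scope):

    scala> def authenticate(req: LoginRequest) = {
         |   // TODO: we should check the password
         |   Future.value(new User(req.username))
         | }
    authenticate: (req: LoginRequest)com.twitter.util.Future[User]

    scala> val f = for {
         |   u <- authenticate(request)
         |   b <- isBanned(u)
         | } yield (u, b)
    f: com.twitter.util.Future[(User, Boolean)] = Promise@35785606(...)

This produces `f: Future[(User, Boolean)]`, holding the user object and a Boolean that says whether that user has been banned. Note how sequential composition is at work here: `isBanned` takes the output of `authenticate` as its input.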
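To make the shorthand concrete, here is a small sketch that writes the same two-step check once as a for-comprehension and once as the `flatMap`/`map` chain it corresponds to, and then shows a failure in the first step propagating to the combined Future. The `User` case class and the toy `authenticate` and `isBanned` bodies are stand-ins invented for this example; `util-core` is assumed to be on the classpath.

```scala
import com.twitter.util.{Await, Future}

// Hypothetical stand-ins for the asynchronous APIs used in this section.
case class User(name: String)

def authenticate(name: String): Future[User] = {
  if (name.nonEmpty) Future.value(User(name))
  else Future.exception(new IllegalArgumentException("empty user name"))
}

def isBanned(u: User): Future[Boolean] = Future.value(u.name == "mallory")

// The for-comprehension form ...
def checkFor(name: String): Future[(User, Boolean)] = {
  for {
    u <- authenticate(name)
    b <- isBanned(u)
  } yield (u, b)
}

// ... and the equivalent flatMap/map chain.
def checkFlatMap(name: String): Future[(User, Boolean)] = {
  authenticate(name) flatMap { u => isBanned(u) map { b => (u, b) } }
}

val ok = Await.result(checkFlatMap("prudence"))

// A failure in the first step skips isBanned and fails the combined Future.
// liftToTry turns the failure into a value so we can inspect it.
val failed = Await.result(checkFor("").liftToTry)
```

Neither version contains explicit error handling; the failed `authenticate` call is simply carried through to the result.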
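### Concurrent composition

`Future` provides some concurrent combinators. Generally, these convert a sequence of `Future`s into a `Future` of a sequence, in slightly different ways. This is nice because it lets you (essentially) package several Futures into a single Future.

    object Future {
      def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
      def join(fs: Seq[Future[_]]): Future[Unit]
      def select[A](fs: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])]
    }
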
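`collect` takes a sequence of Futures of the same type and yields a Future of a sequence of values of that type. It completes when all of the underlying Futures have completed, or fails when any of them fails.

    scala> val f2 = Future.value(2)
    f2: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@13ecdec0

    scala> val f3 = Future.value(3)
    f3: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@263bb672

    scala> val f23 = Future.collect(Seq(f2, f3))
    f23: com.twitter.util.Future[Seq[Int]] = Promise@635209178(...)

    scala> val f5 = f23 map (_.sum)
    f5: com.twitter.util.Future[Int] = Promise@1954478838(...)

    scala> f5.get()
    res9: Int = 5

`join` takes a sequence of Futures whose types may differ and yields a `Future[Unit]` that completes when all of them have completed (or fails if any of them fails).

    scala> val ready = Future.join(Seq(f2, f3))
    ready: com.twitter.util.Future[Unit] = Promise@699347471(...)

    scala> ready.get() // doesn't ret value, but I know my futures are done
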
`select` returns a `Future` that completes when the first of the given `Future`s completes. It yields the result of that Future (as a `Try`) together with a Seq of the remaining, not-yet-completed Futures. (It does nothing to cancel the remaining Futures. You can wait for more responses or ignore them.)

    scala> val pr7 = new Promise[Int] // unresolved future
    pr7: com.twitter.util.Promise[Int] = Promise@1608532943(...)

    scala> val sel = Future.select(Seq(f2, pr7)) // select from 2 futs, one resolved
    sel: com.twitter.util.Future[...] = Promise@1003382737(...)

    scala> val(complete, stragglers) = sel.get()
    complete: com.twitter.util.Try[Int] = Return(2)
    stragglers: Seq[...] = List(...)

    scala> complete.get()
    res110: Int = 2

    scala> stragglers(0).get() // our list of not-yet-finished futures has one item
    ...get() hangs the REPL because this straggling future is not finished...
    Execution interrupted by signal.

    scala> pr7.setValue(7)

    scala> stragglers(0).get()
    res113: Int = 7
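The sketch below complements the session above in two ways: it shows `Future.collect` failing as a whole when one input fails, and it shows the two-Future `join` method, which keeps both results as a tuple even when their types differ. It assumes `util-core` is available; `boom` is a made-up failed Future.

```scala
import com.twitter.util.{Await, Future}

val f2 = Future.value(2)
val f3 = Future.value(3)
val boom: Future[Int] = Future.exception(new RuntimeException("boom"))

// collect: all values, in input order, once every future has succeeded.
val all: Seq[Int] = Await.result(Future.collect(Seq(f2, f3)))

// collect fails as a whole if any input fails; liftToTry lets us look at the outcome.
val collected = Await.result(Future.collect(Seq(f2, boom)).liftToTry)

// The instance method join pairs two (possibly differently typed) futures.
val pair: (Int, String) = Await.result(f2 join Future.value("three"))
```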
### Composed example: cached rate limit

This hypothetical code combines sequential and concurrent composition; `limitCache`, `auth`, `Timeline`, and the user-based `getTimeline` overload are assumed to be defined elsewhere.

    // Find out if user is rate-limited. This can be slow; we have to ask
    // the remote server that keeps track of who is rate-limited.
    def isRateLimited(u: User): Future[Boolean] = {
      ???  // remote lookup not shown
    }
    // Notice how you can swap this implementation out now with something that might
    // implement a different, more restrictive policy.

    // Check the cache to find out if user is rate-limited. This cache
    // implementation is just a Map, and can return a value right away. But we
    // return a Future anyhow in case we need to use a slower implementation later.
    def isLimitedByCache(u: User): Future[Boolean] = Future.value(limitCache(u))

    // Update the cache
    def setIsLimitedInCache(user: User, v: Boolean): Unit = { limitCache(user) = v }

    // Get a timeline of tweets... unless the user is rate-limited (then throw
    // an exception instead)
    def getTimeline(cred: Credentials): Future[Timeline] =
      isLimitedByCache(cred.user) flatMap {
        case true => Future.exception(new Exception("rate limited"))
        case false =>

          // First we get auth'd user then we get timeline.
          // Sequential composition of asynchronous APIs: use flatMap
          val timeline = auth(cred) flatMap(getTimeline)

          val limited = isRateLimited(cred.user) onSuccess(
            setIsLimitedInCache(cred.user, _))

          // 'join' concurrently combines differently-typed futures
          // 'flatMap' sequentially combines, specifies what to do next
          timeline join limited flatMap {
            case (_, true) => Future.exception(new Exception("rate limited"))
            case (timeline, _) => Future.value(timeline)
          }
      }

### Composed example: web crawler

We start with a fake internet of fetchable resources:

    import com.twitter.util.{Try,Future,Promise}

    // a fetchable thing
    trait Resource {
      def imageLinks(): Seq[String]
      def links(): Seq[String]
    }

    // HTML pages can link to Imgs and to other HTML pages.
    class HTMLPage(val i: Seq[String], val l: Seq[String]) extends Resource {
      def imageLinks() = i
      def links() = l
    }

    // IMGs don't actually link to anything else
    class Img() extends Resource {
      def imageLinks() = Seq()
      def links() = Seq()
    }

    // profile.html links to gallery.html and has an image link to portrait.jpg
    val profile = new HTMLPage(Seq("portrait.jpg"), Seq("gallery.html"))
    val portrait = new Img

    // gallery.html links to profile.html and two images
    val gallery = new HTMLPage(Seq("kitten.jpg", "puppy.jpg"), Seq("profile.html"))
    val kitten = new Img
    val puppy = new Img

    val internet = Map(
      "profile.html" -> profile,
      "gallery.html" -> gallery,
      "portrait.jpg" -> portrait,
      "kitten.jpg" -> kitten,
      "puppy.jpg" -> puppy
    )

    // fetch(url) attempts to fetch a resource from our fake internet.
    // Its returned Future might contain a Resource or an exception
    def fetch(url: String): Future[Resource] = { new Promise(Try(internet(url))) }

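Written with callbacks, a function that fetches a page's first image looks like this:

    def getThumbnail(url: String): Future[Resource] = {
      val returnVal = new Promise[Resource]

      fetch(url) onSuccess { page => // callback for successful page fetch
        fetch(page.imageLinks()(0)) onSuccess { p => // callback for successful img fetch
          returnVal.setValue(p)
        } onFailure { exc => // callback for failed img fetch
          returnVal.setException(exc)
        }
      } onFailure { exc => // callback for failed page fetch
        returnVal.setException(exc)
      }
      returnVal
    }

With `flatMap` the same function is much shorter:

    def getThumbnail(url: String): Future[Resource] =
      fetch(url) flatMap { page => fetch(page.imageLinks()(0)) }

To fetch all of a page's images concurrently, combine `flatMap` with `Future.collect`:

    def getThumbnails(url: String): Future[Seq[Resource]] =
      fetch(url) flatMap { page =>
        Future.collect(
          page.imageLinks map { u => fetch(u) }
        )
      }

If this makes sense to you, great. You might be puzzled by the line `page.imageLinks map { u => fetch(u) }`: it uses `map`, and the function passed to `map` returns a Future. Shouldn't we use `flatMap` when the next step returns a Future? But notice that the thing *before* the `map` is not a Future; it is a collection. Mapping over a collection returns a collection, and we use `Future.collect` to gather that collection of Futures into a single Future.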
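**Concurrent + Recursive**

Instead of fetching a page's images, we can fetch the pages it links to and recurse on them, which gives a simple web crawler:

    // Return
    def crawl(url: String): Future[Seq[Resource]] =
      fetch(url) flatMap { page =>
        Future.collect(
          page.links map { u => crawl(u) }
        ) map { pps => pps.flatten }
      }

    scala> crawl("profile.html")
    ...hangs REPL, infinite loop...
    Execution interrupted by signal.

    scala>
    // She's gone rogue, captain! Have to take her out!
    // Calling Thread.stop on runaway Thread[Thread-93,5,main] with offending code:
    // scala> crawl("profile.html")

`profile.html` and `gallery.html` link to each other, so this crawler never stops.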
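One way to make such a crawler terminate is to remember which URLs have already been requested. The sketch below is not part of the original lesson: it uses a trimmed-down fake internet (`Page`, `web`, `fetchPage`, `visited`, and `crawlOnce` are all names invented here), and unlike `crawl` it includes each fetched page in its result. It assumes `util-core` is on the classpath.

```scala
import com.twitter.util.{Await, Future, Try}
import java.util.concurrent.ConcurrentHashMap

// A trimmed-down fake internet: each page just lists the pages it links to.
case class Page(url: String, links: Seq[String])

val web = Map(
  "profile.html" -> Page("profile.html", Seq("gallery.html")),
  "gallery.html" -> Page("gallery.html", Seq("profile.html"))
)

// Succeeds with the page, or fails if the URL is unknown.
def fetchPage(url: String): Future[Page] = Future.const(Try(web(url)))

// URLs we have already started fetching; add() returns false on a repeat.
val visited = ConcurrentHashMap.newKeySet[String]()

def crawlOnce(url: String): Future[Seq[Page]] = {
  if (!visited.add(url)) Future.value(Seq.empty[Page]) // seen before: stop here
  else fetchPage(url) flatMap { page =>
    // Crawl all linked pages concurrently, then prepend this page to the results.
    Future.collect(page.links map { u => crawlOnce(u) }) map { pps => page +: pps.flatten }
  }
}

val pages = Await.result(crawlOnce("profile.html"))
```

The thread-safe set is a cautious choice: with real network fetches, the callbacks that recurse may run on different threads.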
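## Service

A Finagle `Service` handles RPCs: it takes requests and gives back replies. A Service is a function `Req => Future[Rep]` for some request and reply types.

> `abstract class Service[-Req, +Rep] extends (Req => Future[Rep])`

![Client and Server](https://box.kancloud.cn/2015-09-07_55ed22af19c27.png "Client and Server")

A client has two parts:

* A function to *use* the Service: dispatch a `Req` and handle a `Future[Rep]`
* Configuration of how to dispatch those requests; for example, as HTTP requests to port 80 of `api.twitter.com`

A server likewise has two parts:

* A function to *implement* the Service: take a `Req` and return a `Future[Rep]`
* Configuration of how to "listen" for incoming `Req`s; for example, as HTTP requests on port 80

![Filter and Server](https://box.kancloud.cn/2015-09-07_55ed22afee2ac.png "Filter and Server")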
## Client

    import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpRequest, HttpResponse, HttpVersion, HttpMethod}
    import com.twitter.finagle.Service
    import com.twitter.finagle.builder.ClientBuilder
    import com.twitter.finagle.http.Http

    // Don't worry, we discuss this magic "ClientBuilder" later
    val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
      .codec(Http())
      .hosts("twitter.com:80") // If >1 host, client does simple load-balancing
      .hostConnectionLimit(1)
      .build()

    val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")

    val f = client(req) // Client, send the request

    // Handle the response:
    f onSuccess { res =>
      println("got response: " + res)
    } onFailure { exc =>
      println("failed :-( " + exc)
    }

## Server

    import com.twitter.finagle.Service
    import com.twitter.finagle.http.Http
    import com.twitter.util.Future
    import org.jboss.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion, HttpResponseStatus, HttpRequest, HttpResponse}
    import java.net.{SocketAddress, InetSocketAddress}
    import com.twitter.finagle.builder.{Server, ServerBuilder}

    // Define our service: OK response for root, 404 for other paths
    val rootService = new Service[HttpRequest, HttpResponse] {
      def apply(request: HttpRequest) = {
        val r = request.getUri match {
          case "/" => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
          case _ => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)
        }
        Future.value(r)
      }
    }

    // Serve our service on a port
    val address: SocketAddress = new InetSocketAddress(10000)
    val server: Server = ServerBuilder()
      .codec(Http())
      .bindTo(address)
      .name("HttpServer")
      .build(rootService)

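## Filters

A service can wrap another service and rewrite requests and replies on the way through:

    class MyService(client: Service[..]) extends Service[HttpRequest, HttpResponse]
    {
      def apply(request: HttpRequest) = {
        client(rewriteReq(request)) map { res =>
          rewriteRes(res)
        }
      }
    }

where `rewriteReq` and `rewriteRes` could, for example, provide protocol translation.

The `Filter` class captures this pattern:

    abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
      extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])

Its type is easier to read as a diagram:

        ((ReqIn, Service[ReqOut, RepIn])
             => Future[RepOut])


               (*   Service   *)
    [ReqIn -> (ReqOut -> RepIn) -> RepOut]
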
For example, a filter that imposes a timeout on any service could be written as:

    class TimeoutFilter[Req, Rep](
      timeout: Duration,
      exception: RequestTimeoutException,
      timer: Timer)
      extends Filter[Req, Rep, Req, Rep]
    {
      def this(timeout: Duration, timer: Timer) =
        this(timeout, new IndividualRequestTimeoutException(timeout), timer)

      def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
        val res = service(request)

        res.within(timer, timeout) rescue {
          case _: java.util.concurrent.TimeoutException =>
            // Replace the generic timeout with the configured exception.
            Future.exception(exception)
        }
      }
    }

This example shows how to provide authentication (through an authentication service) in order to convert a `Service[AuthHttpReq, HttpRep]` into a `Service[HttpReq, HttpRep]`.

    class RequireAuthentication(authService: AuthService)
      extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] {
      def apply(
        req: HttpReq,
        service: Service[AuthHttpReq, HttpRep]
      ) = {
        authService.auth(req) flatMap {
          case AuthResult(AuthResultCode.OK, Some(passport), _) =>
            service(AuthHttpReq(req, passport))
          case ar: AuthResult =>
            Future.exception(
              new RequestUnauthenticated(ar.resultCode))
        }
      }
    }

Filters compose with `andThen`. Passing a `Service` to `andThen` yields a (filtered) `Service` (types are shown for illustration).

    val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
    val timeoutFilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
    val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]

    val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
      authFilter andThen timeoutFilter

    val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
      authenticateAndTimedOut andThen serviceRequiringAuth
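Because the declarations above are illustrative rather than compilable, here is a small runnable sketch of the same shape: a type-changing filter, a type-preserving filter, and a service, chained with `andThen`. The request types `Raw` and `Authed`, the token check, and all value names are invented for this example; it assumes `finagle-core` and `util-core` are on the classpath.

```scala
import com.twitter.finagle.{Filter, Service}
import com.twitter.util.{Await, Future}

// Hypothetical request types: a raw request and one that carries a verified user.
case class Raw(token: String, body: String)
case class Authed(user: String, body: String)

// The service only understands authenticated requests.
val echo: Service[Authed, String] = new Service[Authed, String] {
  def apply(req: Authed): Future[String] = Future.value(req.user + ": " + req.body)
}

// A type-changing filter: takes Raw, passes Authed on to the next stage.
val requireAuth: Filter[Raw, String, Authed, String] = new Filter[Raw, String, Authed, String] {
  def apply(req: Raw, service: Service[Authed, String]): Future[String] = {
    if (req.token == "secret") service(Authed("alice", req.body))
    else Future.exception(new IllegalAccessException("bad token"))
  }
}

// A type-preserving filter that upper-cases whatever the service replies.
val shout: Filter[Authed, String, Authed, String] = new Filter[Authed, String, Authed, String] {
  def apply(req: Authed, service: Service[Authed, String]): Future[String] = {
    service(req) map { _.toUpperCase }
  }
}

// Filter andThen Filter is a Filter; Filter andThen Service is a Service.
val stack: Service[Raw, String] = requireAuth andThen shout andThen echo

val good = Await.result(stack(Raw("secret", "hi")))
val bad = Await.result(stack(Raw("wrong", "hi")).liftToTry)
```

The composed `stack` accepts `Raw` requests even though `echo` itself only accepts `Authed` ones; the rejected request never reaches `shout` or `echo`.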
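## Builders

Builders put it all together. A `ClientBuilder` produces a `Service` instance given a set of parameters, and a `ServerBuilder` takes a `Service` instance and dispatches incoming requests to it. To determine the type of the `Service`, we must provide a `Codec`. Codecs provide the underlying protocol implementation (for example HTTP, thrift, memcached). Both builders have many parameters, and some of them are required.

    val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
      .codec(Http())
      .hosts("localhost:10000,localhost:10001,localhost:10003") // placeholder addresses
      .hostConnectionLimit(1)
      .retries(2)
      .reportTo(new OstrichStatsReceiver)
      .build()

This builds a client that load balances over three hosts, establishes at most one connection per host, and gives up after two failed attempts. Stats are reported to [ostrich](https://github.com/twitter/ostrich). The following builder options are required (and their presence is statically enforced): `hosts` or `cluster`, `codec`, and `hostConnectionLimit`.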
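Similarly, a `ServerBuilder` serves a service:

    val service = new MyService(...) // construct instance of your Finagle service
    val filter = new MyFilter(...) // and maybe some filters
    val filteredService = filter andThen service
    val server = ServerBuilder()
      .bindTo(new InetSocketAddress(port))
      .codec(ThriftServerFramedCodec())
      .name("my filtered service")
    //  .hostConnectionMaxLifeTime(5.minutes)
    //  .readTimeout(2.minutes)
      .build(filteredService)

With these parameters we get a Thrift server that listens on port `port` and dispatches requests to `service`. If we un-comment the `hostConnectionMaxLifeTime` line, each connection is allowed to stay alive for up to 5 minutes. If we un-comment the `readTimeout` line, a request must be sent within 2 minutes. The required `ServerBuilder` options are: `name`, `bindTo`, and `codec`.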
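## Don't Block (unless you do it the right way)

Finagle automatically juggles threads to keep your service running smoothly. However, if your service blocks, it can block all Finagle threads.

* If your code calls a blocking operation (`apply` or `get`), use a [Future Pool](https://github.com/twitter/finagle#Using%20Future%20Pools) to wrap the blocking code. The blocking operation then runs in its own thread pool, and you get back a Future for the completion (or failure) of that operation, which you can compose with other Futures.
* If your code uses sequential composition of Futures, don't worry that it is "blocking" on those Futures.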
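
A minimal sketch of the first point, assuming `util-core` is on the classpath: `slowLookup` is a made-up blocking function, and `FuturePool.unboundedPool` is used only to keep the example short (a real service would likely want a bounded pool sized for its workload).

```scala
import com.twitter.util.{Await, Future, FuturePool}

// Hypothetical blocking call, e.g. a legacy client library or file I/O.
def slowLookup(key: String): String = {
  Thread.sleep(50) // stands in for blocking work
  key.reverse
}

// Run the blocking call on the pool's own threads and get a Future back.
val pool = FuturePool.unboundedPool
def lookup(key: String): Future[String] = pool { slowLookup(key) }

// The returned Future composes like any other.
val lengths: Future[Int] = lookup("finagle") map { _.length }

// Blocking here is only for demonstration, e.g. in the REPL.
val n = Await.result(lengths)
```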

[1](http://twitter.github.io/scala_school/zh_cn/finagle.html#fnr1) Careful, there are other "Future" classes out there. Don't confuse `com.twitter.util.Future` with `scala.actors.Future` or `java.util.concurrent.Future`!

[2](http://twitter.github.io/scala_school/zh_cn/finagle.html#fnr2) If you study type systems and/or category theory, you'll be glad to learn that `flatMap` is equivalent to a monadic bind.
