Riak是一个分布式、容错和开放源代码的数据库,它展示了如何使用Erlang/OTP来构建大型可伸缩系统。Riak提供了一些其他数据库中并不常见的特性,比如高可用性、容量和吞吐量的线性伸缩能力等,很大程度上,这是借由Erlang对大规模可伸缩分布式系统的支持实现的。
要开发像Riak这样的系统,Erlang/OTP是一个理想的平台,因为它提供了可以直接利用的节点间通信、消息队列、故障探测和客户-服务器抽象等功能。而且,Erlang中大多数常见的模式都已经以库模块的形式实现了,我们一般称之为OTP behaviors。其中包括了用于并发和错误处理的通用代码框架,可以简化并发编程,也能避免开发者陷入一些常见的陷阱。Behaviors由管理者负责监管,而管理者本身也是behavior,这样就组成了一个监管树。通过将监管树打包到应用程序中,这就创建了一个Erlang程序的构建块。
一个完整的Erlang系统,如Riak,是由一组松散耦合且相互作用的应用组成的。其中有些应用是开发者编写的,有些是标准Erlang/OTP发布包中的,还有一些可能是其他的开源组件。这些应用由一个boot脚本按顺序加载并启动,而该脚本是从应用清单和版本信息中生成的。
系统之间的区别在于,启动的发布版本中的应用有所不同。在标准的Erlang发行版中,boot文件会启动Kernel和StdLib(Standard Library,标准库)等应用。而在有些安装版本中,还会启动SASL(Systems Architecture Support Library,系统架构支持库)应用。SASL中包含了带有日志功能的发布和软件更新工具。对Riak而言,除了启动其特定的应用以及运行时依赖(其中包括Kernel、StdLib和SASL)之外,并没有什么不同。一个完整的、准备好运行的Riak构建版本,实际上将Erlang/OTP发行包中的这些标准元素都嵌入其中了,当在命令行调用`riak start` 时,它们会一同启动。Riak由很多复杂的应用组成,所以本章不应看做一个完整的指南。倒是可以把本章看做以Riak源代码为例,针对OTP的入门指南。图片和数字主要是为了阐明设计意图,故有所简化。
## 15.1 Erlang简介
Erlang是一个并发的函数式编程语言,用它编写的程序会编译为字节代码并运行在虚拟机上。程序中互相调用的函数经常会产生副作用,如进程间消息传递,I/O和数据库操作等。而Erlang变量是单赋值的,也就是说,一旦变量被给定了一个值,就再也不能修改了。从下面的计算阶乘的例子可以看出,Erlang中大量使用了模式匹配:
~~~
-module(factorial).
-export([fac/1]).
fac(0) -> 1;
fac(N) when N>0 ->
Prev = fac(N-1),
N*Prev.
~~~
在这段代码中,第一个子句(clause)给出了0的阶乘,第二个字句计算正数的阶乘。每一个子句的主体部分都是一个表达式序列,主体部分中最后一个表达式就是这个子句的计算结果。调用这个函数的时候如果传入一个负数会导致运行时错误,因为没有一个子句能匹配负数的模式。不处理这种情况的做法是非防御式(non-defensive)编程的一个例子,这种做法也是Erlang中鼓励的做法。
在模块之中,函数以正常的方式调用;而在模块之外,函数名之前应该加上模块名,如`factorial:fac(3)`。允许定义同名但是参数数目不同的函数——函数的参数数目称为函数的元数(arity)。在`factorial`模块的export指令中,元数为1的`fac`函数通过`fac/1`表示。
Erlang支持元组(tuple,也称为乘积类型(product type))和列表(list)。元组由花括号包围起来,例如`{ok,37}`。在元组中,通过元素的位置访问元素。记录(record)是另一种数据类型;在记录中可以保存固定数目的元素,这些元素可以通过名字访问和操作。例如这样的语法可以定义一个记录:`-record(state, {id, msg_list=[]})`。通过表达式`Var = #state{id=1}`可以创建一个实例,然后通过这样的表达式可以查看实例中的内容:`Var#state.id`。如果要使用可变数目的元素,那么我们可以使用列表,列表通过方括号定义,例如`[23,34]`。`[X|Xs]`的表达方式匹配一个非空的列表,其中X匹配头,Xs匹配尾。用小写字母开头的标识符表示一个原子(atom),原子就是一个表示自己的字符串;例如,元组`{ok,37}`中的`ok`就是一个原子。通常通过这种方式使用原子来表示函数的结果,例如除了`ok`结果之外,还可以有`{error, "Error String"}`这种形式的结果。
Erlang系统中的进程在独立的内存中并发运行,以消息传递的方式进行相互通信。进程可以应用于大量的应用,其中包括数据库的网关,协议栈的处理程序,以及管理从其他进程发送来的跟踪消息的日志。虽然这些进程处理不同的请求,但是进程处理请求的方式却是有相似之处的。
因为进程只存在于虚拟机中,一个VM可以同时运行成千上万个进程,Riak就大量使用了这一特性。例如,对数据的每一个请求——读、写和删除——都采用独立进程处理的模型,这种方式对于大多数采用操作系统级线程的实现而言都是不可能的。
进程是通过进程标识符识别的,进程标识符称为PID;此外,进程还可以通过别名注册,不过注册别名的方式应该只用于长时间运行的“静态”进程。如果一个进程注册了一个别名,那么其他进程就可以在不知道这个进程PID的情况下给这个进程发送消息。进程的创建通过内建函数(built-in function,BIF) `spawn(Module, Function, Arguments)`完成。BIF是集成在虚拟机中的函数,用于完成纯Erlang不可能实现或实现很慢的功能。`spawn/3`这个BIF接受一个`Module`、一个`Function`和一个`Arguments`作为参数。这个BIF的调用返回新创建的进程的PID,并且产生一个副作用,就是创建了一个新的进程以之前传入的参数执行模块中的函数。
我们通过`Pid ! Msg`这种写法将消息`Msg`发送给进程`Pid`。一个进程可以通过调用BIF `self`来得到其PID,之后该进程可以将PID发送给其他进程,这样别的进程就能够利用它与原来的进程通信了。假设一个进程期望接收`{ok, N}`和`{error, Reason}`这种形式的消息。这个进程可以通过receive语句处理这些消息:
~~~
receive
{ok, N} ->
N+1;
{error, _} ->
0
end
~~~
这条语句的结果是由模式匹配语句确定的数值。如果在模式匹配中并不需要某个变量的值,可以像上面例子中那样用下划线来代替。
进程之间的消息传递是异步的,进程接收到的消息会按照其到达顺序放在其信箱中。假设现在正在执行的就是上面的receive表达式:如果信箱中的第一个元素是`{ok, N}`或`{error, Reason}`,那就可以返回相应结果。如果第一个元素并非这两种形式之一,那它会继续保留在信箱之中,然后以类似的方式处理第二个消息。如果没有消息能匹配成功,receive会继续等待,直到接收到一个匹配的消息。
进程终止有两种原因。如果没有更多的代码要执行了,它们会以原因*normal*退出。如果进程遇到了运行时错误,它会以非*normal*的原因退出。进程的终止只会对和其“链接”在一起的进程产生影响。进程可以通过BIF `link(Pid)`链接在一起,也可以在调用`spawn_link(Module, Function, Arguments)`的时候链接在一起。如果一个进程终止了,那么这个进程会对其链接集合中的所有进程发送一个EXIT信号。如果终止原因不是normal,那么收到这个信号的进程会终止自己,并且进一步传播EXIT信号。如果调用BIF `process_flag(trap_exit, true)`,那么进程收到EXIT信号之后不会终止,而是以Erlang消息的方式将EXIT信号放在进程的信箱中。
Riak通过EXIT信号监视辅助进程的健康状况,这些辅助进程负责执行由请求驱动的有限状态机发起的非关键性的工作。当这些辅助进程异常终止的时候,父进程可以通过EXIT信号决定忽略错误或重新启动进程。
## 15.2\. 进程框架
我们前面引入了这一概念,即不管进程是出于什么目的创建的,它们总要遵从一个共同的模式。作为开始,我们必须创建一个进程,然后可以为它注册一个别名,当然后者是可选的。对于新创建的进程而言,它的第一个动作是初始化进程循环数据。循环数据一般通过在进程初始化时传给内置函数`spawn`的参数得到。循环数据保存在叫做进程状态的变量中。状态(一般保存在一个记录中)会被传递给接收-求值函数,该函数是一个循环,负责接收消息,处理消息,更新状态,之后将状态作为参数传给一个尾递归调用。如果处理到了‘stop’消息,接收进程会清理自身数据,然后退出。
不管进程要执行什么任务,这都是进程之间反复出现的一种机制。记住这一点之后,我们再来看一下,遵守这一模式的进程之间又有何不同:
* 创建不同的进程时传入BIF `spawn`的参数会有不同
* 在创建一个进程的时候,要考虑是否为这个进程注册一个别名,如果需要的话,还要考虑别名是什么。
* 初始化进程状态的函数要执行的动作依进程执行任务的不同而不同。
* 无论哪种情况,系统的状态都用循环数据表示,但不同进程的循环数据会有不同。
* 在接收-求值循环体中,不同的进程接收的消息是不一样的,而且处理的方式也五花八门。
* 最后,在进程结束的时候,清理动作也随进程而异。
所以,即使存在一个通用的动作框架,它们仍然需要与具体任务相关的各种动作来补充。以该框架为模板,程序员能够创建不同的进程,用以承担服务器、有限状态机、事件处理程序和监督者等不同职责。但是我们不必每次都重新实现这些模式,它们已经作为行为模式放在类库中了。它们是OTP中间件的一部分。
## 15.3\. OTP行为
开发Riak的核心开发者团队分布在十几个不同的地点。如果没有非常紧密的合作和可操作的模板,那么最终可能会得到各种不同的客户端/服务器实现,这些实现可能还不能处理特殊的边界条件和并发相关的错误。此外,可能还无法形成一种处理客户端和服务器崩溃的统一方法,而且也无法保证来自于一个请求的应答是一个合法应答而不只是某条服从内部消息协议的任意消息。
OTP指的是一组Erlang库和设计模式,宗旨是为开发健壮系统提供一组现成的工具。其中很多模式和库都以“行为”(behavior)的形式提供。
OTP行为提供了一些实现了最常见并发设计模式的库模块,从而解决了上述问题。在幕后,这些库模块可以确保以一致的方式处理错误和特殊情况,而程序员并不需要意识到这些。因此,OTP行为提供了一组标准化的构建单元,利用这些构建单元可以设计和构建工业强度的系统。
### 15.3.1\. OTP行为简介
OTP行为是通过`stdlib`应用程序中的一些库模块提供的,而后者是Erlang/OTP发行版中的一部分。由程序员编写的具体代码放在独立的模块中,这些代码通过每一个行为中预定义的一组标准回调函数调用。这个回调模块要包含实现某个功能所需要的所有具体代码。
OTP行为中包含工作进程,负责实际的处理工作,还包含监督者进程,负责监视工作进程和其他监督进程。工作进程(worker)行为包括服务器、事件处理程序和有限状态机,在图中通常使用圆圈表示。监督者(supervisor)负责监视其子进程,既包含工作进程也包含其他监督者,在图中通常用方框表示,工作者和监督者共同组成了监督树(supervision tree)。
![OTP Riak监督树](http://box.kancloud.cn/2015-08-20_55d5879fb1af0.jpg)图15.1: OTP Riak监督树
监督树包装在一个名为应用程序(application)的行为中。OTP应用程序不仅是Erlang系统中构建单元,还是一种包装可重用组件的方法。像Riak这样的工业级别的系统由一组低耦合且可能分布式的应用程序组成。在这些应用程序中,有一些属于标准Erlang发行版的一部分,另一些则是为了实现Riak中特定功能所编写的。
OTP应用程序的例子还包括Corba ORB和简单网络管理协议(Simple Network Management Protocol,SNMP)代理。OTP应用程序是一个可重用的组件,通过监督进程和工作进程的方式将库模块包装在一起。从现在开始,我们提到应用程序的时候指的就是OTP应用程序。
行为模块包含每一种行为类型所需的所有通用代码。尽管你也可以实现自己的行为模块,但是一般情况下不需要这样做,因为Erlang/OTP发行版中自带的行为可以满足你的代码中会使用到的大部分设计模式。行为模块提供的通用功能包含了以下操作:
* 创建进程,还支持注册进程;
* 通过同步或异步的方式发送和接收客户消息,包括内部消息协议的定义;
* 保存循环数据和管理进程循环;
* 终止进程。
循环数据是一个变量,行为需要在多个函数调用之间保存的数据都存放在这个变量中。函数调用之后返回修改后的循环数据。这个修改后的循环数据通常称为新循环数据,这个数据作为参数被传入下一个调用中。循环数据常被称为行为状态。
通用服务器应用程序使用的回调模块中包含的功能负责提供所需求的具体行为,这些功能包括:
* 初始化进程循环数据;如果需要注册进程,还要初始化进程名称。
* 处理具体的客户请求;如果请求是同步的,还要将应答发送回客户。
* 在请求处理之间维护和更新进程循环数据。
* 在进程终止的时候清理进程循环数据。
### 15.3.2\. 通用服务器
`gen_server`行为定义了实现了客户端/服务器行为的通用服务器(generic server),`gen_server`行为是标准库应用程序自带的行为。我们下面以`riak_core`应用程序中的`riak_core_node_watcher.erl`为例讲解通用服务器。这个服务器负责跟踪并报告Riak集群中有哪些子服务和节点是可用的。这个模块头部的指令如下所示:
~~~
-module(riak_core_node_watcher).
-behavior(gen_server).
%% 这个模块提供的API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
services/1,nodes/1,avsn/0]).
%% gen_server的回调函数
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).
-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
bcast_mod={gen_server, abcast}}).
~~~
通过`-behavior(gen_server)`指令可以很快判断出这个模块是一个通用服务器。这条指令的作用是让编译器确保所有的回调函数都正确导出了。服务器循环数据会使用到记录`state`。
### 15.3.3\. 启动服务器
使用`gen_server`行为的时候,不要使用BIF `spawn`或`spawn_link`,而是要使用`gen_server:start`或`gen_server:start_link`函数。`spawn`和`start`的主要区别在于后者本质上是同步的。由于`start`调用直到工作进程完成了初始化之后才会返回,所以使用`start`代替`spawn`可以使得工作进程启动过程的确定性更好,并能避免意外的竞争条件。这个函数有两种调用方式:
~~~
gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)
~~~
`ServerName`是一个元组,格式为`{local, Name}`或`{global, Name}`,表示如果进程需要注册,则代表进程别名的本地`Name`或全局`Name`。如果使用了全局名称,那么则可以在分布式Erlang节点组成的集群中透明地访问服务器。如果不想注册进程,而是通过进程PID引用进程,那么请忽略这个参数,使用`start_link/3`或`start/3`函数调用。`CallbackModule`是保存了具体回调函数的模块名称;`Arguments`是一个合法的Erlang term,表示传递给`init/1`回调函数的参数;`Options`是一个列表,可以用来设置内存管理标志`fullsweep_after`和`heapsize`以及其他跟踪和调试标志。
在本文的例子中,调用了`start_link/4`,通过回调模块的名字注册进程,回调模块的名字通过`?MODULE`宏调用得到。编译代码的时候,这个宏被预处理器展开为这个宏所在的模块的名称。将行为的别名设置为实现行为的回调模块名称总是一个好的做法。由于不需要传入任何参数,所以保留空参数列表。选项列表也留空:
~~~
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
~~~
`start_link`和`start`函数的明显区别在于前者将进程链接至其父进程,而后者不会链接,这个父进程通常是一个监督者进程。这里需要特别说明的是,将自己链接至监督者进程是OTP行为的责任所在。`start`函数通常用于在shell中测试行为,因为导致shell进程崩溃的输入错误不会对行为产生影响。`start`和`start_link`函数的所有变体都返回`{ok, Pid}`。
`start`和`start_link`函数会创建一个新的进程调用`CallbackModule`中的回调函数`init(Arguments)`,并且传入`Arguments`。`init`函数必须初始化服务器的`LoopData`并且返回一个格式为`{ok, LoopData}`的元组。`LoopData`包含了将要传递给回调函数的第一个循环数据实例。如果需要保存传入`init`函数的一些参数,那么应该保存在`LoopData`变量中。在Riak节点监视器服务器中的`LoopData`保存的是调用`schedule_broadcast/1`并传入一个类型为`state`的记录得到的结果,这个记录实例中所有字段的值都设置为默认值:
~~~
init([]) ->
%% 监视节点的启动和停止事件
net_kernel:monitor_nodes(true),
%% 设置跟踪节点状态的ETS表
ets:new(?MODULE, [protected, named_table]),
{ok, schedule_broadcast(#state{})}.
~~~
尽管监督者进程可能会调用`start_link/4`函数,但是调用`init/1`回调函数的是另外一个进程:即刚刚创建的那个进程。由于这个服务器的作用是发现、记录和广播Riak中所有子服务的可用性,因此这个初始化过程要求Erlang运行时通知这个进程这种事件,并且设置一个保存这些信息的表。这些设置必须在初始化过程中完成,因为如果这个数据结构还不存在的话,任何对服务器的调用都会失败。在`init`函数中只完成必要的设置工作,并且尽量减少`init`中完成的工作,因为`init`的调用是一个同步调用,只有这个调用返回之后才能继续进行其他串行的过程。
### 15.3.4\. 传递消息
如果需要向服务器发送一条同步的消息,可以使用`gen_server:call/2`函数。如果需要进行异步调用,则使用`gen_server:cast/2`函数。下面首先看一下Riak服务API中的两个函数,之后再给出剩下的代码。这两个函数由客户端进程调用,调用的结果是向服务器进程发送一个同步消息,其中服务器的注册名称和回调模块的名称一致。注意,发送给服务器数据的验证应该在客户端这边进行。如果客户端发送了不正确的信息,服务器应该终止。
~~~
service_up(Id, Pid) ->
gen_server:call(?MODULE, {service_up, Id, Pid}).
service_down(Id) ->
gen_server:call(?MODULE, {service_down, Id}).
~~~
收到消息之后,`gen_server`进程调用`handle_call/3`回调函数处理收到的消息,处理的顺序和消息发送的顺序一致。
~~~
handle_call({service_up, Id, Pid}, _From, State) ->
%% 在本地更新活动服务的集合
Services = ordsets:add_element(Id, State#state.services),
S2 = State#state { services = Services },
%% 移除所有和这个服务相关的mref
delete_service_mref(Id),
%% 为表示这个服务的PID设置一个监视器
Mref = erlang:monitor(process, Pid),
erlang:put(Mref, Id),
erlang:put(Id, Mref),
%% 更新本地ETS表并广播
S3 = local_update(S2),
{reply, ok, update_avsn(S3)};
handle_call({service_down, Id}, _From, State) ->
%% 在本地更新活动服务的集合
Services = ordsets:del_element(Id, State#state.services),
S2 = State#state { services = Services },
%% 移除所有和这个服务相关的mref
delete_service_mref(Id),
%% 更新本地ETS表并广播
S3 = local_update(S2),
{reply, ok, update_avsn(S3)};
~~~
注意这个回调函数的返回值。返回值是一个元组,这个元组包含了控制用的原子`reply`,告诉`gen_server`的通用代码这个元组的第二个元素(在上面的两个函数中都是ok)就是要发送回客户端的应答。这个元组中第三个元素是新的`State`,也就是说在服务器的下一次循环迭代中要将这第三个元素传递给`handle_call/3`函数。在上面的两个函数中,第三个元素都更新为表示新的可用服务集合。参数`_From`是一个元组,其中包含一个唯一的消息引用和客户端进程标识符。这个元组会在其他库函数中使用到,但是本章不会讨论这些库函数。在大多数情况下,都不需要使用这个元组。
`gen_server`库在这个操作的幕后内建了很多机制和保护措施。如果客户端向服务器发送一条同步消息,而且5秒钟内没有收到应答,那么执行`call/2`函数的进程就会被终止。通过调用`gen_server:call(Name, Message, Timeout)`函数可以修改这个行为,其中`Timeout`既可以是毫秒值,也可以是原子`infinity`(表示不限时)。
最早设计超时机制的目的是为了防止发生死锁,可以保证不小心互相调用的服务器在超过默认超时时间后会被终止。为了最终能调试并修复错误,崩溃的报告会被记录下来。大部分应用程序在5秒钟的超时时间内都能正常地工作,但是在非常重的负载下,可能需要调整这个值,甚至有可能使用`infinity`;这些决策都和具体的应用程序相关。在Erlang/OTP中所有的关键代码都是用了`infinity`。在Riak中不同的地方使用了不同的超时时间:内部的耦合组件之间通常使用`infinity`,而在某些情况下当和Riak通信的客户端指定某个操作应该允许超时的时候,`Timeout`由用户传入的参数设置。
使用`gen_server:call/2`函数时其他的防护机制还包括处理给一个不存在的服务器发送消息的情况以及服务器在发送应答之前就崩溃的情况。在这两种情况下,调用的进程都会终止。在使用纯Erlang的时候,发送一个永远不会在receive子句中匹配模式的消息会导致内存泄露的bug。在Riak中采用了两种不同的策略缓和这种内存泄露,这两种策略都使用了“捕捉所有消息”的匹配子句。在消息有可能是用户产生的情况下,不匹配的消息可能会被忽略掉。在消息只可能来自于Riak内部组件的情况下,这种情况表示出现了一个bug,因此会触发一个错误报警的内部崩溃报告,并且重新启动接受这个消息的工作进程。
发送异步消息的工作方式类似。消息被异步地发送给通用服务器,并且在`handle_cast/2`回调函数中处理。这个函数必须返回一个格式为`{reply, NewState}`的元组。使用异步调用的场合包括:不关心服务器请求的时候,以及不用担心产生超出服务器可以承担的消息量时。当我们对应答本身不感兴趣,但是需要等待这条消息被处理完才能发送下一个请求的时候,应该使用`gen_server:call/2`,并且在应答中返回原子`ok`。考虑一个场景,一个进程生成数据库条目的速度超出了Riak可以消耗的速度。使用异步调用的时候,可能的风险是填满进程的信箱,使得节点耗尽内存。Riak通过`gen_server`同步调用的消息串行化特性控制负载,只有当前一个请求被处理完时才处理下一个请求。这种方法可以避免使用更为复杂的节流代码的必要性:除了能提供并发,`gen_server`进程还可以用来产生串行化点。
### 15.3.5\. 停止服务器
如何停止服务器?在`handle_call/3`和`handle_cast/2`回调函数中,不要返回`{reply, Reply, NewState}`和`{noreply, NewState}`,而是分别返回`{stop, Reason, Reply, NewState}`和`{stop, Reason, NewState}`。需要有某种机制触发这个返回值,这种机制通常是发送给服务器的`stop`消息。接收到包含`Reason`和`State`的`stop`元组之后,通用代码执行`terminate(Reason, State)`回调函数。
`terminate`函数非常适合执行清理服务器`State`的代码和系统使用的所有其他持久化数据的代码。在我们的例子中,向其他对等进程发送最后一条消息,让它们知道这个节点监视器不再继续工作和监视了。在这个例子中,变量`State`包含一个带有`status`字段和`peers`字段的记录:
~~~
terminate(_Reason, State) ->
%% 通知所有对等进程这个进程正在关闭
broadcast(State#state.peers, State#state { status = down }).
~~~
将行为的回调函数当做库函数并且在程序中其他部分调用是一个非常糟糕的做法。例如,绝对不要在另一个模块中调用riak_core_node_watcher:init(Args)获得初始的循环数据。要获得这种数据,应该通过同步调用服务器的方式获得。行为回调函数只能被行为库模块调用,而且是因为系统中发生的事件而触发的调用,绝对不能直接被用户调用。
## 15.4\. 其他工作进程行为
利用同样的思想,还可以实现很多其他类型的工作进程行为,而且在OTP中已经实现了不少行为。
### 15.4.1\. 有限状态机
有限状态机(Finite state machines,FSM)实现在`gen_fsm`行为模块中,是电信系统(Erlang最初设计时面向的问题领域)中实现协议栈会使用到的一个关键组件。状态以回调函数的形式定义,回调函数根据状态的名称命名,回调函数要返回一个包含下一个状态和更新后的循环数据的元组。可以向这些状态发送同步和异步事件。有限状态机的回调模块还要导出标准的回调函数,例如`init`、`terminate`和`handle_info`。
当然,有限状态机并不仅限于使用电信系统。在Riak中,请求处理程序中使用了有限状态机。当客户端发出一个请求(例如`get`、`put`和`delete`)时,监听这个请求的进程会创建出一个实现了对应`gen_fsm`行为的进程。例如,`riak_kv_get_fsm`负责处理`get`请求:检索数据并将数据发送给客户端进程。当这个FSM进程判断对哪些节点请求数据时,向这些节点发送消息时以及从这些节点接收到数据、错误或超时应答时,都会经历一些不同的状态。
### 15.4.2\. 事件处理程序
事件处理程序和事件管理器是`gen_event`库模块中实现的另一个行为。基本思想是创建一个接收某一种特定类型事件的集中点。事件可以同步发送也可以异步发送,事件带有一组接收到事件需要采取的预定义动作。接收到事件之后可能发生的响应包括将事件记录到文件中、以短消息的形式发送报警、或者采集统计信息等。每一个这一类动作都定义在一个独立的回调模块中,并且在每一次调用中都有自己的循环数据。每一个特定的事件管理器都可以添加、删除或更新处理程序。因此在实际应用中,每一个事件管理器都有可能有很多回调模块,而这些回调模块的不同实例也可以出现在不同的管理器中。事件处理程序包括接收报警的进程、接收动态跟踪数据的进程、接收设备相关事件的进程或接收简单日志的进程。
Riak中使用`gen_event`行为的一个例子是管理“环事件”的订阅,即一个Riak集群的从属关系或分区分配的变化。Riak节点上的进程可以在实现了`gen_event`行为的`riak_core_ring_events`实例中注册一个函数。每当管理这个节点环的中央进程修改了整个集群的从属关系记录时,这个进程就会产生一个事件,结果就是使得这些回调模块都会调用注册的函数。通过采用这种方式,Riak中的各个部分很容易对Riak的某个最为中心的数据结构发生的变化进行响应,而不会增加这个数据结构中央管理的复杂性。
我们刚才讨论的3个主要行为——`gen_server`、`gen_fsm`和`gen_event`——能够处理最为常见的并发和通信模式。然而,在大型系统中,一些和应用具体相关的模式会随着时间推移而出现,因而有必要能够创建新的行为。Riak就包含了这样一种行为:`riak_core_vnode`,这种行为形式化描述了虚拟节点的实现方式。虚拟节点是Riak中最重要的存储抽象,对请求驱动的有限状态机暴露了一个统一的键值存储接口。回调模块的接口通过`behavior_info/1`函数具体说明,如下所示:
~~~
behavior_info(callbacks) ->
[{init,1},
{handle_command,3},
{handoff_starting,2},
{handoff_cancelled,1},
{handoff_finished,2},
{handle_handoff_command,3},
{handle_handoff_data,2},
{encode_handoff_item,2},
{is_empty,1},
{terminate,2},
{delete,1}];
~~~
以上示例展示了`riak_core_vnode`中`behavior_info/1`函数的代码。元组`{CallbackFunction, Arity}`的列表定义了回调模块必须遵守的协议。具体的虚拟节点实现必须导出这些函数,否则编译器会发出警告。自定义OTP行为的实现比较简单直接。除了定义回调函数之外,还需要通过`proc_lib`模块中的特定函数启动自定义的行为,以及通过`sys`模块处理系统消息并监视父进程以防父进程终止。
## 15.5\. 监督者行为
监督者行为的任务就是监视其子进程,并且根据预先配置好的规则在子进程终止的时候执行相应的动作。子进程既可以是监督者进程也可以是工作进程。由于监督者的存在,Riak的代码库可以关注于正确的情况,监督者可以在整个系统中以一致的行为处理软件的bug、损坏的数据和系统错误。在Erlang的世界中,这种非防御式编程的方法通常称为“让它崩溃”策略。构成监督树的子进程既可以包含监督者也可以包含工作进程。工作进程指的是包含`gen_fsm`、`gen_server`或`gen_event`的OTP行为。由于Riak团队不需要处理边界错误条件,所以可以在较小的代码库中开发。由于使用了行为,所以这个代码库从一开始就比较小,因为代码库中只需要包含实现具体功能的代码。和大部分Erlang应用程序一样,Riak有一个顶层的监督者,还有监视一组组负责类似功能的进程的子监督者。具体的例子包括Riak的虚拟节点、TCP套接字监听者和查询响应管理器。
### 15.5.1\. 监督者回调函数
为了演示如何实现监督者行为,我们拿`riak_core_sup.erl`作为例子。Riak核心监督者是Riak核心应用程序的顶层监督者。这个监督者启动一组静态工作进程和监督者,同时启动一些动态工作进程处理节点的RESTful API的HTTP和HTTPS绑定,这些API在应用程序相关的配置文件中定义。和`gen_server`采用的方式类似,所有的监督者回调函数模块都必须包含`-behavior(supervisor).`指令。监督者通过`start`或`start_link`函数启动,这两个函数接受一个可选的`ServerName`参数、一个`CallBackModule`参数和一个`Argument`参数,`Argument`参数会被传入`init/1`回调函数。
下面看一下`riak_core_sup.erl`模块的前几行代码,包括`-behavior`指令和一个之后要描述的宏定义,注意这里调用的`start_link/3`函数:
~~~
-module(riak_core_sup).
-behavior(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
~~~
启动一个监督者会创建一个新的进程,并且调用回调模块`riak_core_sup.erl`中定义的`init/1`回调函数。`ServerName`是一个格式为`{local, Name}`或`{global, Name}`的元组,其中`Name`是监督者的注册名称。在我们的例子中,注册名称和回调函数名都为原子`riak_core_sup`,这个原子通过`?MODULE`宏得到。将空列表作为参数传入`init/1`,表示空参数。`init`函数是唯一的监督者回调函数。这个函数返回一个如下格式的元组:
~~~
{ok, {SupervisorSpecification, ChildSpecificationList}}
~~~
其中`SupervisorSpecification`是一个三元组`{RestartStrategy, AllowedRestarts, MaxSeconds}`,包含了如何处理进程崩溃和重启的信息。`RestartStrategy`是三个配置参数中的一个,表示一个行为异常终止的时候这个行为的兄弟受什么影响:
* `one_for_one`:监督树中的其他进程不受影响。
* `rest_for_one`:在终止的进程之后启动的进程被终止并被重启。
* `one_for_all`:所有的进程都被终止并被重启。
`AllowedRestarts`表示监督者的任何一个子进程在监督者终止自己(及其子进程)之前的`MaxSeconds`秒钟的时间内最多可以终止的次数。当一个进程终止的时候,这个进程向其监督者发送一个`EXIT`信号,监督者根据重启策略相应地处理这个终止的事件。监督者达到最大重启次数的限制之后终止可以保证循环重启以及其他在这个层次不能解决的问题能够提升严重等级。比如说,一个进程中的问题可能定位在一个不同的子监督树中,这种情况下,接收到升级的问题的监督者可以终止受到影响的子树并重启子树。
仔细看`riak_core_sup.erl`模块中`init/1`回调函数的最后一行,注意到这个特别的监督者采用了one-for-one策略,也就意味着进程之间是相互独立的。监督者允许重启自己之前进行最多10次重启。
`ChildSpecificationList`指定了这个监督者要启动和监视的子进程列表,其中带有如何终止和重启这些进程的信息。这个列表包含以下格式的元组:
~~~
{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}
~~~
`Id`是用于区分监督者的唯一标识符。`Module`、`Function`和`Arguments`是一个导出的函数,这个函数会调用行为的start_link函数,并且返回格式为`{ok, Pid}`的元组。`Restart`策略表示根据进程终止的类型进行什么操作,这个变量可以取值:
* `transient`进程,永远不重启;
* `temporary`进程,只有异常退出的时候才重启;
* `permanent`进程,总是要重启,不论终止原因是正常还是异常。
`Shutdown`是一个毫秒值,表示因为重启或关闭的时候,行为执行`terminate`函数的时间限制。还可以使用原子`infinity`,但是对于除了监督者之外的行为,强烈*不*建议使用`infinity`。`Type`可以取值原子`worker`,表示通用服务器、事件处理程序或有限状态机,还可以取值原子`supervisor`。还有`ModuleList`,这是实现这个行为的模块列表,用于运行时软件升级过程中对进程的控制和暂停。子进程描述列表中只能出现已经存在的行为或用户实现的行为,因此也只有这些行为才能出现在监督树中。
有了这些知识之后,我们现在应该可以基于一个公共架构编写一个定义了进程间依赖、容错阈值和升级扩散过程的重启策略。现在还应该能理解`riak_core_sup.erl`模块中`init/1`中进行的操作。首先学习一下`?CHILD`宏,这个宏创建一个子进程的描述,使用回调模块的名字作为`Id`,将子进程设置为`permanent`,并且将关闭时间设置为5秒钟。子进程的类型有`worker`也有`supervisor`。下面看一下这个例子,看看你是否能理解:
~~~
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
init([]) ->
RiakWebs = case lists:flatten(riak_core_web:bindings(http),
riak_core_web:bindings(https)) of
[] ->
%% check for old settings, in case app.config
%% was not updated
riak_core_web:old_binding();
Binding ->
Binding
end,
Children =
[?CHILD(riak_core_vnode_sup, supervisor),
?CHILD(riak_core_handoff_manager, worker),
?CHILD(riak_core_handoff_listener, worker),
?CHILD(riak_core_ring_events, worker),
?CHILD(riak_core_ring_manager, worker),
?CHILD(riak_core_node_watcher_events, worker),
?CHILD(riak_core_node_watcher, worker),
?CHILD(riak_core_gossip, worker) |
RiakWebs
],
{ok, {{one_for_one, 10, 10}, Children}}.
~~~
这个监督者启动的`Children`中大部分都静态定义为`worker`(只有`riak_core_vnode_sup`定义为`superviso`r)。`RiakWebs`部分是一个例外,这些子进程的定义是通过Riak的配置文件中的HTTP部分动态定义的。
除了库应用程序之外的每一个OTP应用程序(包括Riak中的那些应用程序)都有自己的监督树。在Riak中,有多个顶层应用程序运行在Erlang节点中,例如`riak_core`负责运行分布式系统的算法,`riak_kv`负责运行键值存储的语义,`webmachine`负责运行HTTP服务。前面的插图展示了`riak_core`展开的树形图,从中可以看出多层次监督的工作方式。这种结构有很多好处,其中之一就是如果某个子系统崩溃了(因为bug、环境问题或故意的操作等),那么只有这个子系统是第一个被终止的实例。
监督者会重启需要重启的进程,整个系统不会受到影响。在实践中,我们发现这种方式在Riak中运行得很好。也许某个用户会发现怎样让一个虚拟节点崩溃,但是这个虚拟节点很快会被`riak_core_vnode_sup`重启。如果用户知道了如何让`riak_core_vnode_sup`崩溃,那么`riak_core`会负责重启,将这个终止扩散到顶层监督者。这种故障隔离和恢复机制允许Riak开发者(和Erlang开发者)能够轻松地构建像小强一样的系统。
曾经有一个业界的大用户为了找出一些数据库系统中哪些部分会崩溃,所以他们创建了一个非常严酷的测试环境,此时,这个监督模型的价值就展现出来了。这个测试环境会随机爆发巨大的流量和故障条件。他们很诧异地发现即使是在这种最严酷的环境下Riak也不会停止工作。当然,在幕后,他们也有多种办法让单个的进程或子系统崩溃——但是每一次监控者都会做好清理工作,并且重启崩溃的部分,使得整个系统重新恢复工作。
### 15.5.2\. 应用程序
之前介绍的应用程序行为的作用是将Erlang模块和资源包装在可重用的组件中。在OTP中有两类应用程序。最常见的形式称为普通应用程序,这种应用程序会启动一个监督树和所有相关的静态工作进程。而另一种应用程序称为库应用程序,例如标准库就是这一类应用程序,这种应用程序属于Erlang发行版的一部分,包含库模块,但是不会启动监督树。这并不是说代码中不包含进程和监督树。这只是说它们能够作为属于另一个应用程序的监督树的一部分而启动。
一个Erlang系统会包含一组松耦合的应用程序。有一些是由开发者编写的,一些是开源的应用程序,剩下的其他应用程序则属于Erlang/OTP发行版的一部分。Erlang运行时系统及其工具平等对待所有的应用程序,不论这些应用程序属于或不属于Erlang发行版的一部分。
## 15.6\. Riak中的复制和通信
Riak从Amazon的Dynamo存储系统[[DHJ+07](http://www.aosabook.org/en/bib1.html#bib%3aamazon%3adynamo)]获得灵感,是为超大规模情况下极高可靠性和可用性而设计的。Dynamo和Riak的架构结合了分布式散列表(Distributed Hash Tables,DHT)和传统数据库的特点。副本放置(replica placement)的一致性散列(consistent hashing)和流言传播协议(gossip protocol)是Riak和Dynamo都使用了的两大关键技术。
一致性散列要求系统中所有的节点都相互知道,而且知道每一个节点都拥有哪些分区。这种分配数据可以放置在一个集中管理的配置文件中,但是在一个大规模的配置中,这种要求可能极难满足。另一种替换方法是使用一个中央配置服务器,但是这种方式在系统中会引入单点故障的问题。而Riak采用的方法是通过流言传播协议将集群归属信息和分区所有权数据扩散在整个系统中。
流言传播协议也称为传染病协议(epidemic protocol)。顾名思义,当系统中一个节点想要更新一段共享数据的时候,这个节点对数据的本地副本进行更新,然后将更新后的数据随机地发送给一个节点。当一个节点收到一个更新后,这个节点将接收到的更新和本地状态合并,然后再次发送给另一个随机节点。
当一个Riak集群启动的时候,所有节点都必须配置相同的分区数。然后一致性散列环被分割为分区数个分区,每一段都在本地保存为一个`{HashRange, Owner}`对。集群中的第一个节点占有所有的分区。当一个新节点加入集群的时候,获得已有节点的`{HashRange, Owner}`对列表,然后要求占有(分区数)/(节点数)个对,用新的所有权更新其本地状态。然后将更新后的所有权信息通过流言传播协议发送给一个节点。接下来这个更新后的信息通过上述算法扩散到整个集群。
通过使用流言传播协议,Riak可以避免因为中央配置服务器而引入的单点故障的问题,系统操作员也不用维护关键的集群配置数据了。任何一个节点都可以根据通过流言传播协议得到的分配数据对收到的请求进行路由决策。通过结合使用流言传播协议和一致性散列,Riak可以按照一个真正去中心化系统的方式工作,这对于部署和运营大规模系统是非常重要的。
## 15.7\. 结论和收获
大多数程序开发人员都相信更小更简洁的代码库不仅更易于维护,而且bug更少。在集群中通过Erlang提供的基本分布式原语进行通信,使得Riak在开始开发的时候就有一个非常健壮的异步消息层作为基础,并且可以在这个基础之上构建自己的协议,而不用担心底层的实现。随着Riak慢慢发展为一个成熟的系统,有一些和网络通信相关的部分脱离了使用Erlang内建的功能(为了直接操纵TCP套接字),而其他部分和Erlang包含的原语结合得很好。由于一开始就用Erlang原生的消息传递机制实现一切,所以Riak团队能够很快地构建出整个系统。这些原语足够地干净和清晰,使得之后可以很容易地将一些被证明不是最适合产品的地方替换掉。
同样,由于Erlang消息传递的本质和Erlang虚拟机轻量级的内核,用户可以轻松地在1台机器上运行12个节点,也可以在12台机器上运行12个节点。相比起更重量级的消息传递和集群机制来说,这种方式的开发和测试要简单得多。由于Riak分布式的基础本质,这一点尤其重要。过去,大部分分布式系统都很难在一个开发者的笔记本电脑上以“开发模式”操作。因此,开发者们往往需要以完整系统的一个子集作为环境进行代码测试,这种子集的行为往往会有很大的差异。由于多节点的Riak集群可以很简单地在一台笔记本上运行,而不需要很大的资源消耗或复杂的配置,所以这种开发过程很容易得到能够直接在生产环境部署的代码。
通过使用Erlang/OTP的监督者行为,Riak在面对子模块崩溃的时候能表现得更加顽强。Riak还能更进一步,从这种行为得到灵感,Riak集群甚至在整个节点崩溃从系统中消失的时候依然能够保持功能。这种顽强程度令人惊叹。例如大型企业对多个数据库进行压力测试并且故意让数据库崩溃以观察边缘条件的时候,他们会感到非常惊讶。当他们测试Riak的时候,他们感觉非常不解。每一次他们发现一种方法(例如通过操作系统级别的操作或损坏的进程间通信等方法)让Riak的某个子系统崩溃的时候,他们都能观察到一个短暂的性能下降,然后系统又会恢复正常的行为。这就是深思熟虑的“让它崩溃”策略的直接结果。Riak有规则地根据需求重新启动每一个需要重启的子系统,这样整个系统就能继续工作了。这种体验完美地展示了通过Erlang/OTP的方式构建程序能够达到的健壮程度。
### 15.7.1\. 致谢
这一章的内容来源于Francesco Cesarini和Simon Thompson 2009年在布达佩斯和科马尔诺举办的中欧函数式编程学校(Central European Functional Programming School)上的演讲稿。主要的贡献来自于英国坎特伯雷的肯特大学的Simon Thompson。要特别感谢所有的审稿人,谢谢你们在本章编写的各个阶段提供的有价值的反馈意见。
- 前言(卷一)
- 卷1:第1章 Asterisk
- 卷1:第3章 The Bourne-Again Shell
- 卷1:第5章 CMake
- 卷1:第6章 Eclipse之一
- 卷1:第6章 Eclipse之二
- 卷1:第6章 Eclipse之三
- 卷1:第8章 HDFS——Hadoop分布式文件系统之一
- 卷1:第8章 HDFS——Hadoop分布式文件系统之二
- 卷1:第8章 HDFS——Hadoop分布式文件系统
- 卷1:第12章 Mercurial
- 卷1:第13章 NoSQL生态系统
- 卷1:第14章 Python打包工具
- 卷1:第15章 Riak与Erlang/OTP
- 卷1:第16章 Selenium WebDriver
- 卷1:第18章 SnowFlock
- 卷1:第22章 Violet
- 卷1:第24章 VTK
- 卷1:第25章 韦诺之战
- 卷2:第1章 可扩展Web架构与分布式系统之一
- 卷2:第1章 可扩展Web架构与分布式系统之二
- 卷2:第2章 Firefox发布工程
- 卷2:第3章 FreeRTOS
- 卷2:第4章 GDB
- 卷2:第5章 Glasgow Haskell编译器
- 卷2:第6章 Git
- 卷2:第7章 GPSD
- 卷2:第9章 ITK
- 卷2:第11章 matplotlib
- 卷2:第12章 MediaWiki之一
- 卷2:第12章 MediaWiki之二
- 卷2:第13章 Moodle
- 卷2:第14章 NginX
- 卷2:第15章 Open MPI
- 卷2:第18章 Puppet part 1
- 卷2:第18章 Puppet part 2
- 卷2:第19章 PyPy
- 卷2:第20章 SQLAlchemy
- 卷2:第21章 Twisted
- 卷2:第22章 Yesod
- 卷2:第24章 ZeroMQ