Erlang的事件驱动应用

介绍

事件 是你个人工具箱里最好的工具之一。甚至在OTP设计原则有专门一章是关于事件的,还有一种 事件驱动架构 可以让你的代码和架构更牛X:

  • 它更易于将你的代码真正地解耦。
  • 它打开了一扇门,通往令人感兴趣的新的架构模式,比如 CQRS(命令查询职责分离模式),event sourcing(事件源模式),和
    event collaboration(事件合作模式)。
  • 应用程序可以被扩展,扩展的方式是通过创建“自治的组件”来“插入”系统,而这些组件通过触发命令或者事件来做它们自己的事情。比如一些事件如下:user_logged_in,user_created,user_voted,incoming_tweet,user_tweeted,等等。
  • 事件和命令能够被非常小的事件处理器来处理。
  • 如果你真的要用事件来开发系统,那么就可能像rabbitmq0mq一样通过一个队列来派发事件和命令。这也将引入几个有趣的知识点:
  • 系统可以用任何你喜欢的语言(Erlang,Scala,Python,Ruby,js-node,等等)来构造。
  • 为了可扩展性,只要插入所需的足够多的同类型或不同类型的命令处理器或事件处理器。
  • 一些“疯狂特性”的实现和原型是非常有可能的而且可以很快地完成!它们将可能用非erlang语言实现(如果用队列的话)。

Erlang已经提供了一种方式来实现事件驱动架构:gen_event 行为。gen_event 是 Erlang/OTP 自带的默认行为之一。对我来说,它也是Erlang/OTP最酷的特性之一,因为你在许多语言或环境里都无法得到这样开箱即用(并且免费)的特性,因此这真的是非常棒。

它是如何运作的

Gen_event 有一个或多个事件管理器、事件处理器,来派发和处理事件:

事件管理器:一个事件管理器是一个 gen_event 进程。
事件处理器:事件处理器是正真处理事件的回调模块,并且被注册到一个或多个事件管理器。
事件:一个事件是任何erlang term,比如一个元组,一个原子,一个列表,等等。

在某些方面,gen_event 用起来像nodejs里的EventEmitter类,在概念上说也像观察者模式。这个概念是有某种事件总线(事件管理器)负责接收来自系统的事件并且将它们路由到相应的监听者。其基本的工作流如下:

  • 创建一个或者多个实现 gen_event
    行为

    的回调模块。在本文末尾的附录B里有完整的例子。
  • 启动一个事件管理器进程。
  • 注册一个或者多个事件处理器到事件管理器上。
  • 通过事件管理器派发事件。
  • 在你的事件处理器里异步(同步)处理派发过来的事件。

现在让我们按顺序研究上述每一步。

启动一个事件管理器

这就是调用如下函数一样简单:

1
{ok, Pid} = gen_event:start().

或者你计划把你的事件管理器包含在一个监督树里:

1
{ok, Pid} = gen_event:start_link().

用一个给定的名字注册一个事件管理器

1
{ok, Pid} = gen_event:start(my_event_bus).

或者也可以:

1
{ok, Pid} = gen_event:start_link(my_event_bus).

这些函数与start/0以及start_link/0的目的一样,只是它们允许你用一个指定的名字注册一个新的事件管理器。引述官方文档如下:

如果 EventMgrName={local, Name},这个事件管理器被以Name为名字用 register/2 本地注册。

如果 EventMgrName={global, GlobalName},这个事件管理器被以GlobalName为名字用 global:register_name/2 全局注册。

如果没有名字提供,则这个事件管理器不能被注册。

如果 EventManager={via, Module, ViaName},这个事件管理器将被Module注册到系统。这个回调模块Module应该导出如下函数 register_name/2 ,unregister_name/1 ,whereis_name/1 and send/2,这些函数和global模块里的相应函数的行为一样。因此,{via, global, GlobalName} 是一个有效的引用。

注册事件处理器

一旦你已经有了你的事件管理器并且把它运行起来了,就该通过用 add_handler/3 来添加一些事件处理器了。

1
add_handler(my_event_bus, my_event_handler, [arg1, arg2]).

上述函数将添加一个新的事件处理器到指定的事件管理器(你传入的第一个入参所指的)。事件管理器可以用如下几种方式指定:

  • 进程PID。
  • Name,如果事件管理器是本地注册的。
  • {Name, Node},如果事件管理器是在另一个节点本地注册的,或者 {global,
    GlobalName},如果事件管理器是全局注册的。
  • {via, Module, ViaName},如果事件管理器的注册是通过一个替代进程注册的。

第二个入参指定事件处理器的模块名,不过它也可以是{Module, Id},Module是回调模块的名字,而当有多个事件处理器用同一个回调模块的时候,Id则用于识别指定的事件处理器。

紧接着my_event_handler里的回调函数 init/1 被调用,传给 add_handler/3 的第三个入参将原封不动地传给 init/1 做入参。init/1 可能返回:

如果成功,函数将返回{ok, State} 或 {ok, State, hibernate},State是事件处理器内部初始状态。

如果返回{ok, State, hibernate},事件管理器将通过调用 proc_lib:hibernate/3 进入冬眠状态,一直等到下一个事件发生。

被监督的事件处理器

你可以用 add_sup_handler/3 在调用进程(也就是注册新事件处理器的进程)和事件处理器自己之间建立某种形式的监测(或监督):

1
add_sup_handler(my_event_bus, my_event_handler, [arg1, arg2]).

正如官方文档描述:

如果调用进程后续因为原因Reason而终止,事件管理器将通过用 {stop, Reason} 作为入参调用 Module:terminate/2 来删除事件处理器。

如果事件处理器后来被删除,事件管理器给调用进程发送一个消息 {gen_event_EXIT, Handler, Reason}。Reaseon是下列之一:

normal,如果事件处理器因为 delete_handler/3 的调用而被删除,或者被一个回调函数(见下面段落)返回 remove_handler 。

shutdown,如果事件处理器因为事件管理器结束而被删除。

{swapped, NewHandler, Pid},如果进程Pid已经调用 swap_handler/3 或 swap_sup_handler/3 来用另一个事件处理器 NewHandler 替换了当前事件处理器。一个term,如果事件处理器因为一个错误而被删除。term的值是依据错误而来的。

缺点

事件处理器是按顺序执行的,所以要尽量保持它的代码短小。如果你需要有上千个事件处理器,那么实现某种转发器是更好的选择。转发器的思路是有好几个子gen_event订阅一个主gen_event。这样将负载分发给所有感兴趣的监听者。

另一方面,如果你必需要在一个事件处理器里做费时的操作,你应该尝试用一个 gen_event caster,他将监听事件并派发独特的(普通的)erlang消息。

我们也要注意,当一个被监督的事件处理器退出的时候,gen_event讲给所有的事件处理器发送消息,因此要准备好如何处理这些消息。

派发事件

通过事件管理器派发一个事件实际上是相当简单的,仅仅需要调用 gen_event:notify/2

1
gen_event:notify(my_event_handler, {new_user_created, User}).

你可以在本文的附录A里看到gen_event如何派发一个事件的具体细节。

这个函数将异步地派发这个事件。这意味着这个函数调用将不会阻塞而是立即返回。也有同步派发事件的方式,就是使用函数 gen_event:sync_notify/2

1
gen_event:sync_notify(my_event_handler, {new_user_created, User}).

同步在这里的意思是这个函数的调用将阻塞并仅在所有事件处理器已经被调用并且处理了这个事件后才返回。

用消息替代 Notify/2

另一种方式来调用事件处理器是通过派发普通消息给事件管理器。

1
Pid ! {new_user_created}

这将调用所有注册的事件处理器的 handle_info/2 函数,gen_event官方文档说明如下:

当一个事件管理器接收到不是事件或者一个同步请求(或者一个系统消息)的任何其他消息时,每一个已经安装在这个事件管理器上的事件处理器的这个函数被调用。

处理事件

为了处理由notify/2派发的事件,你的回调模块需要实现 handle_event/2 函数,例子如下:

1
2
3
handle_event(bad_smell, State) ->
io:format("That's an aweful smell.. go clean your kitty's litter~n~n"),
{ok, State};

另外,为了处理消息,要实现 handle_info/2 函数:

1
2
3
handle_info(Info, State) ->
io:format("Got message: ~p", [Info]),
{ok, State}.

上面两个例子里的State是由 init/1 返回的数据。

上述两个函数返回值如下所述(当然我们还是引用官方文档):

如果这个函数返回 {ok, NewState} 或者 {ok, NewState, hibernate} ,这个事件处理器将留在事件管理器里并且带着可能被修改过的内部状态NewState。

如果{ok, NewState, hibernate} 返回,事件管理器也将进入冬眠状态(通过调用 proc_lib:hibernate/3),并等待下一个事件的发生。只要一个事件处理器返回{ok, NewState, hibernate} 则整个事件管理器进程进入冬眠状态。

如果函数返回 {swap_handler, Args1, NewState, Handler2, Args2},这个事件处理器将被 Handler2 替代,替换过程首先是调用 Module:terminate(Args1, NewState),然后再调用 Module2:init({Args2, Term}) ,这里的Term是 Module:terminate/2 的返回值。更多信息可以查看 gen_event:swap_handler/3 。

如果函数返回 remove_handler ,这个事件处理器将通过调用 Module:terminate(remove_handler, State)来被删除。

额外事项

停止事件管理器

停止事件管理器有时候是必须的并且通过调用 stop/1 很容易就做到:

1
gen_event:stop(my_event_bus).

这也会引起在所有已经注册的事件处理器上调用 terminate/2

在一个事件处理器上调用函数

你可以通过调用 call/3 来确切地调用一个事件处理器上指定的函数,就像你在 gen_server上做的一样:

1
ok = gen_event:call(my_event_bus, my_event_handler, {do_something}).

和gen_server一样,你可以在调用 call/4 到时候指定一个超时时间:

1
ok = gen_event:call(my_event_bus, my_event_handler, {do_something}, 5000).

gen_event将调用事件处理器模块的 handle_call/2 回调函数。例子如下:

1
2
handle_call(_Request, State) ->
{ok, this_is_my_reply, State}.

删除事件处理器

你可以通过调用 delete_handler/3 来删除你的事件处理器:

1
gen_event:delete_handler(my_event_bus, my_event_handler, [arg1, arg2]).

这个函数将从事件管理器删除这个事件处理器,而且也会将第三个参数传给 terminate/2 并调用它并将将调用结果返回。

列出所有已经注册的事件处理器

为了列出一个事件管理器当前注册的所有事件处理器,可以调用 which_handlers/1

gen_event:which_handlers(my_event_bus).

替换 (交换) 事件处理器

通过调用 swap_handler/3 也可以在运行时更换事件处理器:

1
gen_event:swap_handler(my_event_bus, {my_event_handler, [arg1]}, {my_new_event_handler, [arg2]}).

整个过程是:首先调用 my_event_handler:terminate([arg1], State) 然后调用 my_new_event_handler:init([arg2]),用一个新的事件处理器替换一个久的事件处理器。

如果老的事件处理器是被监督的,则新的事件处理器也被监督。你当然可以直接用同样的参数调用 swap_sup_handler/3 来做到。

感谢

再一次我要感谢 Fernando “El Brujo” Benavides 对本文的总体思考,关于gen_event缺陷的评论,以及他分享的gen_event转发器和caster。

作者

Marcelo Gornstein marcelo@inakanetworks.com

Github: marcelog

Homepage: http://marcelog.github.com

附录 A: gen_event实际上是如何派发一个事件的

函数 notify/2 的实际调用最终在你的erlang/otp源码的 lib/stdlib/src/gen_event.erl 文件的504行:

1
case catch Mod1:Func(Event, State) of

Func是原子 handle_event 或 handle_info。所以如果你的事件处理器崩溃或不知何故失败了,事件管理器是不会崩溃的。

如果运行一个被监督的事件处理器(用add_sup_handler/3启动的),在同一个文件的635行的 terminate/2 函数被调用。

1
Res = (catch Mod:terminate(Args, State)),

另外,被监督的事件处理器,在648行一个消息被发送给这个事件处理器注册到的进程

1
2
3
4
5
6
7
case Handler#handler.supervised of
false ->
ok;
Pid ->
Pid ! {gen_event_EXIT,handler(Handler),Reason},
ok
end.

附录 B: 回调模块样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-module(my_event_handler).
-behaviour(gen_event).
-export([
init/1, terminate/2, handle_info/2,
handle_call/2, code_change/3, handle_event/2
]).
init([]) ->
{ok, []}.
handle_info(_Info, State) ->
{ok, State}.
handle_call(_Request, State) ->
{ok, not_implemented, State}.
handle_event(bad_smell, State) ->
io:format("That's an aweful smell.. go clean your kitty's litter~n~n"),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Arg, _State) ->
ok.

原文链接: http://inaka.net/blog/2013/01/21/erlang-event-driven/