Mix 和 OTP-GenServer

  1. 我们第一个GenServer
  2. 测试一个GenServer
  3. 监测的必要性
  4. call、cast还是info?
  5. 监测还是链接?

本章是Mix和OTP教程的一部分,它依赖这个教程的前面章节。要获得更多信息,请阅读本教程的第一章,或者查看本教程的章节索引。

上一章我们用agent来表示我们的bucket(容器)。在第一章中,我们指出了我们要命名每个bucket,这样我们就可以做到以下几点:

1
2
3
4
5
6
7
8
9
CREATE shopping
OK
PUT shopping milk 1
OK
GET shopping milk
1
OK

既然agent是进程,那么每个bucket有一个进程标识符(pid),但是它没有名字。我们已经在进程那一章中了解了的名字注册,你可能会倾向于使用此注册来解决这个问题。例如,我们可以像下面这样来创建一个bucket:

1
2
3
4
5
6
iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.43.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1

然而,这是一个可怕的想法!在Elixir中进程的名字必须是原子,这意味着我们需要将bucket的名字(通常是从外部客户端接收过来的)转换为原子,而我们不应该将用户输入转换为原子。这是因为原子没有垃圾回收。一个原子一旦被创建,它就再也不会被回收。从用户的输入来生成原子将意味着用户可以注入足够多的不同名字来耗尽我们系统的内存!

在实践中,更可能的是你会耗尽内存也就是使得你的系统崩溃之前达到Erlang虚拟机的原子的最大数量的限制。

我们不滥用名字注册的功能,反而是创建我们自己的注册进程,它持有一个映射来关联bucket的名字和bucket的进程。

注册需要保证字典内容总是最新的。例如,一个bucket进程因为bug而崩溃了,注册必须清理字典来避免数据被污染。在Elixir中,我们描述这一点,说注册表需要监视每个bucket。

我们将使用一个 GenServer 来创建一个注册进程,它可以监测bucket进程。GenServer提供工业级强度的功能来构建Elixir和OTP里的服务器。

我们第一个GenServer

一个GenServer被实现为两部分:客户端API和服务端回调函数。你既可以把这两部分合并到一个单独的模块里,也可以把它们分拆到一个客户端模块和一个服务端模块。客户端和服务端分别运行在隔离的进程里,客户端和服务端来回传递消息,而服务器在内部基于收到的消息而调用相关的函数。本文我们将用一个单独的模块来容纳客户端API和服务端回调函数。

创建一个文件 lib/kv/registry.ex ,它的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link do
GenServer.start_link(__MODULE__, :ok, [])
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
"""
def lookup(server, name) do
GenServer.call(server, {:lookup, name})
end
@doc """
Ensures there is a bucket associated to the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server Callbacks
def init(:ok) do
{:ok, %{}}
end
def handle_call({:lookup, name}, _from, names) do
{:reply, Map.fetch(names, name), names}
end
def handle_cast({:create, name}, names) do
if Map.has_key?(names, name) do
{:noreply, names}
else
{:ok, bucket} = KV.Bucket.start_link
{:noreply, Map.put(names, name, bucket)}
end
end
end

第一个函数是 start_link/3 ,它接收三个入参来启动一个新的GenServer:

  1. 服务端回调函数被实现所在模块,本例子里是 __MODULE__ ,意思就是当前模块。
  2. 初始化参数,本例子里是原子 :ok 。
  3. 一个选项列表,它可以被用来指定一些事项,比如服务器名字。本例子里我们传递了一个空列表。

有两种类型请求你可以发送给一个GenServer:call 和 cast。call是同步的,服务端必须发送一个响应给这一个的请求。cast是异步的,服务端不需要返回响应。

接下来的两个函数,lookup/2 和 create/2 负责发送这些请求给服务端。在本例子中,我们分别用 {:lookup, name} 和 {:create, name} 作为请求消息。请求消息常常被指定为元组,就像本例一样,是为了在第一个参数位置提供多于一个“参数”。通常指定请求的动作作为元组的第一个元素,而剩下的元素作为该动作的参数。注意:请求必需匹配 handle_call/3 或 handle_cast/2 的第一个入参。

上面讲的就是客户端API。在服务器这边,我们可以实现各种各样的回调函数来保证服务器的初始化、终止和处理请求。那些回调函数是可选的,目前我们只实现我们所关心的。

第一个是 init/1 回调函数,它接收传给 GenServer.start_link/3 的参数并且返回 {:ok, state} ,此处的状态是一个新的映射。我们可能已经注意到GenServer的API是如何使得客户端和服务端隔离的更明显的。start_link/3 在客户端,而它所对应的回调函数 init/1 运行在服务端。

对应于 call/2 的请求,我们实现一个 handle_call/3 回调函数来接收它的请求、请求的来源进程(_from),以及当前服务器状态(state)。handle_call/3 回调函数返回一个格式为 {:reply, reply, new_state} 的元组。这个元组的第一个元素,:reply ,表明服务器将发送一个应答给客户端。第二个元素,reply,就是发送给客户端的应答。而第三个元素,new_state,是一个新的服务器状态。

对应于 cast/2 的请求,我们实现了一个 handle_cast/2 回调函数来接收它的请求和当前服务器状态(state)。handle_cast/2 回调函数返回一个格式为 {:noreply, new_state} 的元组。注意:在一个真实的应用里,可能应该用同步调用的方式为 :create 实现回调函数来替换异步的cast函数。我们在本例里这么做是为了说明如何实现一个cast函数的回调。

handle_call/3 和 handle_cast/2 函数还有其他格式的返回元组。也有其他的回调函数我们可以实现,比如:terminate/2 和 code_change/3 。欢迎你探索整个GenServer文档以了解关于这方面的更多内容。

现在,让我们写一些测试来保证我们的GenServer如我们所期望地工作。

测试一个GenServer

测试一个GenServer和测试一个Agent没有太大的不同。我们将在测试的setup回调函数里创建一个服务器,并在我们整个测试中使用它。创建一个文件:test/kv/registry_test.exs ,它的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
defmodule KV.RegistryTest do
use ExUnit.Case, async: true
setup do
{:ok, registry} = KV.Registry.start_link
{:ok, registry: registry}
end
test "spawns buckets", %{registry: registry} do
assert KV.Registry.lookup(registry, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
end

我们的测试应该毫无阻碍地通过!

你不需要明确地停掉registry,因为当我们的测试结束的时候,它将收到一个 :shutdown 信号。虽然这么处理相应的测试来说没问题。但是如果需要将停止一个GenServer作为一个应用里逻辑的一部分,那么你可以使用 GenServer.stop/1 函数:

1
2
3
4
5
6
7
8
## 客户端 API
@doc """
Stops the registry.
"""
def stop(server) do
GenServer.stop(server)
end

监测的必要性

我们的registry几乎完成了。仅剩的问题是,如果一个bucket停止或崩溃了,registry的数据可能会变得过时。让我们增加一个测试用例到 KV.RegistryTest 来揭示这个bug:

1
2
3
4
5
6
test "removes buckets on exit", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
Agent.stop(bucket)
assert KV.Registry.lookup(registry, "shopping") == :error
end

上面的测试用例将在最后的断言处失败,因为bucket的名字还保留在registry里,即使在我们停止了bucket进程后。

为了修复这个bug,我们需要registry监测每一个由它创建的bucket。一旦我们设置了监测,每次bucket退出的时候,registry将收到一个通知,允许我们来清理字典。

让我们用 iex -S mix 启动一个新的控制台,先来玩一下监测:

1
2
3
4
5
6
7
8
iex> {:ok, pid} = KV.Bucket.start_link
{:ok, #PID<0.66.0>}
iex> Process.monitor(pid)
#Reference<0.0.0.551>
iex> Agent.stop(pid)
:ok
iex> flush()
{:DOWN, #Reference<0.0.0.551>, :process, #PID<0.66.0>, :normal}

注意:Process.monitor(pid) 返回一个唯一的引用,这允许我们匹配将要到来的消息,从而监测引用。我们停止Agent后,我们可以 flush/0 所有消息,注意到:一个 :DOWN 消息到达,带着与monitor返回的一样的引用,告诉我们,bucket进程因为 :normal 原因退出。

让我们重新实现服务端回调函数来修复这个bug并使得测试通过。首先,我们将修改GenServer的状态为两个字典:一个包含 name -> pid ,另一个包含 ref -> name。然后我们需要在 handle_cast/2 里监测bucket,也实现一个 handle_info/2 回调函数来处理监测消息。所有服务端回调函数的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
## 服务端回调函数
def init(:ok) do
names = %{}
refs = %{}
{:ok, {names, refs}}
end
def handle_call({:lookup, name}, _from, {names, _} = state) do
{:reply, Map.fetch(names, name), state}
end
def handle_cast({:create, name}, {names, refs}) do
if Map.has_key?(names, name) do
{:noreply, {names, refs}}
else
{:ok, pid} = KV.Bucket.start_link
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
names = Map.put(names, name, pid)
{:noreply, {names, refs}}
end
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
{name, refs} = Map.pop(refs, ref)
names = Map.delete(names, name)
{:noreply, {names, refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end

注意,我们能够在不改变任何客户端API的情况下,大幅度地改变服务器实现。这就是明确地隔离服务器和客户端的好处之一。

最后,不同于其他回调函数,我们为 handle_info/2 回调函数定义了一个“catch-all”分支,它丢弃任何未知的消息。想知道为什么,请移步下一节。

call、cast还是info?

我们已经用了三个回调函数:handle_call/3,handle_cast/2 和 handle_info/2。现在我们要考虑什么时候决定和什么时候使用它们:

  1. handle_call/3 必须用于同步请求。这应该是等待服务器的回复是一个有用的反压机制的默认选择。
  2. handle_cast/2 必须用于异步请求,而且当你不关心响应的时候。cast甚至不保证服务端收到了消息,因此,它应该尽量少用。例如,本章我们已经定义的 create/2 函数应该使用 call/2 。我们使用 cast/2 是为了教学目的。
  3. handle_info/2 必须用于服务端接收到的不是GenServer.call/2 或 GenServer.cast/2发送的其他所有消息,包括了用send/2发送的普通消息。监测消息 :DOWN 就是其中的例子。

因为任何消息,包括send/2方式的消息都由handle_info/2处理,则有机会不期望的消息到达服务器。因此,如果我们没有定义catch-all分支,那些消息将导致我们的registry崩溃,因为没有分支可以匹配它们。但是我们却不用担心这样的情况发生在handle_call/3 和 handle_cast/2。call和cast只能通过GenServer的API来做,所以一个未知消息就十分可能是由开发者的错误造成。

为了帮助开发者记住call,cast 和 info之间的区别,它们支持的返回值以及其他知识,Benjamin Tan Wei Hao已经创建了非常棒的GenServer备忘录.

监测还是链接?

我们已经在Process这章学习了链接。现在,我们完成了registry模块,我们可能想知道:什么时候我们应该用监测而什么时候我们应该用链接?

链接是双向的。如果你链接了两个进程,其中一个进程崩溃,则另一个进程也将崩溃(除非它捕获了退出信号)。监测是单向的:只有监测的进程才收到关于被监测进程的通知。也就是说:当你想链接崩溃的时候使用链接,而当你只是想获得崩溃、退出等的信息的时候就使用检测。

回到我们的 handle_cast/2 实现,你可以看到registry既链接也监测bucket:

1
2
{:ok, pid} = KV.Bucket.start_link
ref = Process.monitor(pid)

这是一个坏注意,因为我们不想我们的registry在bucket崩溃的时候也崩溃!我们通常避免直接创建进程,而是把这个责任委托给监督者。正如我们将在下一章所见,监督者依赖于链接,这就解释了为什么基于链接的API(spawn_link,start_link,等)在Elixir和OTP里很普遍。

原文链接: http://elixir-lang.org/getting-started/mix-otp/genserver.html