Mix 和 OTP-ETS

  1. ETS作为缓存
  2. 竞争条件

本章是Mix和OTP教程的一部分,它依赖这个教程的前面章节。要获得更多信息,请阅读本教程的第一章,或者查看本教程的章节索引。

每次我们需要查询一个bucket,我们需要发送一个消息给registry。这样的话,我们的registry被多个进程并发地访问,registry可能成为瓶颈!

在本章,我们将学习ETS(Erlang数据存储)并且学习如何使用它来作为缓存机制。

警告:不要贸然使用ETS作为缓存!记录并分析你应用的性能并识别哪部分是瓶颈,这样你就知道你应该在哪里缓存,你应该缓存什么。本章只是在你一旦需要的情况下,ETS如何被使用的一个例子。

ETS作为缓存

ETS允许我们存储任何Elixir数据到一个内存表里。与ETS表交互是通过Erlang :ets 模块做到的:

1
2
3
4
5
6
iex> table = :ets.new(:buckets_registry, [:set, :protected])
8207
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]

当创建一个ETS表的时候,需要两个入参:一个表名字和一个选项集合。从可用选项中,我们选择传递表类型和它的访问规则。我们选择了 :set 类型,这意味着键不能是重复的。我们也设置表的访问规则为 :protected ,这意味着只有创建表的进程能写这个表,不过其他进程可以从表里读取数据。这两个选项实际上是默认值,所以从现在开始我们将忽略它们。

ETS表可以被命名,允许我们通过给定的名字访问它:

1
2
3
4
5
6
iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]

让我们修改 KV.Registry 来使用ETS表。因为我们的registry需要一个名字作为入参,我们计划用与registry相同的名字来命名ETS表。ETS的名字和进程的名字存储在不同的地方,因此没有冲突的可能。

打开 lib/kv/registry.ex ,让我们修改它的实现。我们增加注释到源码中来强调我们所做的修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
defmodule KV.Registry do
use GenServer
## 客户端 API
@doc """
Starts the registry with the given `name`.
"""
def start_link(name) do
# 1. 传递名字给GenServer的init回调函数
GenServer.start_link(__MODULE__, name, name: name)
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
"""
def lookup(server, name) when is_atom(server) do
# 2. 查询现在直接在ETS里做,没有访问服务器
case :ets.lookup(server, name) do
[{^name, pid}] -> {:ok, pid}
[] -> :error
end
end
@doc """
Ensures there is a bucket associated to the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
@doc """
Stops the registry.
"""
def stop(server) do
GenServer.stop(server)
end
## Server callbacks
def init(table) do
# 3. 我们已经用ETS表替换名字映射
names = :ets.new(table, [:named_table, read_concurrency: true])
refs = %{}
{:ok, {names, refs}}
end
# 4. 原来为查询服务的handle_call回调函数被删除
def handle_cast({:create, name}, {names, refs}) do
# 5. 读写ETS表而不是映射
case lookup(names, name) do
{:ok, _pid} ->
{:noreply, {names, refs}}
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
:ets.insert(names, {name, pid})
{:noreply, {names, refs}}
end
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
# 6. 从ETS表而不是从映射删除
{name, refs} = Map.pop(refs, ref)
:ets.delete(names, name)
{:noreply, {names, refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end

注意:在我们修改前, KV.Registry.lookup/2 发送请求给服务器,但是现在它直接从ETS表里读取,ETS表被所有进程共享。这是我们实现的缓存机制背后的主要思想。

为了让缓存机制工作,被创建的ETS表需要有 :protected 访问规则(访问规则的默认值),因此所有客户端可以从它读取数据,而只有 KV.Registry 进程可以写数据到这个表里。当表启动的时候,我们已经设置 read_concurrency: true ,这样就优化了表的并发读取操作的通用场景。

我们上面的修改已经使得我们的测试失败,因为我们原来是用registry进程的pid来做所有操作,而现在registry查询需要ETS表的名字。但是,ETS表的名字和registry进程的名字相同,这个问题就很容易修复。如下所示修改 test/kv/registry_test.exs 的setup函数:

1
2
3
4
setup context do
{:ok, _} = KV.Registry.start_link(context.test)
{:ok, registry: context.test}
end

一旦我们修改 setup ,一些测试用例将继续失败。你可能甚至注意到,在不同的运行之间,测试的通过和失败也不是一致的。例如:“spawns buckets”测试用例:

1
2
3
4
5
6
7
8
9
test "spawns buckets", %{registry: registry} do
assert KV.Registry.lookup(registry, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end

可能在这一行失败:

1
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")

如果我们刚刚在上一行创建了bucket,这一行为何会失败?

那些发生的失败的原因是因为为了教学目的,我们已经犯两个错误:

  1. 我们过早地优化(通过增加这个缓存层)
  2. 我们使用 cast/2 (但是我们应该使用 call/2 )

竞争条件

在Elixir里做开发不会使得你的代码免于竞争条件。然而,Elixir的简单抽象,默认情况下不共享数据,更容易发现竞争条件的根本原因。

在我们测试里所发生的是,在一个操作和我们可以观察到在ETS表里这个改变的时间之间有一个延迟。下面是我们所期望的事件发生步骤:

  1. 我们调用 KV.Registry.create(registry, “shopping”)
  2. registry创建bucket并修改缓存表
  3. 我们用 KV.Registry.lookup(registry, “shopping”) 从表访问信息
  4. 上面的命令返回 {:ok, bucket}

但是,因为 KV.Registry.create/2 是一个 cast 操作,这个命令在我们实际写表前返回。换句话说,实际发生的步骤如下:

  1. 我们调用 KV.Registry.create(registry, “shopping”)
  2. 我们用 KV.Registry.lookup(registry, “shopping”) 从表访问信息
  3. 上面的命令返回 :error
  4. registry创建bucket并修改缓存表

要修复这个错误,我们需要通过用 call/2 替代 cast/2 来使得 KV.Registry.create/2 成为同步操作。这将保证在修改被写入表后客户端才继续执行。让我们如下所示修改这个函数和它的回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def create(server, name) do
GenServer.call(server, {:create, name})
end
def handle_call({:create, name}, _from, {names, refs}) do
case lookup(names, name) do
{:ok, pid} ->
{:reply, pid, {names, refs}}
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
:ets.insert(names, {name, pid})
{:reply, pid, {names, refs}}
end
end

我们将回调函数从 handle_cast/2 改为 handle_call/3,并且修改它用被创建的bucket的pid作为返回结果。总的来说,Elixir开发者喜欢用 call/2 而不是 cast/2 ,因为 call/2 也提供反压(你被阻塞一直到你获得响应)。在不必要的时候使用 cast/2 也可以被认为是一种过早优化。

让我们再次运行测试,这次,我们将传递 --trace 参数:

1
$ mix test --trace

当你的测试有死锁或有竞争条件的时候 --trace 选项有用,因为它同步地运行所有测试(async: true 无效了)并且显示每一个测试详细信息。这次我们将减少到一到两个不连续的失败:

1
2
3
4
5
6
7
8
1) test removes buckets on exit (KV.RegistryTest)
test/kv/registry_test.exs:19
Assertion with == failed
code: KV.Registry.lookup(registry, "shopping") == :error
lhs: {:ok, #PID<0.109.0>}
rhs: :error
stacktrace:
test/kv/registry_test.exs:23

根据失败消息,我们期望bucket不再存在表里,但是它依然还在!这个问题和我们刚才解决的问题相反:已经解决的问题是在创建bucket的命令和修改表之间有延迟,现在的这个问题是bucket进程死亡和它的信息从表里删除之间存在延迟。

不幸的是,这次我们不能简单地修改负责清理ETS表的 handle_info/2 函数 为同步操作。相反,我们要找到一个方法保证registry已经处理了当bucket崩溃的时候发送给它的 :DOWN 通知。

一个简单的方法来做到这点,就是通过发送一个同步请求给registry:因为消息按顺序被处理,如果registry响应了一个在Agent.stop调用后发送的请求,这就说明 :DOWN 消息已经被处理了。让我们通过创建一个“假的”bucket来做到这点,它是一个在Agent.stop后的同步请求,在两个测试用例里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
test "removes buckets on exit", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
Agent.stop(bucket)
# Do a call to ensure the registry processed the DOWN message
_ = KV.Registry.create(registry, "bogus")
assert KV.Registry.lookup(registry, "shopping") == :error
end
test "removes bucket on crash", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
# Kill the bucket and wait for the notification
Process.exit(bucket, :shutdown)
# Wait until the bucket is dead
ref = Process.monitor(bucket)
assert_receive {:DOWN, ^ref, _, _, _}
# Do a call to ensure the registry processed the DOWN message
_ = KV.Registry.create(registry, "bogus")
assert KV.Registry.lookup(registry, "shopping") == :error
end

我们的测试用例现在应该(一直)通过!

到此我们的优化章节结束。我们用ETS作为一个缓存机制,读取可以从任何进程发起,但是写入依然只能通过一个单独的进程顺序执行。更重要的是,我们也学到,一旦数据可以被异步读取,我们需要明白它可能引入竞争条件。

下一章我们将讨论外部和内部的依赖以及Mix如何帮助我们管理大型代码库。

原文链接: http://elixir-lang.org/getting-started/mix-otp/ets.html