Mix 和 OTP-分布式任务和配置

  1. 我们第一个分布式代码
  2. 异步/等待
  3. 分布式任务
  4. 路由层
  5. 测试过滤器和标签
  6. 应用环境和配置
  7. 总结

本章是Mix和OTP教程的一部分,它依赖这个教程的前面章节。要获得更多信息,请阅读本教程的第一章,或者查看本教程的章节索引。

在这最后一章,我们将回到 :kv 应用并增加一个路由层,这个路由层将允许我们基于bucket名字在节点间分布请求。

路由层将接收如下格式的一张路由表:

1
2
[{?a..?m, :"foo@computer-name"},
{?n..?z, :"bar@computer-name"}]

路由器将根据表检查bucket名字的第一个字节然后据此来派发到合适的节点。例如,以字母“a”开头的一个bucket(?a表示字母“a”的Unicode代码点)将被派发到节点 foo@computer-name 。

如果匹配的条目指向处理请求的节点,那么我们就已经完成了路由,并且这个节点将执行请求的操作。如果匹配的条目指向一个不同的节点,我们将传递请求给这个节点,它将查询自己的路由表(可能与第一个节点的不同)并且做响应的动作。如果没有条目匹配,则一个错误将抛出。

你可能想知道为什么我们不告诉我们在我们的路由表找到的节点直接执行被请求的操作,而是传递路由请求给那个节点处理。尽管路由表和上面的那样简单,但在所有节点之间都可以合理地共享,这样当我们的应用程序增长时用这种方式传递路由请求使路由表变得更小。也许在某个时候,foo@computer-name 只是负责路由bucket的请求,而bucket被派发给不同的节点。用这种方式,bar@computer-name 不需要知道任何这些改变。

注意:我们将在本章在同一个机器上使用两个节点。你可以自由地使用两台(或更多)不同的机器在同一网络,但你需要做一些准备工作。首先,你需要确认所有机器有一个有完全相同值的 ~/.erlang.cookie 文件。其次,你需要保证 epmd 正运行在一个没有阻塞的端口(你可以运行 epmd -d 来输出调试信息)。最后,如果你想大体上学习更多分布式的知识,我们建议看 Learn You Some Erlang 里很棒的分布式章节

我们第一个分布式代码

Elixir自带了工具来链接节点以及在它们之间交换信息。实际上,当在一个分布式环境工作的时候,我们用进程、消息传递和接收消息相同的概念,因为Elixir进程是位置透明的。这就是说当发送一个消息的时候,不用关心接收进程是在同一个节点还是在另一个节点,虚拟机将能够在这两种情况下传递消息。

要运行分布式代码,我们需要用一个名字来启动虚拟机。名字可以是短的(当在同一个网络里)或长的(需要完整的计算机地址)。让我们启动一个新的IEx会话:

1
$ iex --sname foo

你现在可以看到提示符有一点不同,显示了节点名字和计算机名字:

1
2
Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help)
iex(foo@jv)1>

我的机器名是 jv ,所以在上面的例子里我看到 foo@jv ,不过你将得到一个不同的结果。在后续的例子里我们将使用 foo@computer-name,当使用这些代码的时候,你应该相应地修改它们。

让我们在shell里定义一个叫Hello的模块:

1
2
3
iex> defmodule Hello do
...> def world, do: IO.puts "hello world"
...> end

如果你在同一个网络里的其他机器上已经装好了Erlang和Elixir,你可以在它上面启动另外一个IEx会话。否则,你可以在另一个终端启动另一个IEx会话。在这两种情况,给它一个短名叫做bar:

1
$ iex --sname bar

注意,在这个新的IEx会话里,我们无法访问 Hello.world/0 :

1
2
3
iex> Hello.world
** (UndefinedFunctionError) undefined function: Hello.world/0
Hello.world()

但是我们可以从 bar@computer-name 在 foo@computer-name 上创建一个新进程!让我们来试一试(@computer-name 是依据你自己的机器名字来写的):

1
2
3
iex> Node.spawn_link :"foo@computer-name", fn -> Hello.world end
#PID<9014.59.0>
hello world

Elixir在其他节点上启动一个进程并返回它的pid。代码在 Hello.world/0 函数所在的节点上被执行。注意:结果“hello world”打印在节点bar上而不是节点foo上。也就是说,被打印的消息从foo节点传回给bar节点。这是因为创建在其他节点(foo)上的进程仍然是有节点bar的组领导。我们已经简单地在IO那一章介绍了组领导的概念。

我们和往常一样可以与Node.spawn_link/2返回的pid进行消息的发送和接收操作。让我们尝试一个快速ping-pong例子:

1
2
3
4
5
6
7
8
9
10
11
iex> pid = Node.spawn_link :"foo@computer-name", fn ->
...> receive do
...> {:ping, client} -> send client, :pong
...> end
...> end
#PID<9014.59.0>
iex> send pid, {:ping, self()}
{:ping, #PID<0.73.0>}
iex> flush()
:pong
:ok

从我们的快速探索,我们可以得出这样的结论:每一次我们需要做一个分布式计算的时候,我们应该用node.spawn_link/2在远程节点上创建进程。但是,我们已经从本教程学到,应该尽可能避免在监督树外创建进程,所以我们需要寻找其他方式。

有三种比 Node.spawn_link/2 更好的方式,我们在我们的实现里可以使用它们:

  1. 我们可以使用Erlang的 :rpc模块在远程节点上执行函数。在上面例子的bar@computer-name shell里,你可以调用 :rpc.call(:”foo@computer-name”, Hello, :world, []) ,它将打印“hello world”。
  2. 我们可以有一个运行在其他节点的服务,然后通过 GenServer API发送请求到那个节点。例如,你可以用 GenServer.call({name, node}, arg) 调用远程节点上的一个服务,或者传递远程进程的PID作为此函数的第一个入参。
  3. 我们可以使用我们在前一章学到的Task,因为它们可以在本地和远程节点被启动。

上述三种选项有不同的特点。:rpc 和使用 GenServer 都是在一个单独服务器里顺序执行你的请求,而 Task 在远程节点上高效地异步运行,它唯一的顺序点是被监督者创建的时候。

对于我们的路由层,我们将使用Task,但是其他方式也可以去尝试。

异步/等待

到目前为止,我们已经探索了独立启动和运行的任务而不考虑它们的返回值。但是,有时候运行一个任务,然后在稍后读取它的值是有用的使用方式。为此,Task也提供了 async/await 模式:

1
2
3
task = Task.async(fn -> compute_something_expensive end)
res = compute_something_else()
res + Task.await(task)

async/await 提供了一个种非常简单的机制并发地计算某些值。不仅如此,async/await 也可以被用于我们前面章节学到的 Task.Supervisor 。我们只需要用 Task.Supervisor.async/2 替换 Task.Supervisor.start_child/2 然后稍后用 Task.await/2 读取结果。

分布式任务

分布式任务和被监督任务是一样的。唯一的区别是,我们在监督者上创建任务的时候传递的是节点名。打开 :kv 应用的 lib/kv/supervisor.ex 。让我们加入一个任务监督者作为监督树的最后一个孩子:

1
supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),

现在,我们在 :kv 应用里启动两个命名的节点:

1
2
$ iex --sname foo -S mix
$ iex --sname bar -S mix

在 bar@computer-name 里,我们现在可以通过监督者在其他节点直接创建一个任务:

1
2
3
4
5
6
iex> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, fn ->
...> {:ok, node()}
...> end
%Task{owner: #PID<0.122.0>, pid: #PID<12467.88.0>, ref: #Reference<0.0.0.400>}
iex> Task.await(task)
{:ok, :"foo@computer-name"}

我们的第一个分布式任务获取任务运行节点的名字。注意,我们给了一个匿名函数给 Task.Supervisor.async/2 ,但是, 在分布式场景里,最好是明确地指定模块、函数和函数的入参:

1
2
3
4
iex> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, Kernel, :node, []
%Task{owner: #PID<0.122.0>, pid: #PID<12467.89.0>, ref: #Reference<0.0.0.404>}
iex> Task.await(task)
:"foo@computer-name"

这两者的区别是:匿名函数需要目标节点和调用者有一样的代码版本。使用模块、函数和函数入参更健壮,因为你只需要在给定的模块里找到匹配函数参数个数的函数。

有了这方面的知识,最后让我们来写路由代码。

路由层

创建一个有如下内容的文件 lib/kv/router.ex :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
defmodule KV.Router do
@doc """
Dispatch the given `mod`, `fun`, `args` request
to the appropriate node based on the `bucket`.
"""
def route(bucket, mod, fun, args) do
# Get the first byte of the binary
first = :binary.first(bucket)
# Try to find an entry in the table() or raise
entry =
Enum.find(table(), fn {enum, _node} ->
first in enum
end) || no_entry_error(bucket)
# If the entry node is the current node
if elem(entry, 1) == node() do
apply(mod, fun, args)
else
{KV.RouterTasks, elem(entry, 1)}
|> Task.Supervisor.async(KV.Router, :route, [bucket, mod, fun, args])
|> Task.await()
end
end
defp no_entry_error(bucket) do
raise "could not find entry for #{inspect bucket} in table #{inspect table()}"
end
@doc """
The routing table.
"""
def table do
# Replace computer-name with your local machine name.
[{?a..?m, :"foo@computer-name"},
{?n..?z, :"bar@computer-name"}]
end
end

让我们写一个测试来验证我们的路由器是否正常。创建一个名为 test/kv/router_test.exs 文件,其内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
defmodule KV.RouterTest do
use ExUnit.Case, async: true
test "route requests across nodes" do
assert KV.Router.route("hello", Kernel, :node, []) ==
:"foo@computer-name"
assert KV.Router.route("world", Kernel, :node, []) ==
:"bar@computer-name"
end
test "raises on unknown entries" do
assert_raise RuntimeError, ~r/could not find entry/, fn ->
KV.Router.route(<<0>>, Kernel, :node, [])
end
end
end

第一个测试用例调用 Kernel.node/0 ,它将返回基于bucket的名字是 “hello” 和 “world”的当前节点的名字。根据我们目前的路由表,我们应该分别获得 foo@computer-name 和 bar@computer-name 作为响应。

第二个测试检查未知条目引起的代码异常。

为了运行第一个测试,我们需要两个节点来运行。来到 apps/kv 目录,启动一个名叫 bar 节点,它将被用来运行测试用例。

1
$ iex --sname bar -S mix

然后运行测试:

1
$ elixir --sname foo -S mix test

测试将通过。

测试过滤器和标签

虽然我们的测试通过了,但是我们的测试结构变得越来越复杂。特别是,在我们的测试套件里只用 mix test 运行测试引起失败,因为我们的测试需要链接另一个节点。

幸运地,ExUnit有一个工具用来标签测试,允许我们运行指定的回调函数或者甚至完全基于那些标签过滤测试。我们已经在前面的章节里使用了 :capture_log 标签,它有被ExUnit指定的自己的语义。

这次让我们增加一个标签到 test/kv/router_test.exs :

1
2
@tag :distributed
test "route requests across nodes" do

写 @tag :distributed 等价于写 @tag distributed: true 。

测试被正确地标签,我们现在可以检查节点是否活在在网络里,如果没有节点没有在网络里活着,我们可以不考虑所有分布式测试。在 :kv 应用里打开 test/test_helper.exs ,然后加入如下内容:

1
2
3
4
exclude =
if Node.alive?, do: [], else: [distributed: true]
ExUnit.start(exclude: exclude)

现在用 mix test 运行测试:

1
2
3
4
5
6
7
$ mix test
Excluding tags: [distributed: true]
.......
Finished in 0.1 seconds (0.1s on load, 0.01s on tests)
7 tests, 0 failures, 1 skipped

这一次所有测试都通过了并且ExUnit警告我们,分布式测试被忽略了。如果你用 $ elixir --sname foo -S mix test 运行测试,只要 bar@computer-name 这个节点可用,则刚才被忽略的那个测试将被运行并成功通过。

mix test 命令也允许我们动态包括和排除标签。例如,我们可以运行 $ mix test --include distributed 来运行分布式测试,而不管 test/test_helper.exs 里设置的值。我们也可以传递 --exclude 从命令行排除特定的标签。最后,--only 可以被用来运行只有特别标签的测试:

1
$ elixir --sname foo -S mix test --only distributed

你可以在 ExUnit.Case 模块文档里阅读到更多的过滤器、标签和默认标签的内容。

应用环境和配置

目前我们硬编码路由表在 KV.Router 模块里。但是,我们想使得路由表是动态的。这不仅让我们配置开发/测试/生产环境,也允许不同节点在路由表里有不同内容。有一个OTP特性正好做这个事情:应用环境。

每一个应用有一个环境通过键来存储应用的特定配置。例如,我们可以存储路由表在 :kv 应用环境里,给它一个默认值,并且让其他应用在需要的时候修改这个表。

打开 apps/kv/mix.exs ,修改 application/0 函数返回如下内容:

1
2
3
4
5
def application do
[extra_applications: [:logger],
env: [routing_table: []],
mod: {KV, []}]
end

我们增加了一个新的键 :env 到应用里。它返回应用的默认环境,默认环境里有一个键为 :routing_table 的条目,其值为一个空的列表。应用程序环境与空表一起工作是有意义的,因为特定的路由表依赖于测试/部署结构。

为了在我们的代码里使用应用环境,我们需要用下面的内容来替换 KV.Router.table/0 :

1
2
3
4
5
6
@doc """
The routing table.
"""
def table do
Application.fetch_env!(:kv, :routing_table)
end

我们用 Application.fetch_env!/2 来读取 :kv 的环境的 :routing_table 条目。你可以在 Application 模块文档里找到更多操作应用环境的信息和其他函数。

因为我们的路由表现在是空的,我们的分布式测试将会失败。重启应用并且重新运行测试来看它的失败:

1
2
$ iex --sname bar -S mix
$ elixir --sname foo -S mix test --only distributed

应用环境有趣的是,它不仅被配置为当前应用,而是可以被配置为所有应用。这样的设置可以用 config/config.exs 文件来做到。例如,我们可以配置来修改IEx的默认提示符为其他值。只要打开 apps/kv/config/config.exs ,增加如下内容:

1
config :iex, default_prompt: ">>>"

用 iex -S mix 启动IEx,你可以看到IEx的提示符已经改变。

这意味着,我们也可以在 apps/kv/config/config.exs 文件里直接配置我们的 :routing_table :

1
2
3
4
# 用你的本地节点替换 computer-name 。
config :kv, :routing_table,
[{?a..?m, :"foo@computer-name"},
{?n..?z, :"bar@computer-name"}]

重启节点并再次运行分布式测试。现在它们应该全部通过。

从 Elixir v1.2 开始,所有伞型应用共享它们的配置,由于在伞型应用的根目录的 config/config.exs 文件里的如下一行,装载所有子应用的配置:

1
import_config "../apps/*/config/config.exs"

mix run 命令也接收 --config 标签,允许配置文件按需提供。这个方法可以用来启动不同的节点,每个节点有它自己特定的配置(例如,不同的路由表)。

总体而言,内置配置应用的能力以及我们已经建立了我们的软件作为一个伞应用的事实,给我们在部署软件的时候提供了很多选择。

我们可以:

  • 部署伞型应用在一个节点将使得它既作为TCP服务器也作为键值存储。
  • 部署 :kv_server 应用只作为TCP服务器只要路由表指向其他节点。
  • 当我们想一个节点只作为一个存储(没有TCP访问)的时候,则只不是 :kv 应用。

当我们在将来添加更多的应用程序时,我们可以继续使用粒度相同的级别来控制我们的部署,也可以继续选择用哪个配置给将要在生产上运行的应用。

我们也可以考虑用一个像Distillery的工具构建多发布版本,Distillery将打包被选择的应用和配置,包括当前安装的Erlang和Elixir安装,所以我们可以部署应用即使运行时没有预先在目标系统里被安装。

最后,我们在本章也学到一些新的东西,并且它也可以被应用到 :kv_server 。我们将留下一步作为一个练习:

  • 修改 :kv_server 应用从它的应用环境读取端口号而不是用硬编码值 4040 。
  • 修改和配置 :kv_server 应用使用路由功能而不是直接派发给本地 KV.Registry 。为了 :kv_server 的测试,你可以使得路由表指向它自己的当前节点。

总结

在本章,我们构建了一个简单的路由器作为一个方法来探索Elixir和Erlang虚拟机的分布式特性,也学会了如果配置它的路由表。这是我们的Mix和OTP教程的最后一章。

通过这个教程,我们已经构建一个简单的分布式键值存储当做一个机会来探索许多结构,像通用服务器、监督者、任务、agent、应用以及其他事物。不仅如此,我们为整个应用写了测试,熟悉 ExUnit ,并学习如何用 Mix 这个构建工具来完成很多任务。

如果你正在找用于生产的一个分布式键值存储,那么应该你一定去看看Riak,它也运行在Erlang虚拟机里。在Riak里,bucket被复制,避免数据丢失;不是用路由表,它们而是使用 一致性哈希 映射一个bucket到一个节点。当存储bucket的新的节点被加入到你的基础设施的时候,一致性哈希有助于减少需要被迁移的数据量。

这里有更多的课程可以学,希望目前为止你玩得开心!

原文链接: http://elixir-lang.org/getting-started/mix-otp/distributed-tasks-and-configuration.html