Mix 和 OTP-Task和gen_tcp

  1. Echo服务器
  2. Tasks
  3. Tasks监督者

本章是Mix和OTP教程的一部分,它依赖这个教程的前面章节。要获得更多信息,请阅读本教程的第一章,或者查看本教程的章节索引。

在本章,我们将学习如何使用Erlang的 :gen_tcp 模块来处理请求。这提供一个很好的机会来探索Elixir的Task模块。在将来的章节我们将扩展我们的服务器以便使得它可以真正地处理命令。

Echo服务器

我们将首先实现一个echo服务器来开始我们的TCP服务器。它将发送一个响应,响应内容是它接收到的请求的文本。我们将慢慢地改善我们的服务器,一直到它被监督并且准备好处理多链接。

从广义的角度来看,在一个 TCP 服务器执行以下步骤︰

  1. 侦听一个端口,直到端口可用,并获取套接字
  2. 在那个端口等待客户端链接并接受它
  3. 读取客户端的请求并写响应回去

让我们来实现这些步骤。来到 apps/kv_server 应用,打开 lib/kv_server.ex ,然后增加如下的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
require Logger

def accept(port) do
# 下面的选项的意思:
#
# 1. `:binary` - 以二进制数据方式接收数据(而不是列表)
# 2. `packet: :line` - 一行行地接收数据
# 3. `active: false` - 阻塞在 `:gen_tcp.recv/2` 一直到有数据可用
# 4. `reuseaddr: true` - 如果侦听者崩溃,允许我们重用地址
#
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false, reuseaddr: true])
Logger.info "Accepting connections on port #{port}"
loop_acceptor(socket)
end

defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end

defp serve(socket) do
socket
|> read_line()
|> write_line(socket)

serve(socket)
end

defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end

defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end

我们将通过调用 KVServer.accept(4040) 来启动我们的服务器,4040是一个端口。accept/1 里的第一步是侦听一个端口一直到套接字可用,接着调用 loop_acceptor/1 。loop_acceptor/1 是一个接收客户端链接的循环。对于每一个接收到的链接,我们调用 serve/1 。

serve/1 是另一个循环,它从套接字读取一行数据然后把这些数据写回给套接字。注意:serve/1 函数用了 管道运算符 |> 来表达操作的数据流。管道运算符计算其左边的表达式并将其结果作为第一个入参传递给其右边的函数。上述例子:

1
socket |> read_line() |> write_line(socket)

等价于:

1
write_line(read_line(socket), socket)

read_line/1 用 :gen_tcp.recv/2 实现从套接字接收数据,而write_line/2 用 :gen_tcp.send/2 写数据到套接字。

注意:serve/1 在 loop_acceptor/1 里被顺序地无限循环地调用,所以对 loop_acceptor/1 的尾调用永远不会到达,因此应该被避免。但是,正如我们将看到的,我们将需要在一个隔离的进程里执行 serve/1 ,所以我们很快就需要这样的尾调用。

这几乎是实现我们echo服务器所有需要的内容。让我们来试一下!

在 kv_server 应用里用 iex -S mix 启动一个IEx会话。在IEx里运行:

1
iex> KVServer.accept(4040)

服务器现在正在运行,你甚至会注意到控制台被阻塞。让我们用一个telnet客户端来访问我们的服务器。大多数操作系统上都有客户端,它们的命令行大致相同:

1
2
3
4
5
6
7
8
9
10
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

输入“hello”,按下回车键,你将得到回复的“hello”。完美!

我的特别的telnet客户端可以通过输入 ctrl + ] ,接着输入 quit ,再按下回车键 来退出,但是你的客户端可能需要不同的步骤。

一旦你退出了telnet客户端,你将可能在IEx会话里看到一个错误:

1
2
3
4
** (MatchError) no match of right hand side value: {:error, :closed}
(kv_server) lib/kv_server.ex:45: KVServer.read_line/1
(kv_server) lib/kv_server.ex:37: KVServer.serve/1
(kv_server) lib/kv_server.ex:30: KVServer.loop_acceptor/1

这是因为我们期望从 :gen_tcp.recv/2 获得数据,但是客户端关闭了链接。在我们服务器将来的修订版里我们需要更好地处理这样的情况。

现在我们有一个更重要的bug需要去修复:如果我们的TCP接收者崩溃会发生什么?因为没有监督,服务器死掉而我们将不能服务更多请求,因为服务器不能被重启。这就是为什么我们必须把我们的服务器移到一颗监督树里的原因。

Tasks

我们已经学习了agent,通用服务器和监督者。它们都可以处理多消息或者管理状态。当我们只是需要执行一些任务的时候我们用哪个而它又是什么?

Task模块提供了这个功能。例如,它有一个 start_link/3 函数,此函数接收一个模块、函数和函数的参数作为入参,让我们运行一个给定的函数作为一个监督树的一部分。

让我们试试它。打开 lib/kv_server/application.ex ,让我们修改 start/2 函数里的监督者,修改如下:

1
2
3
4
5
6
7
8
9
10
def start(_type, _args) do
import Supervisor.Spec

children = [
worker(Task, [KVServer, :accept, [4040]])
]

opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end

经过这个修改,我们说我们想运行 KVServer.accept(4040) 作为一个工作者。现在我们硬编码端口号,不过稍后我们将讨论修改这种方式。

现在服务器是监督树的一部分,当我们启动应用的时候它将自动启动。在终端里输入 mix run –no-halt ,然后再次使用 telnet 客户端来确认所有事情都依然运行:

1
2
3
4
5
6
7
8
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

对的,它运行着!如果你杀掉客户端,整个服务器也崩溃。但是你将看到另一个服务器马上起来。但是,它的扩展性如何?

尝试同时链接两个telnet客户端。当你这么做的时候,你将注意到第二个客户端没有回声消息:

1
2
3
4
5
6
7
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

它好像没有完全正确运行。这是因为我们在接收链接的进程里处理请求。当一个客户端被链接了,我们就不能再接收另一个客户端。

Tasks监督者

为了使得我们的服务器处理同时发生的链接,我们需要有一个作为接收者的进程创建其他进程来处理请求。一个解决方案是将如下所示内容:

1
2
3
4
5
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end

改为用 Task.start_link/1 ,它和 Task.start_link/3 相似,不过它接收一个匿名函数而不是模块、函数和函数的参数:

1
2
3
4
5
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Task.start_link(fn -> serve(client) end)
loop_acceptor(socket)
end

我们从接收者进程直接启动一个被链接的任务。但是我们已经犯了一次错。你还记得吗?

这个错与当我们从registry直接调用 KV.Bucket.start_link/0 造成的错误相似。这意味着在任何一个bucket的错误将把整个registry搞垮。

上面的代码有同样的瑕疵:如果我们链接 serve(client) 这个任务到接收者,当它处理请求的时候崩溃,则会导致接收者乃至其他所有链接崩溃。

我们通过用 :simple_one_for_one 这种监督者来为registry修复这个问题。我们将使用相同的策略,除了这个模式是如此常见,Task已经有一个解决方案:一个 :simple_one_for_one 监督者启动临时任务作为我们的监督树的一部分。

让我们再次修改 start/2 ,增加一个监督者到我们的监督树:

1
2
3
4
5
6
7
8
9
10
11
def start(_type, _args) do
import Supervisor.Spec

children = [
supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
worker(Task, [KVServer, :accept, [4040]])
]

opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end

我们现在将用 KVServer.TaskSupervisor 这个名字启动一个 Task.Supervisor 进程。记住:因为接收者任务依赖这个监督者,因此这个监督者必须先启动。

现在我们需要修改 loop_acceptor/1 使用 Task.Supervisor 来处理每一个请求:

1
2
3
4
5
6
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end

你可能注意到我们增加了一行,:ok = :gen_tcp.controlling_process(client, pid) 。这使得子进程成为 client 这个套接字进程的“控制进程”。如果我们不这么做,如果接收者崩溃,它将把所有客户端都搞崩溃,因为套接字被绑定到接收它们的进程(这是默认的行为)。

用 mix run –no-halt 启动一个新的服务器,然后我们现在可以打开许多并发的telnet客户端。你也将注意到,退出一个客户端,不会把接收者搞崩溃。非常好!

下面是整个echo服务器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
defmodule KVServer do
require Logger

@doc """
Starts accepting connections on the given `port`.
"""
def accept(port) do
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false, reuseaddr: true])
Logger.info "Accepting connections on port #{port}"
loop_acceptor(socket)
end

defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end

defp serve(socket) do
socket
|> read_line()
|> write_line(socket)

serve(socket)
end

defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end

defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end

因为我们已经修改了监督者规格说明,我们要问:我们的监督策略还正确吗?

在现在的情况下,答案是:正确。如果接收者崩溃,无需崩溃已经存在的链接。在另一方面,如果任务监督者崩溃,也不需要崩溃接收者。

在下一章,我们将开始分析客户端请求和发送响应,并结束我们的服务器。

原文链接: http://elixir-lang.org/getting-started/mix-otp/task-and-gen-tcp.html