gen_tcp 和 active_once

我想理解和测试gen_tcp模块使用的active_once选项。整个过程我写了一些代码,包括三个监督者,一个gen_fsm和一个gen_server。

这是一篇gen_tcp和active_onece选项的概念证明文章。正如我一直说的,这只是我的学习笔记并且我可能是错的。所以查阅OTP文档和其他一些不错的书籍,可以发现关于这一主题的相关内容。

目标

目标很简单,就是看看active_once选项如何控制数据流。也许这一次这个博客的简单概念证明的规则被打破了。本文的这个例子并不像以前那么简单,但我认为最终它是值得的。

主动和非主动套接字

gen_tcp模块提供两种方式来读取套接字流,主动和非主动套接字。后者是传统的方式。通过使用recv/2和recv/3函数,进程决定什么时候从输入流读取数据。这种方式下,进程负责控制输入数据流。

另一方面,主动套接字允许按Erlang消息方式来接收数据。这种方式的问题是,对端进程可以无节制地发送数据来淹没我们进程的接收数据队列,因为我们的进程没有相应的流控策略。

不过现在有办法解决这个问题。通过设置active_once选项为true,进程按Erlang消息方式从套接字接收一次数据。一旦消息被接收到,套接字又转换为非主动方式,如果缓冲区满了,则阻止发送进程发送数据。进程可以再设置active_once选项来从套接字接收下一个Erlang消息来重复这个过程。

控制进程

控制进程就是当套接字被设置成主动或active_once的时候从套接字流接收到Erlang消息的那个进程。默认情况下,控制进程就是接收套接字的进程。

正如我们将看到的,可以用controlling_process/2函数来改变谁是控制进程。

概念证明的结构

实际上,就如下图这么简单。

main_sup: 主监督者。

listener_sup: 接收链接进程的监督者。

worker_sup: 读取工作者的监督者。

accept_fsm: 接收链接进程。

worker_gen: 套接字读取工作者。


main_sup - 主监督者

它使用permanent和one_on_one 重启策略。它有两个也是监督者的子进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-module(main_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
io:format("main_sup:init...~n"),
RestartStrategy = one_for_one,
MaxRestarts = 10,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 2000,
Type = supervisor,
ListenerSup = {'listener_sup',
{'listener_sup', start_link, []},
Restart, Shutdown, Type, ['listener_sup']},
WorkerSup = {'worker_sup',
{'worker_sup', start_link, []},
Restart, Shutdown, Type, ['worker_sup']},
ChildList = [ListenerSup, WorkerSup],
{ok, {SupFlags, ChildList}}.

listener_sup: 接收链接进程的监督者。

它使用permanent和one_on_one 重启策略。它的子进程是accept_fsm进程。三个子进程被启动起来是为了准备大量链接的冲击。监听套接字在init函数里被创建,并且它的active属性设置为false,我们想使用默认的流控方式。

需要注意的是,三个子进程是用一个列表解析来创建的,在这里每一个id由build_label函数创建,结果id的格式是 accept_fsm_n,n是进程的顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-module(listener_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
io:format("listener_sup:init...~n"),
{ok, LSocket} = gen_tcp:listen(2000, [{active, false}]),
NumberOfWorkers = 3,
RestartStrategy = one_for_one,
MaxRestarts = 100,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 2000,
Type = worker,
ChildList = [{build_label("accept_fsm_", C),
{'accept_fsm', start_link, [LSocket]},
Restart, Shutdown, Type, ['accept_fsm']}
|| C <- lists:seq(1, NumberOfWorkers)],
{ok, {SupFlags, ChildList}}.
build_label(Name, C) ->
io_lib:format("~s~w", [Name, C]).

worker_sup: 读取工作者的监督者。

它使用temporary和simple_on_on_one重启策略按需求来启动进程。它的子进程是worker_gen进程。它导出start_child/1函数,通过传递新的接收的套接字给此函数来启动一个worker_gen子进程。另外它设置新的子进程作为套接字的控制进程,这样该子进程就可以接收到Erlang消息流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
-module(worker_sup).
-behaviour(supervisor).
-export([start_link/0, start_child/1]).
-export([init/1]).
start_link() ->
io:format("listener_sup:start_link...~n"),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ASocket) ->
{ok, Pid} = supervisor:start_child(?MODULE, [ASocket]),
gen_tcp:controlling_process(ASocket, Pid),
{ok, Pid}.
init([]) ->
io:format("listener_sup:init...~n"),
RestartStrategy = simple_one_for_one,
MaxRestarts = 100,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = temporary,
Shutdown = 2000,
Type = worker,
AChild = {'worker_gen', {'worker_gen', start_link, []},
Restart, Shutdown, Type, ['worker_gen']},
{ok, {SupFlags, [AChild]}}.

accept_fsm: 接收链接进程。

这个进程被实现为一个FSM(有限状态机),而且只有一个状态,即accept状态。它接收一个侦听套接字并且在其上接收链接。当一个新的链接到来,这个进程设置套接收的套接字为active_once并且传递新的接收套接字给worker_sup请求其创建一个新子进程。然后这个进程再回到accept状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
-module(accept_fsm).
-behaviour(gen_fsm).
-export([start_link/1]).
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([accept/2]).
start_link(LSocket) ->
io:format("accept_fsm:start_link...~n"),
gen_fsm:start_link(?MODULE, [LSocket], []).
init([LSocket]) ->
io:format("accept_fsm:init...~n"),
gen_fsm:send_event(self(), accept),
{ok, accept, LSocket}.
% Events.
accept(accept, LSocket) ->
io:format("accept_fsm:accept...~n"),
{ok, ASocket} = gen_tcp:accept(LSocket),
inet:setopts(ASocket, [{active, once}, {packet, line}]),
worker_sup:start_child(ASocket),
gen_fsm:send_event(self(), accept),
{next_state, accept, LSocket}.
% All events.
handle_event(accept, _StateName, State) ->
{next_state, accept, State}.
handle_sync_event(_Any, _From, _StateName, State) ->
{reply, ok, accept, State}.
% OTP messeges.
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.
terminate(_Reason, _StateName, _State) ->
ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.

worker_gen: 套接字读取工作者。

本文的概念证明就在这个进程里。它只是打印收到的数据到标准输出上。

为了观察流控的作用,在每次读取数据之间引入了一个延迟。在这种方式下,读取速度将比写的速度慢很多。当写缓冲满的时候会强制写进程停止写数据。

注意:每次读取数据后如何设置active_once选项以及如何设置packet类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
-module(worker_gen).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
start_link(ASocket) ->
io:format("worker_gen:start_link...~n"),
gen_server:start_link(?MODULE, [ASocket], []).
init([ASocket]) ->
io:format("worker_gen:init...~n"),
{ok, ASocket}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({tcp, _S, Data}, ASocket) ->
io:format("~p~n", [Data]),
inet:setopts(ASocket, [{active, once}, {packet, line}]),
timer:sleep(100),
{noreply, ASocket};
handle_info({tcp_closed, _S}, ASocket) ->
io:format("Closed...~n"),
{stop, normal, ASocket};
handle_info(timeout, ASocket) ->
io:format("Closed...~n"),
{stop, normal, ASocket}.
terminate(_Reason, ASocket) ->
io:format("Terminated...~n"),
gen_tcp:close(ASocket),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

代码的运行

让我们看看代码如何运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ erl
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe]
[kernel-poll:false]
Eshell V6.1 (abort with ^G)
1> l(main_sup).
{module,main_sup}
2> main_sup:start_link().
main_sup:init...
listener_sup:init...
accept_fsm:start_link...
accept_fsm:init...
accept_fsm:accept...
accept_fsm:start_link...
accept_fsm:init...
accept_fsm:accept...
accept_fsm:start_link...
accept_fsm:init...
accept_fsm:accept...
listener_sup:start_link...
listener_sup:init...
{ok,<0.35.0>}
3>

我们链接上一个客户端。

1
2
3
4
5
6
7
8
9
...
accept_fsm:accept...
listener_sup:start_link...
listener_sup:init...
{ok,<0.35.0>}
worker_gen:start_link...
worker_gen:init...
accept_fsm:accept...
3>

然后从客户端写一些文本。

1
2
3
4
5
6
7
8
9
accept_fsm:accept...
listener_sup:start_link...
listener_sup:init...
{ok,<0.35.0>}
worker_gen:start_link...
worker_gen:init...
accept_fsm:accept...
"Hello World!!!\r\n"
3>

我们来写一大块数据并统计它的用时。

1
2
3
4
5
6
$ time man ls|nc -c localhost 2000
real 0m39.517s
user 0m0.083s
sys 0m0.019s
$

然后我们再做一次,不过这次我们去掉在每次读取数据之间的延迟。

1
2
3
4
5
6
$ time man ls|nc -c localhost 2000
real 0m0.100s
user 0m0.085s
sys 0m0.020s
$

这两个测试让我们明白写进程是如何停下来等待缓冲区释放空间的。

在压力测试后,我们检查到没有进程泄露。

1
2
3
4
5
6
7
3> supervisor:count_children(main_sup).
[{specs,2},{active,2},{supervisors,2},{workers,0}]
4> supervisor:count_children(listener_sup).
[{specs,3},{active,3},{supervisors,0},{workers,3}]
5> supervisor:count_children(worker_sup).
[{specs,1},{active,0},{supervisors,0},{workers,0}]
6>

我不太清楚这个解决方案有没有什么我不知道的问题。我尝试遵循我读过的Erlang和Elixir相关书籍中关于这个主题里学到模式。我可能有一些错误的地方,所以请阅读那些相关的书籍来获得关于这一主题的准确解释。

有任何更正、评论和建议请联系我,我很乐意听到你的反馈。

本文到此结束。

祝大家玩得开心。

原文链接: http://jmilet.github.io/erlang/2015/02/15/active_once.html