纯粹且简单的事务性内存

本文是一篇纯技术文章。

几个并行的程序如何维护一个状态的一致性视图?我的意思是,两个程序,可能处于不同的国家,如何用一致性方式来操作共同的状态变量?他们如何用一种不需要任何锁的方式来做到这点?

答案是令人吃惊的简单和不可思议的优美,那就是使用叫做事务性内存的方法。

事务性内存是如何运作的?

首先我必须解释为什么并发地更新数据会是一个问题。

想像有一个服务器S有状态变量X和两个客户端C1C2。客户端从服务器获取数据(图1)。现在两个客户端都认为X=20C1X增加20同时C2X增加30。它们修改它们本地的副本(图2),并且将数据写回服务器(图3和图4)。如果这些操作是一个接一个地进行,那么最后服务器上的X的值应该是70而不是50,很明显现在有问题出现了。

解决此问题的常规方法是在独立的事务发生时锁定服务器,但这种方法正如我前面一篇文章中指出的一样,它会有问题。

为了允许这些更新并行地执行而且不锁资源,我们可以使用叫做事务性内存的方法。

事务性内存

一个事务性内存是一个元组(Var,Version,Value)的集合。如上图,X的版本是1而值是20,Y的版本是6而值是true。

版本数字表示这个变量被修改的次数。

现在我们来尝试做一个事务操作。假设我们想修改XY。首先我们给服务器发送消息:{get,X,Y},服务器返回两个变量的值和它们各自的版本数字。

修改变量后,我们向服务器发送消息:{put,(X,1,30),(Y,6,false)}。仅当所有变量的版本数字与服务器中的变量的版本数字匹配,服务器将接收这个消息。然后服务器接受变量的修改并回复消息:yes。如果任何变量的版本数字不匹配,则服务器回复消息:no

很明显,如果第二个进程在第一个进程回答之前更新内存,那么版本号将不一致,更新将失败。

请注意,该算法不锁定数据而且在一个分布式的环境中工作得很好,客户端和服务器处于物理上不同的机器上,这些机器之间的传输延迟是未知的。

这不正是很好而且很老的基于set操作之上归纳出来的test-and-set操作吗?

是的,当然是。如果你想想他们如何用信号量实现互斥的就明白了。信号量用一个原子的test-and-set指令来实现。一个信号量的值只能为0或1。test-and-set操作就是说,如果这个变量的值是0那么就把它的值改为1,这个操作是原子性的。要保留一个关键区域,它由一个标志保护。如果标志为0,那么它可以被保留,如果标志为1,那么它可以被使用。为了避免两个进程同时保留这个关键区域,test-and-set操作必须是原子性的。事务性内存只是概括了这个方法。

现在让我们用Erlang来实现它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
-module(tm).
-export([new/0, addVar/2, getVars/2, putVars/2]).
%% new() -> Pid
%% 创建一个新的事务性内存 (TM)
%%
%% addVar(Pid, Var) -> ok
%% 增加一个变量到事务性内存
%%
%% getVars([V1,...]) -> [{Vsn,Data},....]
%% 在事务性内存里查询变量V1,V2,...
%%
%% putVars([{Var,Vsn,Data}]) -> Bool
%% 修改事务性内存里的变量
%% 下面是一个运行的例子
%%
%% 1> c(tm).
%% {ok,tm}
%% 2> P=tm:new().
%% <0.47.0>
%% 3> tm:addVar(x).
%% ok
%% 4> tm:addVar(P,x).
%% ok
%% 5> tm:addVar(P,y).
%% ok
%% 6> tm:getVars(P, [x,y]).
%% [{ok,{0,void}},{ok,{0,void}}]
%% 7> tm:putVars(P, [{x,0,12},{y,0,true}]).
%% yes
%% 8> tm:putVars(P, [{x,1,25}]).
%% yes
%% 9> tm:getVars(P, [x,y]).
%% [{ok,{2,25}},{ok,{1,true}}]
%% 10> tm:putVars(P, [{x,1,15}]).
%% no
new() -> spawn(fun() -> loop(dict:new()) end).
addVar(Pid, Var) -> rpc(Pid, {create, Var}).
getVars(Pid, Vgs) -> rpc(Pid, {get, Vgs}).
putVars(Pid, New) -> rpc(Pid, {put, New}).
%% 内部函数
%%
%% 远程过程调用(RPC)
rpc(Pid, Q) ->
Pid ! {self(), Q},
receive
{Pid, Reply} -> Reply
end.
loop(Dict) ->
receive
{From, {get, Vars}} ->
Vgs = lists:map(fun(I) ->
dict:find(I, Dict) end, Vars),
From ! {self(), Vgs},
loop(Dict);
{From, {put, Vgs}} ->
case update(Vgs, Dict) of
no ->
From ! {self(), no},
loop(Dict);
{yes, Dict1} ->
From ! {self(), yes},
loop(Dict1)
end;
{From, {create, Var}} ->
From ! {self(), ok},
loop(create_var(Var, Dict))
end.
update([{Var,Generation,Val}|T], D) ->
{G, _} = dict:fetch(Var, D),
case Generation of
G -> update(T, dict:store(Var, {G+1, Val}, D));
_ -> no
end;
update([], D) ->
{yes, D}.
create_var(Var, Dict) ->
case dict:find(Var, Dict) of
{ok, _} -> Dict;
error -> dict:store(Var, {0,void}, Dict)
end.

原文链接: http://armstrongonsoftware.blogspot.com.ar/2006/09/pure-and-simple-transaction-memories.html