在PSPDFKit,我们使用Elixir构建可靠且高性能的分布式系统。 在这样的系统中,我们经常需要调用外部服务,同时异步消息传递允许客户端进行这些调用而不必等待响应。 无法立即处理的消息将排队并稍后处理,但是当队列过载时会发生什么呢? 由于我们不希望系统崩溃,因此我们必须使用反压机制,以防止队列无限增长。 此文介绍了如何使用sbroker库将反压机制应用于Elixir应用程序。
在Elixir应用中使用sbroker的Erlang库
sbroker这个Erlang库提供了用于创建池和(或)负载调节器的构建模块。 它使用broker模式,其中与服务worker的通信由负责worker与其调用之间协调的broker处理。
一个简单例子
让我们看一个如何在Elixir应用程序中使用sbroker库的简单示例。
首先我们在命令行运行如下命令:
|
|
它将在当前目录下创建一个样例工程,名字就叫做 “example” 。在该示例中,我们将模拟在一个worker中对外部服务的调用并由broker处理相关通信。为此,我们编辑example/mix.exs文件,以便将sbroker库添加到我们的应用程序中:
|
|
我们将sbroker库添加到上面的依赖项和应用程序中。我们也在18行指定了我们应用的模块 mod: {Example, []},这个模块我们稍后创建。现在我们准备添加一个broker,所以我们在example/lib/example/broker.ex中创建我们对broker模块:
|
|
上面的模块实现了sbroker行为。在第9行我们启动sbroker,并且在选项里设置超时为10秒。这个超时的意思是,当这些调用在队列里等候worker的时间超过10秒,它们将被丢弃。在init/1函数里,我们给broker定义了客户端和worker的队列。我们定义了broker模块后,我们需要定义worker模块,它负责定义worker,并且向broker请求任务。我们在 example/lib/example/worker.ex里定义worker模块:
|
|
fetchfrom_external_resource/1
函数是一个简单的模拟函数,它将使得进程等待一秒,然后返回 {:ok, “External service called with #{inspect(params)}”} 。当worker这个GenServer进程收到 {tag, {:go, ref, {pid, {:fetch, [params]}},, }} 这样的消息时,这个函数将被调用。这个元组中的tag变量是一个唯一标识符,它被用来标识worker并且被存储在GenServer进程的状态中。
在worker获得所需数据后,它会向broker请求新的任务。我们已经定义了broker和worker模块,因此我们现在可以定义一个监督者,它将启动broker和一个worker池。监督者在 example/lib/example/supervisor.ex 里定义:
|
|
在这个例子中,我们的worker池有5个worker。我们基本完成工作了,还剩下要创建一个application模块,这个模块定义在 example/lib/example.ex 中:
|
|
这个模块启动监督者,并且它有一个函数:fetch_from_external_resource/1,这个函数向broker请求一个worker,当broker能够为我们的请求分配一个worker的时候,会向这个worker发送了消息 {:fetch, [params]} 。当broker不能分配给我们worker的时候,返回的是 {:drop, time} 消息,这样的话,我们的私有函数 perform/1,将返回 {:error, :overload} 。函数 fetch_from_external_resource/1 将打印worker的返回值,或者因为broker丢弃了我们的请求而打印 {:error, :overload} 。
我们现在可以在iex里测试这个例子:
|
|
然后我们可以运行如下语句来从外部资源获取数据:
|
|
一秒后在iex里会输出如下信息:
|
|
为了模拟和测试更多的调用,我们可以通过运行以下语句多次并行地调用 Example.fetch_from_external_resource(“test”) :
|
|
这将打印相同的行并一次打印五行,因为我们的示例worker池包含五个worker。我们也将得到 {:error, :overload} 响应,因为broker不能分配worker并且任务在队列等待太长时间。 {:error, :overload} 响应是用于防止外部服务过载的反压力示例。例如,我们的系统现在可以通过HTTP / 1.1 429 Too Many Requests回复请求服务的客户端,并且它不会因为过载而崩溃。
结论
使用带有worker池和队列的异步进程来扩展系统是一种很好的方式,但是由于我们不希望系统崩溃,我们还应该考虑处理过载的方法。一种方法是将反压机制应用到我们的系统中。在Elixir应用程序中,可以使用sbroker库轻松完成此任务。
原文链接: https://pspdfkit.com/blog/2018/back-pressure-queuing-system-with-sbroker/