10-分布式任务及配置

在这最后一章中,我们将回到:kv应用程序,给它添加一个路由层,使之可以根据桶的名字,在各个节点间分发请求。
路由层会接收一个如下形式的路由表:

  1. [{?a..?m, :"foo@computer-name"},
  2. {?n..?z, :"bar@computer-name"}]

路由者(负责转发请求的角色,可能是个节点)将根据桶名字的第一个字节查这个路由表,
然后根据路由表所示将用户对桶的请求发给相应的节点。比如,根据上表,
某个桶名字第一个字母是“a”(?a表示字母“a”的Unicode码),那么对它的请求会被路由到foo@computer-name这个节点去。

如果节点处理了路由请求,那么路由过程结束。如果节点自己也有个路由表,会根据该路由表把请求又转发给了其它相应节点。
如果最终没有节点接收和处理请求,则报出异常。

你会问,为啥我们不简单地命令路由表中查出来的节点来直接处理数据请求,而是将路由请求传给它?
当一个路由表像上面那个例子一样简单的话,这样做比较简单。但是,考虑到程序的规模越来越大,会将大路由表分解成小块,
分开存放在不同节点。这种情况下,还是用分发路由请求的方式简单些。也许在某些时刻,
节点foo@computer-name将只负责路由请求,不再处理桶数据请求,它承载的桶都会分给其它节点。
这种方式,节点bar@computer-name都不需要知道这个变化。

注意:
本章中我们会在同一台机器上使用两个节点。你当然可以用同一网络中得不同的机器,但是这种方式需要做一些准备。
首先你需要确保所有的机器上都有一个名叫~/.erlang.cookie的文件。
其次,你要保证 epmd
在一个可访问的端口运行(你可以执行epmd -d 查看debug信息来确定这点)。
第三,如果你想学习更多关于分布式编程的知识,我们推荐这篇文章

我们最初版本的分布式代码

Elixir内置了连接节点及于期间交换信息的工具。事实上,在一个分布式的环境中进行的消息的发送和接收,
和之前学习的进程的内容并无区别:因为Elixir的进程是 位置透明 的。 意思是当我们发送消息的时候,
它不管请求是在当前节点还是在别的节点,虚拟机都会传递消息。

为了执行分布式的代码,我们需要用某个名字启动虚拟机
名字可以使简短的(当在同一个网络内)或是较长的(需要附上计算机地址)。让我们启动一个新IEx会话:

  1. $ iex --sname foo

You can see now the prompt is slightly different and shows the node name followed by the computer name:

  1. Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help)
  2. iex(foo@jv)1>

My computer is named jv, so I see foo@jv in the example above, but you will get a different result. We will use jv@computer-name in the following examples and you should update them accordingly when trying out the code.

让我们在shell中定义一个名叫Hello的模块:

  1. iex> defmodule Hello do
  2. ...> def world, do: IO.puts "hello world"
  3. ...> end

If you have another computer on the same network with both Erlang and Elixir installed, you can start another shell on it. If you don’t, you can simply start another IEx session in another terminal. In either case, give it the short name of bar:

  1. $ iex --sname bar

Note that inside this new IEx session, we cannot access Hello.world/0:

  1. iex> Hello.world
  2. ** (UndefinedFunctionError) undefined function: Hello.world/0
  3. Hello.world()

However we can spawn a new process on foo@computer-name from bar@computer-name! Let’s give it a try (where @computer-name is the one you see locally):

  1. iex> Node.spawn_link :"foo@computer-name", fn -> Hello.world end
  2. #PID<9014.59.0>
  3. hello world

Elixir spawned a process on another node and returned its pid. The code then executed on the other node where the Hello.world/0 function exists and invoked that function. Note that the result of “hello world” was printed on the current node bar and not on foo. In other words, the message to be printed was sent back from foo to bar. This happens because the process spawned on the other node (foo) still has the group leader of the current node (bar). We have briefly talked about group leaders in the IO chapter.

We can send and receive message from the pid returned by Node.spawn_link/2 as usual. Let’s try a quick ping-pong example:

  1. iex> pid = Node.spawn_link :"foo@computer-name", fn ->
  2. ...> receive do
  3. ...> {:ping, client} -> send client, :pong
  4. ...> end
  5. ...> end
  6. #PID<9014.59.0>
  7. iex> send pid, {:ping, self}
  8. {:ping, #PID<0.73.0>}
  9. iex> flush
  10. :pong
  11. :ok

From our quick exploration, we could conclude that we should simply use Node.spawn_link/2 to spawn processes on a remote node every time we need to do a distributed computation. However we have learned throughout this guide that spawning processes outside of supervision trees should be avoided if possible, so we need to look for other options.

There are three better alternatives to Node.spawn_link/2 that we could use in our implementation:

  1. We could use Erlang’s :rpc module to execute functions on a remote node. Inside the bar@computer-name shell above, you can call :rpc.call(:"foo@computer-name", Hello, :world, []) and it will print “hello world”

  2. We could have a server running on the other node and send requests to that node via the GenServer API. For example, you can call a remote named server using GenServer.call({name, node}, arg) or simply passing the remote process PID as first argument

  3. We could use tasks, which we have learned about in a previous chapter, as they can be spawned on both local and remote nodes

The options above have different properties. Both :rpc and using a GenServer would serialize your requests on a single server, while tasks are effectively running asynchronously on the remote node, with the only serialization point being the spawning done by the supervisor.

For our routing layer, we are going to use tasks, but feel free to explore the other alternatives too.

async/await

So far we have explored tasks that are started and run in isolation, with no regard for their return value. However, sometimes it is useful to run a task to compute a value and read its result later on. For this, tasks also provide the async/await pattern:

  1. task = Task.async(fn -> compute_something_expensive end)
  2. res = compute_something_else()
  3. res + Task.await(task)

async/await provides a very simple mechanism to compute values concurrently. Not only that, async/await can also be used with the same Task.Supervisor we have used in previous chapters. We just need to call Task.Supervisor.async/2 instead of Task.Supervisor.start_child/2 and use Task.await/2 to read the result later on.

Distributed tasks

Distributed tasks are exactly the same as supervised tasks. The only difference is that we pass the node name when spawning the task on the supervisor. Open up lib/kv/supervisor.ex from the :kv application. Let’s add a task supervisor to the tree:

  1. supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),

Now, let’s start two named nodes again, but inside the :kv application:

  1. $ iex --sname foo -S mix
  2. $ iex --sname bar -S mix

From inside bar@computer-name, we can now spawn a task directly on the other node via the supervisor:

  1. iex> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, fn ->
  2. ...> {:ok, node()}
  3. ...> end
  4. %Task{pid: #PID<12467.88.0>, ref: #Reference<0.0.0.400>}
  5. iex> Task.await(task)
  6. {:ok, :"foo@computer-name"}

Our first distributed task is straightforward: it simply gets the name of the node the task is running on. With this knowledge in hand, let’s finally write the routing code.

路由层

Create a file at lib/kv/router.ex with the following contents:

  1. defmodule KV.Router do
  2. @doc """
  3. Dispatch the given `mod`, `fun`, `args` request
  4. to the appropriate node based on the `bucket`.
  5. """
  6. def route(bucket, mod, fun, args) do
  7. # Get the first byte of the binary
  8. first = :binary.first(bucket)
  9. # Try to find an entry in the table or raise
  10. entry =
  11. Enum.find(table, fn {enum, node} ->
  12. first in enum
  13. end) || no_entry_error(bucket)
  14. # If the entry node is the current node
  15. if elem(entry, 1) == node() do
  16. apply(mod, fun, args)
  17. else
  18. sup = {KV.RouterTasks, elem(entry, 1)}
  19. Task.Supervisor.async(sup, fn ->
  20. KV.Router.route(bucket, mod, fun, args)
  21. end) |> Task.await()
  22. end
  23. end
  24. defp no_entry_error(bucket) do
  25. raise "could not find entry for #{inspect bucket} in table #{inspect table}"
  26. end
  27. @doc """
  28. The routing table.
  29. """
  30. def table do
  31. # Replace computer-name with your local machine name.
  32. [{?a..?m, :"foo@computer-name"},
  33. {?n..?z, :"bar@computer-name"}]
  34. end
  35. end

Let’s write a test to verify our router works. Create a file named test/kv/router_test.exs containing:

  1. defmodule KV.RouterTest do
  2. use ExUnit.Case, async: true
  3. test "route requests across nodes" do
  4. assert KV.Router.route("hello", Kernel, :node, []) ==
  5. :"foo@computer-name"
  6. assert KV.Router.route("world", Kernel, :node, []) ==
  7. :"bar@computer-name"
  8. end
  9. test "raises on unknown entries" do
  10. assert_raise RuntimeError, ~r/could not find entry/, fn ->
  11. KV.Router.route(<<0>>, Kernel, :node, [])
  12. end
  13. end
  14. end

The first test simply invokes Kernel.node/0, which returns the name of the current node, based on the bucket names “hello” and “world”. According to our routing table so far, we should get foo@computer-name and bar@computer-name as responses, respectively.

The second test just checks that the code raises for unknown entries.

In order to run the first test, we need to have two nodes running. Let’s restart the node named bar, which is going to be used by tests. This time we’ll need to run the node in the test environment, to ensure the compiled code being run is exactly the same as that used in the tests themselves:

  1. $ MIX_ENV=test iex --sname bar -S mix

And now run tests with:

  1. $ elixir --sname foo -S mix test

Our test should successfully pass. Excellent!

Test filters and tags

Although our tests pass, our testing structure is getting more complex. In particular, running tests with only mix test causes failures in our suite, since our test requires a connection to another node.

Luckily, ExUnit ships with a facility to tag tests, allowing us to run specific callbacks or even filter tests altogether based on those tags.

All we need to do to tag a test is simply call @tag before the test name. Back to test/kv/router_test.exs, let’s add a :distributed tag:

  1. @tag :distributed
  2. test "route requests across nodes" do

Writing @tag :distributed is equivalent to writing @tag distributed: true.

With the test properly tagged, we can now check if the node is alive on the network and, if not, we can exclude all distributed tests. Open up test/test_helper.exs inside the :kv application and add the following:

  1. exclude =
  2. if Node.alive?, do: [], else: [distributed: true]
  3. ExUnit.start(exclude: exclude)

Now run tests with mix test:

  1. $ mix test
  2. Excluding tags: [distributed: true]
  3. .......
  4. Finished in 0.1 seconds (0.1s on load, 0.01s on tests)
  5. 7 tests, 0 failures

This time all tests passed and ExUnit warned us that distributed tests were being excluded. If you run tests with $ elixir --sname foo -S mix test, one extra test should run and successfully pass as long as the bar@computer-name node is available.

The mix test command also allows us to dynamically include and exclude tags. For example, we can run $ mix test --include distributed to run distributed tests regardless of the value set in test/test_helper.exs. We could also pass --exclude to exclude a particular tag from the command line. Finally, --only can be used to run only tests with a particular tag:

  1. $ elixir --sname foo -S mix test --only distributed

You can read more about filters, tags and the default tags in ExUnit.Case module documentation.

Application environment and configuration

So far we have hardcoded the routing table into the KV.Router module. However, we would like to make the table dynamic. This allows us not only to configure development/test/production, but also to allow different nodes to run with different entries in the routing table. There is a feature of OTP that does exactly that: the application environment.

Each application has an environment that stores the application’s specific configuration by key. For example, we could store the routing table in the :kv application environment, giving it a default value and allowing other applications to change the table as needed.

Open up apps/kv/mix.exs and change the application/0 function to return the following:

  1. def application do
  2. [applications: [],
  3. env: [routing_table: []],
  4. mod: {KV, []}]
  5. end

We have added a new :env key to the application. It returns the application default environment, which has an entry of key :routing_table and value of an empty list. It makes sense for the application environment to ship with an empty table, as the specific routing table depends on the testing/deployment structure.

In order to use the application environment in our code, we just need to replace KV.Router.table/0 with the definition below:

  1. @doc """
  2. The routing table.
  3. """
  4. def table do
  5. Application.get_env(:kv, :routing_table)
  6. end

We use Application.get_env/2 to read the entry for :routing_table in :kv‘s environment. You can find more information and other functions to manipulate the app environment in the Application module.

Since our routing table is now empty, our distributed test should fail. Restart the apps and re-run tests to see the failure:

  1. $ iex --sname bar -S mix
  2. $ elixir --sname foo -S mix test --only distributed

The interesting thing about the application environment is that it can be configured not only for the current application, but for all applications. Such configuration is done by the config/config.exs file. For example, we can configure IEx default prompt to another value. Just open apps/kv/config/config.exs and add the following to the end:

  1. config :iex, default_prompt: ">>>"

Start IEx with iex -S mix and you can see that the IEx prompt has changed.

This means we can configure our :routing_table directly in the config/config.exs file as well:

  1. # Replace computer-name with your local machine nodes.
  2. config :kv, :routing_table,
  3. [{?a..?m, :"foo@computer-name"},
  4. {?n..?z, :"bar@computer-name"}]

Restart the nodes and run distributed tests again. Now they should all pass.

Each application has its own config/config.exs file and they are not shared in any way. Configuration can also be set per environment. Read the contents of the config file for the :kv application for more information on how to do so.

Since config files are not shared, if you run tests from the umbrella root, they will fail because the configuration we just added to :kv is not available there. However, if you open up config/config.exs in the umbrella, it has instructions on how to import config files from children applications. You just need to invoke:

  1. import_config "../apps/kv/config/config.exs"

The mix run command also accepts a --config flag, which allows configuration files to be given on demand. This could be used to start different nodes, each with its own specific configuration (for example, different routing tables).

Overall, the built-in ability to configure applications and the fact that we have built our software as an umbrella application gives us plenty of options when deploying the software. We can:

  • deploy the umbrella application to a node that will work as both TCP server and key-value storage

  • deploy the :kv_server application to work only as a TCP server as long as the routing table points only to other nodes

  • deploy only the :kv application when we want a node to work only as storage (no TCP access)

As we add more applications in the future, we can continue controlling our deploy with the same level of granularity, cherry-picking which applications with which configuration are going to production. We can also consider building multiple releases with a tool like exrm, which will package the chosen applications and configuration, including the current Erlang and Elixir installations, so we can deploy the application even if the runtime is not pre-installed on the target system.

Finally, we have learned some new things in this chapter, and they could be applied to the :kv_server application as well. We are going to leave the next steps as an exercise:

  • change the :kv_server application to read the port from its application environment instead of using the hardcoded value of 4040

  • change and configure the :kv_server application to use the routing functionality instead of dispatching directly to the local KV.Registry. For :kv_server tests, you can make the routing table simply point to the current node itself

总结

这一章我们创建了一个简单的路由,并通过它探索了Elixir以及Erlang虚拟机的分布式特性,学习了如何配置路由表。这是《Elixir高级编程手册(Mix和OTP)》的最后一章。

通过这本手册,我们编写了一个非常简单的分布式键-值存储程序,领略了许多重要概念,如通用服务器、事件管理者、监督者、任务、代理、应用程序等等。不仅如此,我们还为整个程序写了测试代码,熟悉了ExUnit,还学习了如何使用Mix构建工具来完成许许多多的工作。

如果你要找一个生成环境能用的分布式键-值存储,你一定要去看看Riak,它也运行于Erlang VM之上。Riak中,桶是有冗余的,以防止数据丢失。另外,它用相容哈希(consistent hashing)而不是路由机制来匹配桶和节点。因为相容哈希算法可以减少因为桶名冲突而导致的不得不将桶迁移到新节点的开销。