协作

  • 警告:此模块当前标记为「可能更改」。它已准备好用于生产,但 API 可能在没有警告或预测期的情况下发生变化。

Akka 协作(Coordination)是一套用于分布式协作的工具。

Discovery曾是 Akka 管理的一部分,但从 Akka 的2.5.19版和 Akka 管理的1.0.0版开始,它已成为 Akka 的模块。如果你还将 Akka 管理用于其他服务发现方法或加载程序,请确保你至少使用了 Akka 管理(Management)的1.0.0版本。

依赖

为了使用 Akka 协作,你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-coordination_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-coordination_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-coordination" % "2.5.23"

Lease

Lease是分布式锁的可插拔 API。

使用 Lease

Lease通过以下方式加载:

  • Lease名称
  • 配置位置以指示应加载哪个实现
  • 所有者名称

任何Lease实现应提供以下保证:

  • 同一名称的Lease多次加载,即使在不同的节点上,也是同一Lease
  • 一次只能有一个所有者获得Lease

获取Lease

  1. Lease lease =
  2. LeaseProvider.get(system).getLease("<name of the lease>", "docs-lease", "<owner name>");
  3. CompletionStage<Boolean> acquired = lease.acquire();
  4. boolean stillAcquired = lease.checkLease();
  5. CompletionStage<Boolean> released = lease.release();

获取Lease会返回一个CompletionStage,因为Lease实现通常是通过第三方系统(如 Kubernetes API 服务器或 ZooKeeper)实现的。

一旦获得Lease,就可以调用checkLease以确保Lease仍然被获得。由于Lease实现基于其他分布式系统,因此Lease可能会因第三方系统超时而丢失。此操作不是异步的,因此可以在执行任何具有Lease的操作之前调用它。

Lease有一个所有者。如果同一所有者多次尝试获取Lease,那么它将成功,即Lease是可重入的。

选择一个对你的用例来说是唯一的Lease名称是很重要的。如果集群中每个节点的Lease都需要唯一,那么可以使用群集主机端口:

  1. String owner = Cluster.get(system).selfAddress().hostPort();

对于在同一节点上有多个不同Lease的用例,必须在名称中添加一些唯一的东西。例如,Lease可以与集群分片(Cluster Sharding)一起使用,在这种情况下,分片 ID 包含在每个分片的Lease名称中。

其他 Akka 模块的用途

Lease可以用于「集群单例」和「集群分片」。

Lease 实现

实现一个 Lease

实现应该扩展akka.coordination.lease.javadsl.Lease

  1. static class SampleLease extends Lease {
  2. private LeaseSettings settings;
  3. public SampleLease(LeaseSettings settings) {
  4. this.settings = settings;
  5. }
  6. @Override
  7. public LeaseSettings getSettings() {
  8. return settings;
  9. }
  10. @Override
  11. public CompletionStage<Boolean> acquire() {
  12. return null;
  13. }
  14. @Override
  15. public CompletionStage<Boolean> acquire(Consumer<Optional<Throwable>> leaseLostCallback) {
  16. return null;
  17. }
  18. @Override
  19. public CompletionStage<Boolean> release() {
  20. return null;
  21. }
  22. @Override
  23. public boolean checkLease() {
  24. return false;
  25. }
  26. }

这些方法应提供以下保证:

  • acquire:如果成功获取Lease,则为true;如果Lease被其他所有者占用,则为false;如果无法与实现Lease的第三方系统通信,则失败。
  • release:如果Lease已明确释放,则为true;如果Lease未明确释放,则为false;如果不知道Lease是否已释放,则失败。
  • acquireCompletionStage完成之前,checkLease应返回false;如果Lease由于与第三方通信错误而丢失,也应该返回false。检查Lease也不应阻塞。
  • Lease lost callback只能在acquireCompletionStage完成后调用,如果Lease丢失(例如,由于与第三方系统的通信丢失),则应调用该回调。

此外,期望Lease的实现包括生存时间机制(a time to live mechanism),这意味着在节点崩溃的情况下不会永远保留Lease。如果用户希望在这种情况下进行外部干预以获得最大的安全性,那么生存时间可以设置为无限。

配置必须为Lease实现的 FQCN 定义lease-class属性。

如果默认值来自akka.coordination.lease,则Lease实现应支持以下属性:

  1. # if the node that acquired the leases crashes, how long should the lease be held before another owner can get it
  2. heartbeat-timeout = 120s
  3. # interval for communicating with the third party to confirm the lease is still held
  4. heartbeat-interval = 12s
  5. # lease implementations are expected to time out acquire and release calls or document
  6. # that they do not implement an operation timeout
  7. lease-operation-timeout = 5s

此配置位置将传递到getLease

  1. akka.actor.provider = cluster
  2. docs-lease {
  3. lease-class = "docs.akka.coordination.SampleLease"
  4. heartbeat-timeout = 100s
  5. heartbeat-interval = 1s
  6. lease-operation-timeout = 1s
  7. # Any lease specific configuration
  8. }

英文原文链接Coordination.