集群分片

依赖

为了使用集群分片(Cluster Sharding),你必须在项目中添加如下依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-cluster-sharding_2.11</artifactId>
  5. <version>2.5.19</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-cluster-sharding_2.11', version: '2.5.19'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19"

示例项目

你可以查看「集群分片」项目,以了解 Akka 集群分片的实际使用情况。

简介

当你需要将 Actor 分布在集群中的多个节点上,并且希望能够使用它们的逻辑标识符与它们进行交互,但不必关心它们在集群中的物理位置时,集群分片(Cluster sharding)非常有用,这也可能随着时间的推移而改变。

例如,它可以是表示域驱动设计(Domain-Driven Design)术语中聚合根(Aggregate Roots)的 Actor。在这里,我们称这些 Actor 为“实体”。这些 Actor 通常具有持久(durable)状态,但此功能不限于具有持久状态的 Actor。

集群切分通常在有许多状态 Actor 共同消耗的资源(例如内存)多于一台机器上所能容纳的资源时使用。如果你只有几个有状态的 Actor,那么在集群单例(Cluster Singleton)节点上运行它们可能更容易。

在这个上下文中,分片意味着具有标识符(称为实体)的 Actor 可以自动分布在集群中的多个节点上。每个实体 Actor 只在一个地方运行,消息可以发送到实体,而不需要发送者知道目标 Actor 的位置。这是通过这个扩展提供的ShardRegion Actor 发送消息来实现的,它知道如何将带有实体 ID 的消息路由到最终目标。

如果启用了该功能,则集群分片将不会在状态为WeaklyUp的成员上活动。

  • 警告:不要将 Cluster Sharding 与 Automatic Downing 一起使用,因为它允许集群分裂为两个单独的集群,从而导致多个分片和实体启动,每个集群中只有一个节点!详见「Downing」。

一个示例

这就是实体 Actor 的样子:

  1. public class Counter extends AbstractPersistentActor {
  2. public enum CounterOp {
  3. INCREMENT, DECREMENT
  4. }
  5. public static class Get {
  6. final public long counterId;
  7. public Get(long counterId) {
  8. this.counterId = counterId;
  9. }
  10. }
  11. public static class EntityEnvelope {
  12. final public long id;
  13. final public Object payload;
  14. public EntityEnvelope(long id, Object payload) {
  15. this.id = id;
  16. this.payload = payload;
  17. }
  18. }
  19. public static class CounterChanged {
  20. final public int delta;
  21. public CounterChanged(int delta) {
  22. this.delta = delta;
  23. }
  24. }
  25. int count = 0;
  26. // getSelf().path().name() is the entity identifier (utf-8 URL-encoded)
  27. @Override
  28. public String persistenceId() {
  29. return "Counter-" + getSelf().path().name();
  30. }
  31. @Override
  32. public void preStart() throws Exception {
  33. super.preStart();
  34. getContext().setReceiveTimeout(Duration.ofSeconds(120));
  35. }
  36. void updateState(CounterChanged event) {
  37. count += event.delta;
  38. }
  39. @Override
  40. public Receive createReceiveRecover() {
  41. return receiveBuilder()
  42. .match(CounterChanged.class, this::updateState)
  43. .build();
  44. }
  45. @Override
  46. public Receive createReceive() {
  47. return receiveBuilder()
  48. .match(Get.class, this::receiveGet)
  49. .matchEquals(CounterOp.INCREMENT, msg -> receiveIncrement())
  50. .matchEquals(CounterOp.DECREMENT, msg -> receiveDecrement())
  51. .matchEquals(ReceiveTimeout.getInstance(), msg -> passivate())
  52. .build();
  53. }
  54. private void receiveGet(Get msg) {
  55. getSender().tell(count, getSelf());
  56. }
  57. private void receiveIncrement() {
  58. persist(new CounterChanged(+1), this::updateState);
  59. }
  60. private void receiveDecrement() {
  61. persist(new CounterChanged(-1), this::updateState);
  62. }
  63. private void passivate() {
  64. getContext().getParent().tell(
  65. new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
  66. }
  67. }

上面的 Actor 使用事件源和AbstractPersistentActor中提供的支持来存储其状态。它不必是持久性 Actor,但是如果节点之间的实体发生故障或迁移,那么它必须能够恢复其状态(如果它是有价值的)。

请注意如何定义persistenceId。Actor 的名称是实体标识符(UTF-8 URL 编码)。你也可以用另一种方式定义它,但它必须是唯一的。

当使用分片扩展时,你首先要使用ClusterSharding.start方法注册支持的实体类型,通常是在集群中每个节点上的系统启动时。ClusterSharding.start为你提供了可以传递的参考。请注意,如果当前群集节点的角色与在ClusterShardingSettings中指定的角色不匹配,ClusterSharding.start将以代理模式启动ShardRegion

  1. import akka.japi.Option;
  2. import akka.cluster.sharding.ClusterSharding;
  3. import akka.cluster.sharding.ClusterShardingSettings;
  4. Option<String> roleOption = Option.none();
  5. ClusterShardingSettings settings = ClusterShardingSettings.create(system);
  6. ActorRef startedCounterRegion =
  7. ClusterSharding.get(system)
  8. .start("Counter", Props.create(Counter.class), settings, messageExtractor);

messageExtractor定义了特定于应用程序的方法,以从传入消息中提取实体标识符和分片标识符。

  1. import akka.cluster.sharding.ShardRegion;
  2. ShardRegion.MessageExtractor messageExtractor =
  3. new ShardRegion.MessageExtractor() {
  4. @Override
  5. public String entityId(Object message) {
  6. if (message instanceof Counter.EntityEnvelope)
  7. return String.valueOf(((Counter.EntityEnvelope) message).id);
  8. else if (message instanceof Counter.Get)
  9. return String.valueOf(((Counter.Get) message).counterId);
  10. else return null;
  11. }
  12. @Override
  13. public Object entityMessage(Object message) {
  14. if (message instanceof Counter.EntityEnvelope)
  15. return ((Counter.EntityEnvelope) message).payload;
  16. else return message;
  17. }
  18. @Override
  19. public String shardId(Object message) {
  20. int numberOfShards = 100;
  21. if (message instanceof Counter.EntityEnvelope) {
  22. long id = ((Counter.EntityEnvelope) message).id;
  23. return String.valueOf(id % numberOfShards);
  24. } else if (message instanceof Counter.Get) {
  25. long id = ((Counter.Get) message).counterId;
  26. return String.valueOf(id % numberOfShards);
  27. } else {
  28. return null;
  29. }
  30. }
  31. };

此示例说明了在消息中定义实体标识符的两种不同方法:

  • Get消息包含标识符本身。
  • EntityEnvelope包含标识符,发送给实体 Actor 的实际消息包装在信封中。

注意这两种消息类型是如何在上面展示的entityIdentityMessage方法中处理的。发送给实体 Actor 的消息是entityMessage返回的,这使得在需要时可以打开信封(unwrap envelopes)。

分片是一起管理的一组实体。分组由上文所示的extractShardId函数定义。对于特定的实体标识符,分片标识符必须始终相同。否则,实体 Actor 可能会同时在多个位置意外启动。

创建一个好的分片算法(sharding algorithm)本身就是一个有趣的挑战。尝试产生一个统一的分布,即在每个分片中有相同数量的实体。根据经验,分片的数量应该比计划的最大集群节点数量大十倍。分片少于节点数量将导致某些节点不会承载任何分片。太多的分片将导致对分片的管理效率降低,例如重新平衡开销,并增加延迟,因为协调器(coordinator)参与每个分片的第一条消息的路由。正在运行的集群中的所有节点上的分片算法必须相同。它可以在停止群集中的所有节点后进行更改。

一个简单的分片算法在大多数情况下都可以很好地工作,它是以分片的实体标识符模数的hashCode的绝对值为基础的。为了方便起见,ShardRegion.HashCodeMessageExtractor提供了这一功能。

向实体发送的消息始终通过本地ShardRegion发送。命名实体类型的ShardRegion Actor 引用由ClusterSharding.start返回,也可以使用ClusterSharding.shardRegion检索。如果ShardRegion不知道其位置的话,它将查找实体的分片位置。它将把消息委托给正确的节点,并根据需要创建实体 Actor,即在传递特定实体的第一条消息时。

  1. ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
  2. counterRegion.tell(new Counter.Get(123), getSelf());
  3. counterRegion.tell(new Counter.EntityEnvelope(123, Counter.CounterOp.INCREMENT), getSelf());
  4. counterRegion.tell(new Counter.Get(123), getSelf());

它是如何工作的?

ShardRegion Actor 在集群中的每个节点或标记有特定角色的节点组上启动。ShardRegion由两个特定于应用程序的函数创建,用于从传入消息中提取实体标识符(entity identifier)和分片标识符(shard identifier)。分片是统一管理的一组实体。对于特定分片中的第一条消息,ShardRegion将从中心协调者ShardCoordinator请求分片的位置。

ShardCoordinator决定哪个ShardRegion将拥有Shard,并通知ShardRegion。区域(region)将确认此请求并将Shard 监督者创建为子 Actor。然后,当Shard Actor 需要时,将创建各个Entities。因此,传入消息通过ShardRegionShard传输到目标Entity

如果shard home是另一个ShardRegion实例,则消息将转发到该ShardRegion实例。当解析分片的位置时,该分片的传入消息将被缓冲,并在分片所在地(home)已知时传递。到已解析分片的后续消息可以立即传递到目标目的地,而不涉及ShardCoordinator

场景

一旦知道Shard的位置,ShardRegions就直接发送消息。下面是进入此状态的场景。在场景中,使用以下符号:

  • SC - ShardCoordinator
  • M# - Message 1, 2, 3, 等
  • SR# - ShardRegion 1, 2 3, 等
  • S# - Shard 1 2 3, 等
  • E# - Entity 1 2 3, 等,实体是指由集群分片管理的 Actor。

场景1:向属于本地 ShardRegion 的未知分片发送消息

  1. 传入消息M1ShardRegion实例SR1
  2. M1映射到分片S1SR1不知道S1,所以它向SC询问S1的位置。
  3. SC回答S1的位置是SR1
  4. R1为实体E1创建子 Actor,并将S1的缓冲消息发送给E1子 Actor。
  5. 到达R1S1的所有传入消息都可以由R1处理,而不需要SC。它根据需要创建实体子级,并将消息转发给它们。

场景2:向属于远程 ShardRegion 的未知分片发送消息

  1. 传入消息M2ShardRegion实例SR1
  2. M2映射到S2SR1不知道S2,所以它向SC询问S2的位置。
  3. SC回答S2的位置是SR2
  4. SR1S2的缓冲消息发送到SR2
  5. 到达SR1S2的所有传入消息都可以由SR1处理,而不需要SC。它将消息转发到SR2
  6. SR2接收到S2的消息,询问SCSC回答S2的位置是SR2,这时我们将回到场景1中(但SR2除外)。

分片位置

为了确保特定实体 Actor 的至多一个实例在集群中的某个地方运行,所有节点都具有相同的分片(shard)所在位置视图是很重要的。因此,分片分配决策由中心ShardCoordinator执行,它作为一个集群单例运行,即在所有集群节点中的最老成员上或标记有特定角色的一组节点上执行一个实例。

决定分片位置的逻辑在可插拔分片分配策略中定义。默认实现ShardCoordinator.LeastShardAllocationStrategy将新的分片分配给ShardRegion,其中以前分配的分片数量最少。此策略可以由特定于应用程序的实现替代。

分片再平衡

为了能够在集群中使用新添加的成员,协调器(coordinator)促进了分片的重新平衡(rebalancing of shards),即将实体从一个节点迁移到另一个节点。在重新平衡过程中,协调器首先通知所有ShardRegion Actor 已开始对分片的切换。这意味着它们将开始缓冲该分片的传入消息,就像分片位置未知一样。在重新平衡过程中,协调器不会回答任何有关正在重新平衡的分片位置的请求,即本地缓冲将继续,直到完成切换。负责重新平衡分片的ShardRegion将通过向该分片中的所有实体发送指定的stopMessage(默认为PoisonPill)来停止该分片中的所有实体。所有实体终止后,拥有实体的ShardRegion将确认已向协调器完成移交。此后,协调器将回复分片位置的请求,从而为分片分配一个新的位置,然后将分片区域 Actor 中的缓冲消息发送到新位置。这意味着实体的状态不会被转移或迁移。如果实体的状态很重要,那么它应该是持久的,例如「Persistence」,以便可以在新的位置恢复。

决定要重新平衡哪些分片的逻辑在可插入分片分配策略(a pluggable shard allocation strategy)中定义。默认实现ShardCoordinator.LeastShardAllocationStrategyShardRegion中选择用于切换的分片,其中包含以前分配的大多数碎片。然后,它们将以最少数量的先前分配的分片(即集群中的新成员)分配给ShardRegion

对于LeastShardAllocationStrategy,有一个可配置的阈值(rebalance-threshold),说明开始重新平衡时差异必须有多大。在分片最多的区域和分片最少的区域中,分片数量的差异必须大于发生重新平衡的rebalance-threshold

rebalance-threshold1时,给出了最佳分布,因此通常是最佳选择。更高的阈值意味着更多的分片可以同时重新平衡,而不是一个接一个。这样做的优点是,重新平衡过程可以更快,但缺点是不同节点之间的分片数量(因此负载)可能会显著不同。

ShardCoordinator 状态

ShardCoordinator中分片位置的状态是持久的,带有「Distributed Data」或「Persistence」,可以在故障中幸存。当从集群中删除崩溃或无法访问的协调节点(通过down)时,新的ShardCoordinator单例 Actor 将接管并恢复状态。在这种故障期间,具有已知位置的分片仍然可用,而新(未知)分片的消息将被缓冲,直到新的ShardCoordinator可用。

消息排序

只要发送者使用同一个ShardRegion Actor 将消息传递给实体 Actor,消息的顺序就会保持不变。只要没有达到缓冲区限制,消息就会以“最多一次传递”的语义尽最大努力传递,与普通消息发送的方式相同。可靠的端到端(end-to-end)消息传递,通过在「Persistence」中使用AtLeastOnceDelivery,可以实现“至少一次传递”的语义。

开销

由于到协调器的往返(round-trip),针对新的或以前未使用的分片的消息引入了一些额外的延迟。重新平衡分片也可能增加延迟。在设计特定于应用程序的分片解决方案时,应该考虑这一点,例如,为了避免太细的分片。一旦知道分片的位置,唯一的开销(overhead)就是通过ShardRegion发送消息,而不是直接发送消息。

分布式数据模式 vs. 持久化模式

协调器的状态和分片「Remembering Entities」的状态是持久的,可以在失败中幸存。「Distributed Data」或「Persistence」可用于存储。默认情况下使用分布式数据(Distributed Data)。

使用两种模式时的功能相同。如果你的分片实体本身不使用 Akka 持久化(Persistence),那么使用分布式数据模式更方便,因为你不必为持久性设置和操作单独的数据存储(如 Cassandra)。除此之外,使用一种模式而不使用另一种模式没有主要原因。

在集群中的所有节点上使用相同的模式很重要,即不可能执行滚动升级来更改此设置。

分布式数据模式

此模式通过配置启用(默认情况下启用):

  1. akka.cluster.sharding.state-store-mode = ddata

ShardCoordinator的状态将在集群内由分布式数据模块复制,具有WriteMajority/ReadMajority一致性。协调器的状态不持久,它没有存储到磁盘。当集群中的所有节点都已停止时,状态将丢失,也不再需要了。

记忆实体(Remembering Entities)的状态也是持久的,即存储在磁盘上。存储的实体也会在群集完全重新启动后启动。

集群分片(Cluster Sharding)使用它自己的每个节点角色的分布式数据Replicator。通过这种方式,可以将所有节点的子集用于某些实体类型,将另一个子集用于其他实体类型。每个这样的复制器(replicator)都有一个包含节点角色的名称,因此集群中所有节点上的角色配置都必须相同,即在执行滚动升级时不能更改角色。

分布式数据的设置在akka.cluster.sharding.distributed-data部分中配置。对于不同的分片实体类型,不可能有不同的distributed-data设置。

持久化模式

此模式通过配置启用:

  1. akka.cluster.sharding.state-store-mode = persistence

因为它是在集群中运行的,所以必须用分布式日志配置持久化。

达到最少成员数后启动

在集群设置akka.cluster.min-nr-of-membersakka.cluster.role.<role-name>.min-nr-of-members时,使用集群分片是很好的。这将推迟分片的分配,直到至少有配置数量的区域已经启动并注册到协调器。这就避免了许多分片被分配到第一个注册的区域,只有在以后才被重新平衡到其他节点。

有关min-nr-of-members的详细信息,请参阅「How To Startup when Cluster Size Reached」。

仅代理模式

ShardRegion Actor 也可以在仅代理模式(proxy only mode)下启动,即它不会承载任何实体本身,但知道如何将消息委托到正确的位置。ShardRegion以仅代理模式使用ClusterSharding.startProxy方法启动。此外,如果当前群集节点的角色与传递给ClusterSharding.start方法的ClusterShardingSettings中指定的角色不匹配时,则ShardRegion将以仅代理模式启动。

Passivation

如果实体的状态是持久的,则可以停止不用于减少内存消耗的实体。这是由实体 Actor 的特定于应用程序的实现完成的,例如通过定义接收超时(context.setReceiveTimeout)。如果某个消息在停止时已排队到该实体,则将删除邮箱中排队的消息。为了在不丢失此类消息的情况下支持优雅的钝化(passivation),实体 Actor 可以将ShardRegion.Passivate发送给其父Shard。在Passivate中指定的包装消息将被发送回实体,然后该实体将自行停止。在接收到Passivate和终止实体之间,传入消息将被Shard缓冲。这样的缓冲消息随后被传递到实体的新化身。

Automatic Passivation

如果实体使用akka.cluster.sharding.passivate-idle-entity-after设置一段时间没有收到消息,或者通过将ClusterShardingSettings.passivateIdleEntityAfter显式设置为一个合适的时间以保持 Actor 活动,则可以将这些实体配置为自动钝化(automatically passivated)。请注意,只有通过分片发送的消息才会被计算在内,因此直接发送到 Actor 的ActorRef的消息或它发送给自身的消息不会被计算为活动。默认情况下,自动钝化是禁止的。

Remembering Entities

通过在调用ClusterSharding.start时将ClusterShardingSettings中的rememberEntities标志设置为true,并确保shardIdExtractor处理Shard.StartEntity(EntityId),可以使每个Shard中的实体列表持久化,这意味着ShardId必须可以从EntityId中提取。

  1. @Override
  2. public String shardId(Object message) {
  3. int numberOfShards = 100;
  4. if (message instanceof Counter.EntityEnvelope) {
  5. long id = ((Counter.EntityEnvelope) message).id;
  6. return String.valueOf(id % numberOfShards);
  7. } else if (message instanceof Counter.Get) {
  8. long id = ((Counter.Get) message).counterId;
  9. return String.valueOf(id % numberOfShards);
  10. } else if (message instanceof ShardRegion.StartEntity) {
  11. long id = Long.valueOf(((ShardRegion.StartEntity) message).entityId());
  12. return String.valueOf(id % numberOfShards);
  13. } else {
  14. return null;
  15. }
  16. }

当配置为记忆实体(remember entities)时,每当Shard重新平衡到另一个节点上或在崩溃后恢复时,它将重新创建以前在该分片中运行的所有实体。要永久停止实体,必须向实体 Actor 的父级发送一条Passivate消息,否则在配置中指定的实体重新启动回退之后,该实体将自动重新启动。

当使用分布式数据模式时,实体的标识符存储在分布式数据的「Durable Storage」中。你可能需要更改akka.cluster.sharding.distributed-data.durable.lmdb.dir的配置,因为默认目录包含 Actor 系统的远程端口。如果使用动态分配的端口(0),则每次都会不同,并且不会加载以前存储的数据。

rememberEntities设置为false时,Shard不会在重新平衡或从崩溃中恢复后自动重新启动任何实体。只有在Shard中收到实体的第一条消息后,才会启动实体。如果实体停止而不使用Passivate,则不会重新启动。

请注意,实体本身的状态将不会被恢复,除非它们已被持久化,例如「Persistence」。

当启动/停止实体以及重新平衡分片时,rememberEntities的性能成本相当高。这种成本随着每个分片的实体数量增加而增加,我们目前不建议在每个分片上使用超过 10000 个实体。

监督

如果需要为实体 Actor 使用其他supervisorStrategy,而不是默认(重新启动)策略,则需要创建一个中间父 Actor,该 Actor 定义子实体 Actor 的supervisorStrategy

  1. static class CounterSupervisor extends AbstractActor {
  2. private final ActorRef counter =
  3. getContext().actorOf(Props.create(Counter.class), "theCounter");
  4. private static final SupervisorStrategy strategy =
  5. new OneForOneStrategy(
  6. DeciderBuilder.match(IllegalArgumentException.class, e -> SupervisorStrategy.resume())
  7. .match(ActorInitializationException.class, e -> SupervisorStrategy.stop())
  8. .match(Exception.class, e -> SupervisorStrategy.restart())
  9. .matchAny(o -> SupervisorStrategy.escalate())
  10. .build());
  11. @Override
  12. public SupervisorStrategy supervisorStrategy() {
  13. return strategy;
  14. }
  15. @Override
  16. public Receive createReceive() {
  17. return receiveBuilder()
  18. .match(Object.class, msg -> counter.forward(msg, getContext()))
  19. .build();
  20. }
  21. }

你以同样的方式启动这样一个监督者(supervisor),就像它是实体 Actor 一样。

  1. ClusterSharding.get(system)
  2. .start(
  3. "SupervisedCounter", Props.create(CounterSupervisor.class), settings, messageExtractor);

请注意,当新消息针对(targeted to)实体时,停止的实体将再次启动。

优雅地关闭

你可以将ShardRegion.gracefulShutdownInstance消息发送给ShardRegion Actor,以分发由该ShardRegion承载的所有分片,然后将停止ShardRegion Actor。你可以监控(watchShardRegion Actor 以便知道什么时候完成。在此期间,其他区域将以协调器触发重新平衡时的相同方式缓冲这些分片的消息。当分片被停止时,协调器将把这些分片分配到其他地方。

这是由「Coordinated Shutdown」自动执行的,因此是集群成员正常退出进程的一部分。

删除内部群集分片数据

集群分片协调器使用 Akka 持久化存储分片的位置。重新启动整个 Akka 集群时,可以安全地删除这些数据。请注意,这不是应用程序数据。

有一个实用程序akka.cluster.sharding.RemoveInternalClusterShardingData,用于删除此数据。

  • 警告:在运行使用群集分片的 Akka 群集节点时,切勿使用此程序。使用此程序前,请停止所有群集节点。

如果由于数据损坏而无法启动群集分片协调器,则可能需要删除数据,如果同时意外运行两个群集,例如由于使用自动关闭而存在网络分裂,则可能会发生这种情况。

  • 警告:不要将集群分片(Cluster Sharding)与自动关闭(Automatic Downing)一起使用,因为它允许集群分裂为两个单独的集群,从而导致多个分片和实体启动。

使用这个程序作为一个独立的 Java 主程序:

  1. java -classpath <jar files, including akka-cluster-sharding>
  2. akka.cluster.sharding.RemoveInternalClusterShardingData
  3. -2.3 entityType1 entityType2 entityType3

该程序包含在akka-cluster-sharding.jar 文件中。使用与普通应用程序相同的类路径和配置运行它是最简单的。它可以以类似的方式从 sbt 或 Maven 运行。

指定实体类型名称(与在ClusterShardingstart方法中使用的名称相同)作为程序参数。

如果将-2.3指定为第一个程序参数,它还将尝试使用不同的persistenceId删除在Akka 2.3.x中由集群分片(Cluster Sharding)存储的数据。

配置

可以使用以下属性配置ClusterSharding扩展。当使用ActorSystem参数创建时,ClusterShardingSettings将读取这些配置属性。还可以修改ClusterShardingSettings或从另一个配置部分创建它,布局如下。ClusterShardingSettingsClusterSharding扩展的start方法的参数,也就是说,如果需要,每个实体类型都可以配置不同的设置。

  1. # Settings for the ClusterShardingExtension
  2. akka.cluster.sharding {
  3. # The extension creates a top level actor with this name in top level system scope,
  4. # e.g. '/system/sharding'
  5. guardian-name = sharding
  6. # Specifies that entities runs on cluster nodes with a specific role.
  7. # If the role is not specified (or empty) all nodes in the cluster are used.
  8. role = ""
  9. # When this is set to 'on' the active entity actors will automatically be restarted
  10. # upon Shard restart. i.e. if the Shard is started on a different ShardRegion
  11. # due to rebalance or crash.
  12. remember-entities = off
  13. # Set this to a time duration to have sharding passivate entities when they have not
  14. # gotten any message in this long time. Set to 'off' to disable.
  15. passivate-idle-entity-after = off
  16. # If the coordinator can't store state changes it will be stopped
  17. # and started again after this duration, with an exponential back-off
  18. # of up to 5 times this duration.
  19. coordinator-failure-backoff = 5 s
  20. # The ShardRegion retries registration and shard location requests to the
  21. # ShardCoordinator with this interval if it does not reply.
  22. retry-interval = 2 s
  23. # Maximum number of messages that are buffered by a ShardRegion actor.
  24. buffer-size = 100000
  25. # Timeout of the shard rebalancing process.
  26. # Additionally, if an entity doesn't handle the stopMessage
  27. # after (handoff-timeout - 5.seconds).max(1.second) it will be stopped forcefully
  28. handoff-timeout = 60 s
  29. # Time given to a region to acknowledge it's hosting a shard.
  30. shard-start-timeout = 10 s
  31. # If the shard is remembering entities and can't store state changes
  32. # will be stopped and then started again after this duration. Any messages
  33. # sent to an affected entity may be lost in this process.
  34. shard-failure-backoff = 10 s
  35. # If the shard is remembering entities and an entity stops itself without
  36. # using passivate. The entity will be restarted after this duration or when
  37. # the next message for it is received, which ever occurs first.
  38. entity-restart-backoff = 10 s
  39. # Rebalance check is performed periodically with this interval.
  40. rebalance-interval = 10 s
  41. # Absolute path to the journal plugin configuration entity that is to be
  42. # used for the internal persistence of ClusterSharding. If not defined
  43. # the default journal plugin is used. Note that this is not related to
  44. # persistence used by the entity actors.
  45. # Only used when state-store-mode=persistence
  46. journal-plugin-id = ""
  47. # Absolute path to the snapshot plugin configuration entity that is to be
  48. # used for the internal persistence of ClusterSharding. If not defined
  49. # the default snapshot plugin is used. Note that this is not related to
  50. # persistence used by the entity actors.
  51. # Only used when state-store-mode=persistence
  52. snapshot-plugin-id = ""
  53. # Defines how the coordinator stores its state. Same is also used by the
  54. # shards for rememberEntities.
  55. # Valid values are "ddata" or "persistence".
  56. state-store-mode = "ddata"
  57. # The shard saves persistent snapshots after this number of persistent
  58. # events. Snapshots are used to reduce recovery times.
  59. # Only used when state-store-mode=persistence
  60. snapshot-after = 1000
  61. # The shard deletes persistent events (messages and snapshots) after doing snapshot
  62. # keeping this number of old persistent batches.
  63. # Batch is of size `snapshot-after`.
  64. # When set to 0 after snapshot is successfully done all messages with equal or lower sequence number will be deleted.
  65. # Default value of 2 leaves last maximum 2*`snapshot-after` messages and 3 snapshots (2 old ones + fresh snapshot)
  66. keep-nr-of-batches = 2
  67. # Setting for the default shard allocation strategy
  68. least-shard-allocation-strategy {
  69. # Threshold of how large the difference between most and least number of
  70. # allocated shards must be to begin the rebalancing.
  71. # The difference between number of shards in the region with most shards and
  72. # the region with least shards must be greater than (>) the `rebalanceThreshold`
  73. # for the rebalance to occur.
  74. # 1 gives the best distribution and therefore typically the best choice.
  75. # Increasing the threshold can result in quicker rebalance but has the
  76. # drawback of increased difference between number of shards (and therefore load)
  77. # on different nodes before rebalance will occur.
  78. rebalance-threshold = 1
  79. # The number of ongoing rebalancing processes is limited to this number.
  80. max-simultaneous-rebalance = 3
  81. }
  82. # Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened)
  83. # Only used when state-store-mode=ddata
  84. waiting-for-state-timeout = 5 s
  85. # Timeout of waiting for update the distributed state (update will be retried if the timeout happened)
  86. # Only used when state-store-mode=ddata
  87. updating-state-timeout = 5 s
  88. # The shard uses this strategy to determines how to recover the underlying entity actors. The strategy is only used
  89. # by the persistent shard when rebalancing or restarting. The value can either be "all" or "constant". The "all"
  90. # strategy start all the underlying entity actors at the same time. The constant strategy will start the underlying
  91. # entity actors at a fix rate. The default strategy "all".
  92. entity-recovery-strategy = "all"
  93. # Default settings for the constant rate entity recovery strategy
  94. entity-recovery-constant-rate-strategy {
  95. # Sets the frequency at which a batch of entity actors is started.
  96. frequency = 100 ms
  97. # Sets the number of entity actors to be restart at a particular interval
  98. number-of-entities = 5
  99. }
  100. # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
  101. # The "role" of the singleton configuration is not used. The singleton role will
  102. # be the same as "akka.cluster.sharding.role".
  103. coordinator-singleton = ${akka.cluster.singleton}
  104. # Settings for the Distributed Data replicator.
  105. # Same layout as akka.cluster.distributed-data.
  106. # The "role" of the distributed-data configuration is not used. The distributed-data
  107. # role will be the same as "akka.cluster.sharding.role".
  108. # Note that there is one Replicator per role and it's not possible
  109. # to have different distributed-data settings for different sharding entity types.
  110. # Only used when state-store-mode=ddata
  111. distributed-data = ${akka.cluster.distributed-data}
  112. distributed-data {
  113. # minCap parameter to MajorityWrite and MajorityRead consistency level.
  114. majority-min-cap = 5
  115. durable.keys = ["shard-*"]
  116. # When using many entities with "remember entities" the Gossip message
  117. # can become to large if including to many in same message. Limit to
  118. # the same number as the number of ORSet per shard.
  119. max-delta-elements = 5
  120. }
  121. # The id of the dispatcher to use for ClusterSharding actors.
  122. # If not specified default dispatcher is used.
  123. # If specified you need to define the settings of the actual dispatcher.
  124. # This dispatcher for the entity actors is defined by the user provided
  125. # Props, i.e. this dispatcher is not used for the entity actors.
  126. use-dispatcher = ""
  127. }

自定义分片分配策略(shard allocation strategy)可以在ClusterSharding.start的可选参数中定义。有关如何实现自定义分片分配策略的详细信息,请参阅AbstractShardAllocationStrategy的 API 文档。

检查群集分片状态

有两个检查群集状态的请求可用:

  • ShardRegion.getShardRegionStateInstance,它将返回一个ShardRegion.ShardRegionState,其中包含区域中运行的分片的标识符以及每个分片的活动实体。
  • ShardRegion.GetClusterShardingStats,它将查询集群中的所有区域,并返回一个ShardRegion.ClusterShardingStats,其中包含每个区域中运行的分片的标识符以及每个分片中活动的实体数。

可以通过ClusterSharding.getShardTypeNames获取所有已启动分片的类型名。

这些消息的目的是测试(testing)和监控(monitoring),它们不提供直接向各个实体发送消息的访问权。

滚动升级

在进行滚动升级(rolling upgrades)时,必须特别注意不要改变以下任何分片方面:

  • extractShardId函数
  • 分片区域运行的角色
  • 持久化模式

如果其中任何一个需要更改,则需要完全重新启动群集。


英文原文链接Cluster Sharding.