分布式数据

依赖

为了使用分布式数据(Distributed Data),你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-distributed-data_2.12</artifactId>
  5. <version>2.5.22</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-distributed-data_2.12', version: '2.5.22'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-distributed-data" % "2.5.22"

示例项目

你可以下载「Distributed Data」示例项目来看看分布式数据是如何在实践中应用的。

简介

当需要在 Akka 集群中的节点之间共享数据时,Akka 分布式数据非常有用。通过提供类似 API 的键值存储的 Actor 访问数据。键是具有数据值类型信息的唯一标识符。这些值是无冲突的复制数据类型(Conflict Free Replicated Data Types (CRDTs))。

所有数据条目都通过直接复制和基于gossip的协议传播到集群中的所有节点或具有特定角色的节点。你可以对读写的一致性级别进行细粒度控制。

自然CRDTs可以在不协调的情况下从任何节点执行更新。来自不同节点的并发更新将由单调合并函数(monotonic merge function)自动解决,所有数据类型都必须提供该函数。状态变化总是收敛的。为计数器、集合、映射和寄存器提供了几种有用的数据类型,你还可以实现自己的自定义数据类型。

它最终是一致的,旨在提供高读写可用性(分区容限),低延迟。请注意,在最终一致的系统中,读取可能会返回过期的值。

使用 Replicator

akka.cluster.ddata.Replicator Actor 提供了与数据交互的 API。Replicator Actor 必须在集群中的每个节点上启动,或者在标记有特定角色的节点组上启动。它与运行在其他节点上的具有相同路径(而不是地址)的其他Replicator实例通信。为了方便起见,它可以与akka.cluster.ddata.DistributedData扩展一起使用,但也可以使用Replicator.props作为普通 Actor 启动。如果它是作为一个普通的 Actor 启动的,那么它必须在所有节点上以相同的名称、相同的路径启动。

状态为「WeaklyUp」的集群成员将参与分布式数据。这意味着数据将通过后台gossip协议复制到WeaklyUp节点。请注意,如果一致性模式是从所有节点或大多数节点读/写,则它不会参与任何操作。WeaklyUp节点不算作集群的一部分。因此,就一致操作而言,3 个节点 + 5 个WeaklyUp的节点本质上是 3 个节点。

下面是一个 Actor 的示例,它将tick消息调度到自己,并为每个tick添加或删除ORSetobserved-remove set)中的元素。它还订阅了这一变化。

  1. import java.time.Duration;
  2. import java.util.concurrent.ThreadLocalRandom;
  3. import akka.actor.AbstractActor;
  4. import akka.actor.ActorRef;
  5. import akka.actor.Cancellable;
  6. import akka.cluster.Cluster;
  7. import akka.cluster.ddata.DistributedData;
  8. import akka.cluster.ddata.Key;
  9. import akka.cluster.ddata.ORSet;
  10. import akka.cluster.ddata.ORSetKey;
  11. import akka.cluster.ddata.Replicator;
  12. import akka.cluster.ddata.Replicator.Changed;
  13. import akka.cluster.ddata.Replicator.Subscribe;
  14. import akka.cluster.ddata.Replicator.Update;
  15. import akka.cluster.ddata.Replicator.UpdateResponse;
  16. import akka.event.Logging;
  17. import akka.event.LoggingAdapter;
  18. public class DataBot extends AbstractActor {
  19. private static final String TICK = "tick";
  20. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  21. private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  22. private final Cluster node = Cluster.get(getContext().getSystem());
  23. private final Cancellable tickTask =
  24. getContext()
  25. .getSystem()
  26. .scheduler()
  27. .schedule(
  28. Duration.ofSeconds(5),
  29. Duration.ofSeconds(5),
  30. getSelf(),
  31. TICK,
  32. getContext().getDispatcher(),
  33. getSelf());
  34. private final Key<ORSet<String>> dataKey = ORSetKey.create("key");
  35. @SuppressWarnings("unchecked")
  36. @Override
  37. public Receive createReceive() {
  38. return receiveBuilder()
  39. .match(String.class, a -> a.equals(TICK), a -> receiveTick())
  40. .match(
  41. Changed.class,
  42. c -> c.key().equals(dataKey),
  43. c -> receiveChanged((Changed<ORSet<String>>) c))
  44. .match(UpdateResponse.class, r -> receiveUpdateResponse())
  45. .build();
  46. }
  47. private void receiveTick() {
  48. String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123));
  49. if (ThreadLocalRandom.current().nextBoolean()) {
  50. // add
  51. log.info("Adding: {}", s);
  52. Update<ORSet<String>> update =
  53. new Update<>(dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.add(node, s));
  54. replicator.tell(update, getSelf());
  55. } else {
  56. // remove
  57. log.info("Removing: {}", s);
  58. Update<ORSet<String>> update =
  59. new Update<>(
  60. dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.remove(node, s));
  61. replicator.tell(update, getSelf());
  62. }
  63. }
  64. private void receiveChanged(Changed<ORSet<String>> c) {
  65. ORSet<String> data = c.dataValue();
  66. log.info("Current elements: {}", data.getElements());
  67. }
  68. private void receiveUpdateResponse() {
  69. // ignore
  70. }
  71. @Override
  72. public void preStart() {
  73. Subscribe<ORSet<String>> subscribe = new Subscribe<>(dataKey, getSelf());
  74. replicator.tell(subscribe, ActorRef.noSender());
  75. }
  76. @Override
  77. public void postStop() {
  78. tickTask.cancel();
  79. }
  80. }

更新

若要修改和复制数据值,请向本地Replicator发送一条Replicator.Update消息。

Updatekey的当前数据值作为参数传递给Updatemodify函数。函数应该返回数据的新值,然后根据给定的一致性级别复制该值。

modify函数由Replicator Actor 调用,因此必须是一个纯函数,只使用封闭范围中的数据参数和稳定字段。例如,它必须不访问封闭 Actor 的发送方(getSender())引用。

由于modify函数通常不可序列化,因此只能从与Replicator运行在同一本地ActorSystem中的 Actor 发送Update

你提供的写入一致性级别具有以下含义:

  • writeLocal,该值将立即只被写入本地副本,然后通过gossip进行传播。
  • WriteTo(n),该值将立即写入至少n个副本,包括本地副本
  • WriteMajority,该值将立即写入大多数副本,即至少N/2 + 1个副本,其中N是群集(或群集角色组)中的节点数
  • WriteAll,该值将立即写入群集中的所有节点(或群集中角色组中的所有节点)。

当你指定在x个节点中写入n个节点时,更新将首先复制到n个节点。如果在超时的1/5之后没有足够的Acks,更新将复制到其他n个节点。如果剩余节点少于n个,则使用所有剩余节点。可访问节点比不可访问节点更受欢迎。

请注意,WriteMajority有一个minCap参数,该参数对于指定小集群以实现更好的安全性非常有用。

  1. class DemonstrateUpdate extends AbstractActor {
  2. final SelfUniqueAddress node =
  3. DistributedData.get(getContext().getSystem()).selfUniqueAddress();
  4. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  5. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  6. final Key<GSet<String>> set1Key = GSetKey.create("set1");
  7. final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  8. final Key<Flag> activeFlagKey = FlagKey.create("active");
  9. @Override
  10. public Receive createReceive() {
  11. ReceiveBuilder b = receiveBuilder();
  12. b.matchEquals(
  13. "demonstrate update",
  14. msg -> {
  15. replicator.tell(
  16. new Replicator.Update<PNCounter>(
  17. counter1Key,
  18. PNCounter.create(),
  19. Replicator.writeLocal(),
  20. curr -> curr.increment(node, 1)),
  21. getSelf());
  22. final WriteConsistency writeTo3 = new WriteTo(3, Duration.ofSeconds(1));
  23. replicator.tell(
  24. new Replicator.Update<GSet<String>>(
  25. set1Key, GSet.create(), writeTo3, curr -> curr.add("hello")),
  26. getSelf());
  27. final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5));
  28. replicator.tell(
  29. new Replicator.Update<ORSet<String>>(
  30. set2Key, ORSet.create(), writeMajority, curr -> curr.add(node, "hello")),
  31. getSelf());
  32. final WriteConsistency writeAll = new WriteAll(Duration.ofSeconds(5));
  33. replicator.tell(
  34. new Replicator.Update<Flag>(
  35. activeFlagKey, Flag.create(), writeAll, curr -> curr.switchOn()),
  36. getSelf());
  37. });
  38. return b.build();
  39. }
  40. }

作为Update的答复,如果在提供的超时内根据提供的一致性级别成功复制了值,则会向Update的发送方发送Replicator.UpdateSuccess。否则将返回Replicator.UpdateFailure子类。请注意,Replicator.UpdateTimeout回复并不意味着更新完全失败或已回滚。它可能仍然被复制到一些节点上,并最终通过gossip协议复制到所有节点上。

  1. b.match(
  2. UpdateSuccess.class,
  3. a -> a.key().equals(counter1Key),
  4. a -> {
  5. // ok
  6. });
  1. b.match(
  2. UpdateSuccess.class,
  3. a -> a.key().equals(set1Key),
  4. a -> {
  5. // ok
  6. })
  7. .match(
  8. UpdateTimeout.class,
  9. a -> a.key().equals(set1Key),
  10. a -> {
  11. // write to 3 nodes failed within 1.second
  12. });

你总会看到自己的写入。例如,如果发送两条Update消息更改同一个key的值,则第二条消息的modify函数将看到第一条Update消息执行的更改。

Update消息中,你可以传递一个可选的请求上下文,Replicator不关心该上下文,但它包含在回复消息中。这是一种传递上下文信息(例如原始发送者)的方便方法,无需使用ask或维护本地相关数据结构。

  1. class DemonstrateUpdateWithRequestContext extends AbstractActor {
  2. final Cluster node = Cluster.get(getContext().getSystem());
  3. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  4. final WriteConsistency writeTwo = new WriteTo(2, Duration.ofSeconds(3));
  5. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  6. @Override
  7. public Receive createReceive() {
  8. return receiveBuilder()
  9. .match(
  10. String.class,
  11. a -> a.equals("increment"),
  12. a -> {
  13. // incoming command to increase the counter
  14. Optional<Object> reqContext = Optional.of(getSender());
  15. Replicator.Update<PNCounter> upd =
  16. new Replicator.Update<PNCounter>(
  17. counter1Key,
  18. PNCounter.create(),
  19. writeTwo,
  20. reqContext,
  21. curr -> curr.increment(node, 1));
  22. replicator.tell(upd, getSelf());
  23. })
  24. .match(
  25. UpdateSuccess.class,
  26. a -> a.key().equals(counter1Key),
  27. a -> {
  28. ActorRef replyTo = (ActorRef) a.getRequest().get();
  29. replyTo.tell("ack", getSelf());
  30. })
  31. .match(
  32. UpdateTimeout.class,
  33. a -> a.key().equals(counter1Key),
  34. a -> {
  35. ActorRef replyTo = (ActorRef) a.getRequest().get();
  36. replyTo.tell("nack", getSelf());
  37. })
  38. .build();
  39. }
  40. }

获取

要检索数据的当前值,请向Replicator发生Replicator.Get消息。你提供的一致性级别具有以下含义:

  • readLocal,该值将只从本地副本中读取
  • ReadFrom(n),该值将从n个副本(包括本地副本)中读取和合并
  • ReadMajority,该值将从大多数副本(即至少N/2 + 1个副本)中读取和合并,其中N是集群(或集群角色组)中的节点数
  • ReadAll,该值将从群集中的所有节点(或群集角色组中的所有节点)中读取和合并。

请注意,ReadMajority有一个minCap参数,该参数对于指定小集群以获得更好的安全性非常有用。

  1. class DemonstrateGet extends AbstractActor {
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  4. final Key<GSet<String>> set1Key = GSetKey.create("set1");
  5. final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  6. final Key<Flag> activeFlagKey = FlagKey.create("active");
  7. @Override
  8. public Receive createReceive() {
  9. ReceiveBuilder b = receiveBuilder();
  10. b.matchEquals(
  11. "demonstrate get",
  12. msg -> {
  13. replicator.tell(
  14. new Replicator.Get<PNCounter>(counter1Key, Replicator.readLocal()), getSelf());
  15. final ReadConsistency readFrom3 = new ReadFrom(3, Duration.ofSeconds(1));
  16. replicator.tell(new Replicator.Get<GSet<String>>(set1Key, readFrom3), getSelf());
  17. final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(5));
  18. replicator.tell(new Replicator.Get<ORSet<String>>(set2Key, readMajority), getSelf());
  19. final ReadConsistency readAll = new ReadAll(Duration.ofSeconds(5));
  20. replicator.tell(new Replicator.Get<Flag>(activeFlagKey, readAll), getSelf());
  21. });
  22. return b.build();
  23. }
  24. }

作为Get的回复,如果在提供的超时内根据提供的一致性级别成功检索到值,则会向Get的发送方发送Replicator.GetSuccess。否则将发送Replicator.GetFailure。如果key不存在,则将发送Replicator.NotFound消息。

  1. b.match(
  2. GetSuccess.class,
  3. a -> a.key().equals(counter1Key),
  4. a -> {
  5. GetSuccess<PNCounter> g = a;
  6. BigInteger value = g.dataValue().getValue();
  7. })
  8. .match(
  9. NotFound.class,
  10. a -> a.key().equals(counter1Key),
  11. a -> {
  12. // key counter1 does not exist
  13. });
  1. b.match(
  2. GetSuccess.class,
  3. a -> a.key().equals(set1Key),
  4. a -> {
  5. GetSuccess<GSet<String>> g = a;
  6. Set<String> value = g.dataValue().getElements();
  7. })
  8. .match(
  9. GetFailure.class,
  10. a -> a.key().equals(set1Key),
  11. a -> {
  12. // read from 3 nodes failed within 1.second
  13. })
  14. .match(
  15. NotFound.class,
  16. a -> a.key().equals(set1Key),
  17. a -> {
  18. // key set1 does not exist
  19. });

你总是会读取自己写入的东西。例如,如果发送一条Update消息,后跟一个具有相同keyGet,则Get将检索由前面的Update消息执行的更改。但是,没有定义回复消息的顺序,即在上一个示例中,你可能会在UpdateSuccess之前收到GetSuccess

Get消息中,你可以通过与上述Update消息相同的方式传递可选的请求上下文。例如,在接收和转换``GetSuccess之后,可以传递和回复原始发送者。

  1. class DemonstrateGetWithRequestContext extends AbstractActor {
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final ReadConsistency readTwo = new ReadFrom(2, Duration.ofSeconds(3));
  4. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  5. @Override
  6. public Receive createReceive() {
  7. return receiveBuilder()
  8. .match(
  9. String.class,
  10. a -> a.equals("get-count"),
  11. a -> {
  12. // incoming request to retrieve current value of the counter
  13. Optional<Object> reqContext = Optional.of(getSender());
  14. replicator.tell(new Replicator.Get<PNCounter>(counter1Key, readTwo), getSelf());
  15. })
  16. .match(
  17. GetSuccess.class,
  18. a -> a.key().equals(counter1Key),
  19. a -> {
  20. ActorRef replyTo = (ActorRef) a.getRequest().get();
  21. GetSuccess<PNCounter> g = a;
  22. long value = g.dataValue().getValue().longValue();
  23. replyTo.tell(value, getSelf());
  24. })
  25. .match(
  26. GetFailure.class,
  27. a -> a.key().equals(counter1Key),
  28. a -> {
  29. ActorRef replyTo = (ActorRef) a.getRequest().get();
  30. replyTo.tell(-1L, getSelf());
  31. })
  32. .match(
  33. NotFound.class,
  34. a -> a.key().equals(counter1Key),
  35. a -> {
  36. ActorRef replyTo = (ActorRef) a.getRequest().get();
  37. replyTo.tell(0L, getSelf());
  38. })
  39. .build();
  40. }
  41. }

一致性

UpdateGet中提供的一致性级别指定每个请求必须成功响应写入和读取请求的副本数。

对于低延迟读取,使用readLocal会有检索过时数据的风险,也就是说,来自其他节点的更新可能还不可见。

使用readLocal时,更新仅写入本地副本,然后使用gossip协议在后台传播,传播到所有节点可能需要几秒钟。

当使用readLocal时,你将永远不会收到GetFailure响应,因为本地副本始终对本地读卡器可用。但是,如果modify函数引发异常,或者如果使用持久存储失败,则WriteLocal仍可以使用UpdateFailure消息进行答复。

WriteAllReadAll是最强的一致性级别,但也是最慢的,可用性最低。例如,一个节点对于Get请求不可用,你将不会收到该值。

如果一致性很重要,可以使用以下公式确保读取始终反映最新的写入:

  1. (nodes_written + nodes_read) > N

其中N是集群中节点的总数,或者具有Replicator所用角色的节点的数量。

例如,在 7 节点集群中,这些一致性属性是通过写入 4 个节点和读取 4 个节点,或写入 5 个节点和读取 3 个节点来实现的。

通过将WriteMajorityReadMajority级别结合起来,读始终反映最新的写入。Replicator对大多数复本进行写入和读取,即N / 2 + 1。例如,在 5 节点集群中,它写入 3 个节点并读取 3 个节点。在 6 节点集群中,它写入 4 个节点并读取 4 个节点。

你可以为WriteMajorityReadMajority定义最小数量的节点,这将最小化读取过时数据的风险。最小capWriteMajorityReadMajorityminCap属性提供,并定义所需的多数。如果minCap高于N / 2 + 1,将使用minCap

例如,如果minCap为 5,3 个节点的集群的WriteMajorityReadMajority将为 3,6 个节点的集群将为 5,12 个节点的集群将为7 ( N / 2 + 1 )

对于小集群(<7),在WriteMajorityReadMajority之间成员状态更改的风险相当高,无法保证将多数写入和读取结合在一起的良好属性。因此,WriteMajorityReadMajority有一个minCap参数,该参数对于指定小集群以实现更好的安全性非常有用。这意味着,如果集群大小小于大多数大小,它将使用minCap节点数,但最多使用集群的总大小。

下面是使用WriteMajorityReadMajority的示例:

  1. private final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(3));
  2. private static final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(3));
  1. private Receive matchGetCart() {
  2. return receiveBuilder()
  3. .matchEquals(GET_CART, s -> receiveGetCart())
  4. .match(
  5. GetSuccess.class,
  6. this::isResponseToGetCart,
  7. g -> receiveGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
  8. .match(
  9. NotFound.class,
  10. this::isResponseToGetCart,
  11. n -> receiveNotFound((NotFound<LWWMap<String, LineItem>>) n))
  12. .match(
  13. GetFailure.class,
  14. this::isResponseToGetCart,
  15. f -> receiveGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
  16. .build();
  17. }
  18. private void receiveGetCart() {
  19. Optional<Object> ctx = Optional.of(getSender());
  20. replicator.tell(
  21. new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx), getSelf());
  22. }
  23. private boolean isResponseToGetCart(GetResponse<?> response) {
  24. return response.key().equals(dataKey)
  25. && (response.getRequest().orElse(null) instanceof ActorRef);
  26. }
  27. private void receiveGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) {
  28. Set<LineItem> items = new HashSet<>(g.dataValue().getEntries().values());
  29. ActorRef replyTo = (ActorRef) g.getRequest().get();
  30. replyTo.tell(new Cart(items), getSelf());
  31. }
  32. private void receiveNotFound(NotFound<LWWMap<String, LineItem>> n) {
  33. ActorRef replyTo = (ActorRef) n.getRequest().get();
  34. replyTo.tell(new Cart(new HashSet<>()), getSelf());
  35. }
  36. private void receiveGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
  37. // ReadMajority failure, try again with local read
  38. Optional<Object> ctx = Optional.of(getSender());
  39. replicator.tell(
  40. new Replicator.Get<LWWMap<String, LineItem>>(dataKey, Replicator.readLocal(), ctx),
  41. getSelf());
  42. }
  1. private Receive matchAddItem() {
  2. return receiveBuilder().match(AddItem.class, this::receiveAddItem).build();
  3. }
  4. private void receiveAddItem(AddItem add) {
  5. Update<LWWMap<String, LineItem>> update =
  6. new Update<>(dataKey, LWWMap.create(), writeMajority, cart -> updateCart(cart, add.item));
  7. replicator.tell(update, getSelf());
  8. }

在少数情况下,执行Update时,需要首先尝试从其他节点获取最新数据。这可以通过首先发送带有ReadMajorityGet,然后在收到GetSuccessGetFailureNotFound回复时继续Update来完成。当你需要根据最新信息做出决定,或者从ORSetORMap中删除条目时,可能需要这样做。如果一个条目从一个节点添加到ORSetORMap,并从另一个节点删除,则只有在执行删除的节点上看到添加的条目时,才会删除该条目(因此名称为已删除集)。

以下示例说明了如何执行此操作:

  1. private void receiveRemoveItem(RemoveItem rm) {
  2. // Try to fetch latest from a majority of nodes first, since ORMap
  3. // remove must have seen the item to be able to remove it.
  4. Optional<Object> ctx = Optional.of(rm);
  5. replicator.tell(
  6. new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx), getSelf());
  7. }
  8. private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) {
  9. RemoveItem rm = (RemoveItem) g.getRequest().get();
  10. removeItem(rm.productId);
  11. }
  12. private void receiveRemoveItemGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
  13. // ReadMajority failed, fall back to best effort local value
  14. RemoveItem rm = (RemoveItem) f.getRequest().get();
  15. removeItem(rm.productId);
  16. }
  17. private void removeItem(String productId) {
  18. Update<LWWMap<String, LineItem>> update =
  19. new Update<>(dataKey, LWWMap.create(), writeMajority, cart -> cart.remove(node, productId));
  20. replicator.tell(update, getSelf());
  21. }
  22. private boolean isResponseToRemoveItem(GetResponse<?> response) {
  23. return response.key().equals(dataKey)
  24. && (response.getRequest().orElse(null) instanceof RemoveItem);
  25. }
  • 警告:即使你使用了WriteMajorityReadMajority,但是如果集群成员在UpdateGet之间发生了更改,则也有读取过时数据的小风险。例如,在 5 个节点的集群中,当你Update并将更改写入 3 个节点时:n1n2n3。然后再添加 2 个节点,从 4 个节点读取一个Get请求,正好是n4n5n6n7,也就是说,在Get请求的响应中看不到n1n2n3上的值。

订阅

你也可以通过向Replicator发送Replicator.Subscribe消息来订阅感兴趣的通知。它将在更新订阅键的数据时向注册订阅者发送Replicator.Changed消息。将使用配置的notify-subscribers-interval定期通知订阅者,还可以向Replicator发送显式Replicator.FlushChange消息以立即通知订阅者。

如果订阅者被终止,则会自动删除订阅者。订阅者也可以使用Replicator.Unsubscribe取消订阅消息。

  1. class DemonstrateSubscribe extends AbstractActor {
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  4. BigInteger currentValue = BigInteger.valueOf(0);
  5. @Override
  6. public Receive createReceive() {
  7. return receiveBuilder()
  8. .match(
  9. Changed.class,
  10. a -> a.key().equals(counter1Key),
  11. a -> {
  12. Changed<PNCounter> g = a;
  13. currentValue = g.dataValue().getValue();
  14. })
  15. .match(
  16. String.class,
  17. a -> a.equals("get-count"),
  18. a -> {
  19. // incoming request to retrieve current value of the counter
  20. getSender().tell(currentValue, getSender());
  21. })
  22. .build();
  23. }
  24. @Override
  25. public void preStart() {
  26. // subscribe to changes of the Counter1Key value
  27. replicator.tell(new Subscribe<PNCounter>(counter1Key, getSelf()), ActorRef.noSender());
  28. }
  29. }

删除

可以通过发送Replicator.Delete消息到本地Replicator来请求删除某个数据条目。如果在提供的超时内根据提供的一致性级别成功删除了值,则作为Delete的回复,会向Delete的发送者发送Replicator.DeleteSuccess。否则将发送Replicator.ReplicationDeleteFailure。请注意,ReplicationDeleteFailure并不意味着删除完全失败或已回滚。它可能仍然被复制到某些节点,并最终被复制到所有节点。

已删除的键不能再次使用,但仍建议删除未使用的数据条目,因为这样可以减少新节点加入群集时的复制开销。随后的DeleteUpdateGet请求将用Replicator.DataDeleted回复。订阅者将收到Replicator.Deleted

Delete消息中,你可以通过与上述Update消息相同的方式传递可选请求上下文。例如,在接收和转换DeleteSuccess之后,可以传递和回复原始发件人。

  1. class DemonstrateDelete extends AbstractActor {
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  4. final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  5. @Override
  6. public Receive createReceive() {
  7. return receiveBuilder()
  8. .matchEquals(
  9. "demonstrate delete",
  10. msg -> {
  11. replicator.tell(
  12. new Delete<PNCounter>(counter1Key, Replicator.writeLocal()), getSelf());
  13. final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5));
  14. replicator.tell(new Delete<PNCounter>(counter1Key, writeMajority), getSelf());
  15. })
  16. .build();
  17. }
  18. }
  • 警告:由于删除的键继续包含在每个节点的存储数据以及gossip消息中,一系列顶级实体的连续更新和删除将导致内存使用量增加,直到ActorSystem耗尽内存。要在需要频繁添加和删除的地方使用 Akka 分布式数据,你应该使用固定数量的支持更新和删除的顶级数据类型,例如ORMapORSet

delta-CRDT

支持「Delta State Replicated Data Types」。delta-CRDT是一种减少发送更新的完整状态需求的方法。例如,将元素'c''d'添加到集合{'a', 'b'}将导致发送{'c', 'd'},并将其与接收端的状态合并,从而导致集合{'a', 'b', 'c', 'd'}

如果数据类型标记为RequiresCausalDeliveryOfDeltas,则复制deltas的协议支持因果一致性(causal consistency)。否则,它只是最终一致的(eventually consistent)。如果不存在因果一致性,则意味着如果在两个单独的Update操作中添加元素'c''d',这些增量可能偶尔以不同的顺序传播到节点,从而达到更新的因果顺序。在这个例子中,它可以导致集合 {'a', 'b', 'd'}在元素'c'出现之前就可以看到。最终将是{'a', 'b', 'c', 'd'}

请注意,delta-CRDTs有时也会复制完整状态,例如,将新节点添加到集群时,或者由于网络分离或类似问题而无法传播增量时。

可以使用配置属性禁用delta传播:

  1. akka.cluster.distributed-data.delta-crdt.enabled=off

数据类型

数据类型必须是收敛状态的CRDTs并实现AbstractReplicatedData接口,即它们提供了一个单调的合并函数,并且状态更改始终是收敛的。

你可以使用自己的自定义AbstractReplicatedDataAbstractDeltaReplicatedData类型,并且此包提供以下几种类型:

  • Counters:GCounterPNCounter
  • Sets:GSetORSet
  • Maps:ORMapORMultiMapLWWMapPNCounterMap
  • Registers:LWWRegisterFlag

Counters

GCounter是一个“仅增长计数器”。它只支持递增,不支持递减。

它的工作方式与矢量时钟类似。它跟踪每个节点的一个计数器,总值是这些计数器的总和。合并是通过获取每个节点的最大计数来实现的。

如果需要递增和递减,可以使用PNCounter(正/负计数器)。

它跟踪的增量(P)与减量(N)分开。PN表示为两个内部GCounter。合并是通过合并内部PN计数器来处理的。计数器的值是P计数器的值减去N计数器的值。

  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  2. final PNCounter c0 = PNCounter.create();
  3. final PNCounter c1 = c0.increment(node, 1);
  4. final PNCounter c2 = c1.increment(node, 7);
  5. final PNCounter c3 = c2.decrement(node, 2);
  6. System.out.println(c3.value()); // 6

GCounterPNCounter支持delta-CRDT,不需要传递delta

可以在具有PNCounterMap数据类型的映射中管理几个相关计数器。当计数器被放置在PNCounterMap中,而不是将它们作为单独的顶级值放置时,它们被保证作为一个单元一起复制,这有时是相关数据所必需的。

  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  2. final PNCounterMap<String> m0 = PNCounterMap.create();
  3. final PNCounterMap<String> m1 = m0.increment(node, "a", 7);
  4. final PNCounterMap<String> m2 = m1.decrement(node, "a", 2);
  5. final PNCounterMap<String> m3 = m2.increment(node, "b", 1);
  6. System.out.println(m3.get("a")); // 5
  7. System.out.println(m3.getEntries());

Maps

如果只需要向集合中添加元素而不删除元素,则GSet(仅增长集合)是要使用的数据类型。元素可以是任何类型的可以序列化的值。合并是两个集合的交集。

  1. final GSet<String> s0 = GSet.create();
  2. final GSet<String> s1 = s0.add("a");
  3. final GSet<String> s2 = s1.add("b").add("c");
  4. if (s2.contains("a")) System.out.println(s2.getElements()); // a, b, c

GSet支持delta-CRDT,不需要传递delta

如果需要添加和删除操作,应使用ORSetobserved-remove set)。元素可以添加和删除任意次数。如果一个元素同时添加和删除,则添加将成功。不能删除未看到的元素。

ORSet有一个版本向量,当元素添加到集合中时,该向量将递增。添加元素的节点的版本也会针对所谓的“出生点”中的每个元素进行跟踪。合并函数使用版本向量和点来跟踪操作的因果关系并解决并发更新问题。

  1. final Cluster node = Cluster.get(system);
  2. final ORSet<String> s0 = ORSet.create();
  3. final ORSet<String> s1 = s0.add(node, "a");
  4. final ORSet<String> s2 = s1.add(node, "b");
  5. final ORSet<String> s3 = s2.remove(node, "a");
  6. System.out.println(s3.getElements()); // b

ORSet支持delta-CRDT,不需要传递delta

Maps

ORMapobserved-remove map)是一个具有Any类型的键的映射,值本身就是复制的数据类型。它支持为一个映射条目添加、更新和删除任意次数。

如果同时添加和删除一个条目,则添加将成功。无法删除未看到的条目。这与ORSet的语义相同。

如果一个条目同时更新为不同的值,那么这些值将被合并,因此需要复制这些值的数据类型。

直接使用ORMap相当不方便,因为它不公开特定类型的值。ORMap是用来构建更具体Map的低级工具,例如下面特殊的Map

  • ORMultiMapobserved-remove multi-map)是一个多映射实现,它用一个ORSet来包装一个ORMap以获得该映射的值。
  • PNCounterMappositive negative counter map)是命名计数器的映射(其中名称可以是任何类型)。它是具有PNCounter值的特殊ORMap
  • LWWMaplast writer wins map)是一个具有LWWRegisterlast writer wins register)值的特殊ORMap

ORMapORMultiMapPNCounterMapLWWMap支持delta-CRDT,它们需要传递delta。这里对delta的支持意味着作为所有这些映射的基础键类型的ORSet使用delta传播来传递更新。实际上,Map的更新是一个pair,由ORSetdelta组成键和保存在Map中的相应值(ORSetPNCounterLWWRegister)。

  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  2. final ORMultiMap<String, Integer> m0 = ORMultiMap.create();
  3. final ORMultiMap<String, Integer> m1 = m0.put(node, "a", new HashSet<>(Arrays.asList(1, 2, 3)));
  4. final ORMultiMap<String, Integer> m2 = m1.addBinding(node, "a", 4);
  5. final ORMultiMap<String, Integer> m3 = m2.removeBinding(node, "a", 2);
  6. final ORMultiMap<String, Integer> m4 = m3.addBinding(node, "b", 1);
  7. System.out.println(m4.getEntries());

更改数据项时,该项的完整状态将复制到其他节点,即更新映射时,将复制整个映射。因此,不使用一个包含 1000 个元素的ORMap,而是更有效地将其分解为 10 个顶级ORMap条目,每个条目包含 100 个元素。顶级条目是单独复制的,这就需要权衡不同条目可能不会同时复制,并且你可能会看到相关条目之间的不一致。单独的顶级条目不能原子地一起更新。

有一个特殊版本的ORMultiMap,是使用单独的构造函数ORMultiMap.emptyWithValueDeltas[A, B]创建的,它还将更新作为delta传播到它的值(ORSet类型)。这意味着用ORMultiMap.emptyWithValueDeltas启动的ORMultiMap将其更新作为成对传播,包含键的delta和值的delta。它在网络带宽消耗方面效率更高。

但是,此行为尚未成为ORMultiMap的默认行为,如果希望在代码中使用它,则需要将ORMultiMap.empty[A, B](或者ORMultiMap())的调用替换为ORMultiMap.emptyWithValueDeltas[A, B],其中AB分别是映射中的键和值的类型。

请注意,尽管具有相同的Scala类型,但ORMultiMap.emptyWithValueDeltasORMultiMap不兼容,因为复制机制不同。我们需要格外小心,不要将两者混合,因为它们具有相同的类型,所以编译器不会提示错误。尽管如此,ORMultiMap.emptyWithValueDeltas使用与ORMultiMap相同的ORMultiMapKey类型进行引用。

请注意,LWWRegisterLWWMap依赖于同步的时钟,并且仅当值的选择对于在时钟偏差内发生的并发更新不重要时才应使用。请阅读下面有关LWWRegister的部分。

Flags 和 Registers

Flag是布尔值的数据类型,该值初始化为false,可以切换为true。之后就不能改变了。在合并中,true胜于false

  1. final Flag f0 = Flag.create();
  2. final Flag f1 = f0.switchOn();
  3. System.out.println(f1.enabled());

LWWRegisterlast writer wins register)可以保存任何(可序列化)值。

合并LWWRegister将获得时间戳最高的register。请注意,这依赖于同步时钟。只有当值的选择对于时钟偏差内发生的并发更新不重要时,才应使用LWWRegister

如果时间戳完全相同,则合并接受由地址最低的节点(按UniqueAddress排序)更新的register

  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  2. final LWWRegister<String> r1 = LWWRegister.create(node, "Hello");
  3. final LWWRegister<String> r2 = r1.withValue(node, "Hi");
  4. System.out.println(r1.value() + " by " + r1.updatedBy() + " at " + r1.timestamp());

不使用基于System.currentTimeMillis()的时间戳,而是使用基于其他内容的时间戳值,例如,用于乐观并发控制的数据库记录的不断增加的版本号。

  1. class Record {
  2. public final int version;
  3. public final String name;
  4. public final String address;
  5. public Record(int version, String name, String address) {
  6. this.version = version;
  7. this.name = name;
  8. this.address = address;
  9. }
  10. }
  11. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  12. final LWWRegister.Clock<Record> recordClock =
  13. new LWWRegister.Clock<Record>() {
  14. @Override
  15. public long apply(long currentTimestamp, Record value) {
  16. return value.version;
  17. }
  18. };
  19. final Record record1 = new Record(1, "Alice", "Union Square");
  20. final LWWRegister<Record> r1 = LWWRegister.create(node, record1);
  21. final Record record2 = new Record(2, "Alice", "Madison Square");
  22. final LWWRegister<Record> r2 = LWWRegister.create(node, record2);
  23. final LWWRegister<Record> r3 = r1.merge(r2);
  24. System.out.println(r3.value());

对于first-write-wins语义,可以使用LWWRegister#reverseClock而不是LWWRegister#defaultClock

defaultClock使用System.currentTimeMillis()currentTimestamp + 1的最大值。这意味着时间戳对于在相同毫秒内发生的同一节点上的更改会增加。它还意味着,当只有一个活动的writer(如集群单例)时,可以安全地使用不带同步时钟的LWWRegister。这样的单个writer应该首先使用ReadMajority(或更多)读取当前值,然后再使用WriteMajority(或更多)更改和写入值。

自定义数据类型

你可以实现自己的数据类型。唯一的要求是它实现AbstractReplicatedData特性的mergeData函数。

有状态的CRDTs的一个好特性是,它们通常组成得很好,也就是说,你可以组合几个较小的数据类型来构建更丰富的数据结构。例如,PNCounter由两个内部GCounter实例组成,分别跟踪递增和递减。

下面是一个定制的TwoPhaseSet的简单实现,它使用两种内部GSet类型来跟踪添加和删除。TwoPhaseSet可以添加和删除一个元素,但此后再也不能添加。

  1. public class TwoPhaseSet extends AbstractReplicatedData<TwoPhaseSet> {
  2. public final GSet<String> adds;
  3. public final GSet<String> removals;
  4. public TwoPhaseSet(GSet<String> adds, GSet<String> removals) {
  5. this.adds = adds;
  6. this.removals = removals;
  7. }
  8. public static TwoPhaseSet create() {
  9. return new TwoPhaseSet(GSet.create(), GSet.create());
  10. }
  11. public TwoPhaseSet add(String element) {
  12. return new TwoPhaseSet(adds.add(element), removals);
  13. }
  14. public TwoPhaseSet remove(String element) {
  15. return new TwoPhaseSet(adds, removals.add(element));
  16. }
  17. public Set<String> getElements() {
  18. Set<String> result = new HashSet<>(adds.getElements());
  19. result.removeAll(removals.getElements());
  20. return result;
  21. }
  22. @Override
  23. public TwoPhaseSet mergeData(TwoPhaseSet that) {
  24. return new TwoPhaseSet(this.adds.merge(that.adds), this.removals.merge(that.removals));
  25. }
  26. }

数据类型应该是不可变的,即“修改”方法应该返回一个新实例。

如果AbstractDeltaReplicatedData支持delta-CRDT,则实现它的其他方法。

序列化

数据类型必须可以使用「Akka Serializer」进行序列化。强烈建议对自定义数据类型使用Protobuf或类似工具实现有效的序列化。内置数据类型用ReplicatedDataSerialization标记,并用akka.cluster.ddata.protobuf.ReplicatedDataSerializer序列化。

数据类型的序列化用于远程消息,也用于创建消息摘要(SHA-1)以检测更改。因此,有效地进行序列化并为相同的内容生成相同的字节非常重要。例如,集合和映射应该在序列化中确定地排序。

这是上述TwoPhaseSetprotobuf表现:

  1. option java_package = "docs.ddata.protobuf.msg";
  2. option optimize_for = SPEED;
  3. message TwoPhaseSet {
  4. repeated string adds = 1;
  5. repeated string removals = 2;
  6. }

TwoPhaseSet的序列化程序:

  1. import jdocs.ddata.TwoPhaseSet;
  2. import docs.ddata.protobuf.msg.TwoPhaseSetMessages;
  3. import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.Builder;
  4. import java.util.ArrayList;
  5. import java.util.Collections;
  6. import akka.actor.ExtendedActorSystem;
  7. import akka.cluster.ddata.GSet;
  8. import akka.cluster.ddata.protobuf.AbstractSerializationSupport;
  9. public class TwoPhaseSetSerializer extends AbstractSerializationSupport {
  10. private final ExtendedActorSystem system;
  11. public TwoPhaseSetSerializer(ExtendedActorSystem system) {
  12. this.system = system;
  13. }
  14. @Override
  15. public ExtendedActorSystem system() {
  16. return this.system;
  17. }
  18. @Override
  19. public boolean includeManifest() {
  20. return false;
  21. }
  22. @Override
  23. public int identifier() {
  24. return 99998;
  25. }
  26. @Override
  27. public byte[] toBinary(Object obj) {
  28. if (obj instanceof TwoPhaseSet) {
  29. return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray();
  30. } else {
  31. throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass());
  32. }
  33. }
  34. @Override
  35. public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
  36. return twoPhaseSetFromBinary(bytes);
  37. }
  38. protected TwoPhaseSetMessages.TwoPhaseSet twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) {
  39. Builder b = TwoPhaseSetMessages.TwoPhaseSet.newBuilder();
  40. ArrayList<String> adds = new ArrayList<>(twoPhaseSet.adds.getElements());
  41. if (!adds.isEmpty()) {
  42. Collections.sort(adds);
  43. b.addAllAdds(adds);
  44. }
  45. ArrayList<String> removals = new ArrayList<>(twoPhaseSet.removals.getElements());
  46. if (!removals.isEmpty()) {
  47. Collections.sort(removals);
  48. b.addAllRemovals(removals);
  49. }
  50. return b.build();
  51. }
  52. protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) {
  53. try {
  54. TwoPhaseSetMessages.TwoPhaseSet msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes);
  55. GSet<String> adds = GSet.create();
  56. for (String elem : msg.getAddsList()) {
  57. adds = adds.add(elem);
  58. }
  59. GSet<String> removals = GSet.create();
  60. for (String elem : msg.getRemovalsList()) {
  61. removals = removals.add(elem);
  62. }
  63. // GSet will accumulate deltas when adding elements,
  64. // but those are not of interest in the result of the deserialization
  65. return new TwoPhaseSet(adds.resetDelta(), removals.resetDelta());
  66. } catch (Exception e) {
  67. throw new RuntimeException(e.getMessage(), e);
  68. }
  69. }
  70. }

请注意,集合中的元素是经过排序的,因此对于相同的元素,SHA-1摘要是相同的。

在配置中注册序列化程序:

  1. akka.actor {
  2. serializers {
  3. twophaseset = "jdocs.ddata.protobuf.TwoPhaseSetSerializer"
  4. }
  5. serialization-bindings {
  6. "jdocs.ddata.TwoPhaseSet" = twophaseset
  7. }
  8. }

使用压缩有时是减少数据大小的一个好主意。Gzip压缩由akka.cluster.ddata.protobuf.AbstractSerializationSupport接口提供:

  1. @Override
  2. public byte[] toBinary(Object obj) {
  3. if (obj instanceof TwoPhaseSet) {
  4. return compress(twoPhaseSetToProto((TwoPhaseSet) obj));
  5. } else {
  6. throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass());
  7. }
  8. }
  9. @Override
  10. public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
  11. return twoPhaseSetFromBinary(decompress(bytes));
  12. }

如上所示,两个嵌入的GSet可以序列化,但一般来说,从现有的内置类型组合新的数据类型时,最好对这些类型使用现有的序列化程序。这可以通过在protobuf中将这些字段声明为字节字段来实现:

  1. message TwoPhaseSet2 {
  2. optional bytes adds = 1;
  3. optional bytes removals = 2;
  4. }

并使用序列化支持特性提供的方法otherMessageToProtootherMessageFromBinary来序列化和反序列化GSet实例。这适用于任何具有已注册的 Akka 序列化程序的类型。下面就是TwoPhaseSet这样的序列化程序:

  1. import jdocs.ddata.TwoPhaseSet;
  2. import docs.ddata.protobuf.msg.TwoPhaseSetMessages;
  3. import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.Builder;
  4. import akka.actor.ExtendedActorSystem;
  5. import akka.cluster.ddata.GSet;
  6. import akka.cluster.ddata.protobuf.AbstractSerializationSupport;
  7. import akka.cluster.ddata.protobuf.ReplicatedDataSerializer;
  8. public class TwoPhaseSetSerializer2 extends AbstractSerializationSupport {
  9. private final ExtendedActorSystem system;
  10. private final ReplicatedDataSerializer replicatedDataSerializer;
  11. public TwoPhaseSetSerializer2(ExtendedActorSystem system) {
  12. this.system = system;
  13. this.replicatedDataSerializer = new ReplicatedDataSerializer(system);
  14. }
  15. @Override
  16. public ExtendedActorSystem system() {
  17. return this.system;
  18. }
  19. @Override
  20. public boolean includeManifest() {
  21. return false;
  22. }
  23. @Override
  24. public int identifier() {
  25. return 99998;
  26. }
  27. @Override
  28. public byte[] toBinary(Object obj) {
  29. if (obj instanceof TwoPhaseSet) {
  30. return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray();
  31. } else {
  32. throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass());
  33. }
  34. }
  35. @Override
  36. public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
  37. return twoPhaseSetFromBinary(bytes);
  38. }
  39. protected TwoPhaseSetMessages.TwoPhaseSet2 twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) {
  40. Builder b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder();
  41. if (!twoPhaseSet.adds.isEmpty())
  42. b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString());
  43. if (!twoPhaseSet.removals.isEmpty())
  44. b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString());
  45. return b.build();
  46. }
  47. @SuppressWarnings("unchecked")
  48. protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) {
  49. try {
  50. TwoPhaseSetMessages.TwoPhaseSet2 msg = TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes);
  51. GSet<String> adds = GSet.create();
  52. if (msg.hasAdds()) adds = (GSet<String>) otherMessageFromBinary(msg.getAdds().toByteArray());
  53. GSet<String> removals = GSet.create();
  54. if (msg.hasRemovals())
  55. adds = (GSet<String>) otherMessageFromBinary(msg.getRemovals().toByteArray());
  56. return new TwoPhaseSet(adds, removals);
  57. } catch (Exception e) {
  58. throw new RuntimeException(e.getMessage(), e);
  59. }
  60. }
  61. }

持久存储

默认情况下,数据只保存在内存中。它是冗余的,因为它被复制到集群中的其他节点,但是如果停止所有节点,数据就会丢失,除非你已将其保存到其他位置。

条目可以配置为持久的,即存储在每个节点的本地磁盘上。下一次启动replicator时,即当 Actor 系统重新启动时,将加载存储的数据。这意味着只要旧集群中的至少一个节点参与到新集群中,数据就可以生存。持久条目的键配置为:

  1. akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]

在键的末尾使用*支持前缀匹配。

通过指定以下内容,可以使所有条目持久:

  1. akka.cluster.distributed-data.durable.keys = ["*"]

LMDB」是默认的存储实现。通过实现akka.cluster.ddata.DurableStore中描述的 Actor 协议并为新实现定义akka.cluster.distributed-data.durable.store-actor-class属性,可以将其替换为另一个实现。

数据文件的位置配置为:

  1. # Directory of LMDB file. There are two options:
  2. # 1. A relative or absolute path to a directory that ends with 'ddata'
  3. # the full name of the directory will contain name of the ActorSystem
  4. # and its remote port.
  5. # 2. Otherwise the path is used as is, as a relative or absolute path to
  6. # a directory.
  7. akka.cluster.distributed-data.durable.lmdb.dir = "ddata"

在生产环境中运行时,你可能希望将目录配置为特定路径(alt 2),因为默认目录包含 Actor 系统的远程端口以使名称唯一。如果使用动态分配的端口(0),则每次都会不同,并且不会加载以前存储的数据。

使数据持久化有性能成本。默认情况下,在发送UpdateSuccess回复之前,每个更新都会刷新到磁盘。为了获得更好的性能,但是如果 JVM 崩溃,则有可能丢失最后一次写入,你可以启用写后模式(write behind mode)。然后在将更改写入 LMDB 并刷新到磁盘之前的一段时间内累积更改。当对同一个键执行多次写入时,启用写后处理特别有效,因为它只是将被序列化和存储的每个键的最后一个值。如果 JVM 崩溃的风险很小,则会丢失写操作,因为数据通常会根据给定的WriteConsistency立即复制到其他节点。

  1. akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms

请注意,如果由于某种原因无法存储数据,则应准备接收WriteFailure作为对持久条目Update的答复。当启用write-behind-interval时,这些错误将只被记录,而UpdateSuccess仍然是对Update的答复。

当为持久数据修剪 CRDT 垃圾时,有一个重要的警告。如果一个从未修剪过的旧数据条目被注入,并在修剪(pruning)标记被删除后与现有数据合并,则该值将不正确。标记的生存时间由配置akka.cluster.distributed-data.durable.remove-pruning-marker-after定义,以天为单位。如果具有持久数据的节点没有参与修剪(例如,它被关闭),并且在这段时间之后开始修剪,这是可能的。具有持久数据的节点的停止时间不应超过此持续时间,如果在此持续时间之后再次加入,则应首先手动(从lmdb目录中)删除其数据。

CRDT 垃圾

CRDT的一个问题是,某些数据类型会累积历史记录(垃圾)。例如,GCounter跟踪每个节点的一个计数器。如果已经从一个节点更新了GCounter,它将永远关联该节点的标识符。对于添加和删除了许多群集节点的长时间运行的系统来说,这可能成为一个问题。为了解决这个问题,Replicator执行与已从集群中删除的节点相关联的数据修剪。需要修剪的数据类型必须实现RemovedNodePruning特性。有关详细信息,请参阅Replicator的 API 文档。

样例

在「Akka Distributed Data Samples with Java」中包含一些有趣的样例和教程。

  • 低延迟投票服务
  • 高可用购物车
  • 分布式服务注册表
  • 复制缓存
  • 复制指标

局限性

你应该注意一些限制。

CRDTs不能用于所有类型的问题,并且最终的一致性不适合所有域。有时候你需要很强的一致性。

它不适用于大数据。顶级条目数不应超过 100000 条。当一个新节点添加到集群中时,所有这些条目都会被传输(gossiped)到新节点。条目被分割成块,所有现有节点在gossip中协作,但传输所有条目需要一段时间(数十秒),这意味着顶级条目不能太多。当前建议的限制为 100000。如果需要的话,我们将能够改进这一点,但是设计仍然不打算用于数十亿个条目。

所有的数据都保存在内存中,这也是它不适用于大数据的另一个原因。

当一个数据条目被更改时,如果它不支持delta-CRDT,则该条目的完整状态可以复制到其他节点。delta-CRDT的完整状态也会被复制,例如当向集群中添加新节点时,或者当由于网络分裂或类似问题而无法传播delta时。这意味着你不能有太大的数据条目,因为远程消息的大小将太大。

了解有关 CRDT 的更多信息

配置

可以使用以下属性配置DistributedData扩展:

  1. # Settings for the DistributedData extension
  2. akka.cluster.distributed-data {
  3. # Actor name of the Replicator actor, /system/ddataReplicator
  4. name = ddataReplicator
  5. # Replicas are running on members tagged with this role.
  6. # All members are used if undefined or empty.
  7. role = ""
  8. # How often the Replicator should send out gossip information
  9. gossip-interval = 2 s
  10. # How often the subscribers will be notified of changes, if any
  11. notify-subscribers-interval = 500 ms
  12. # Maximum number of entries to transfer in one gossip message when synchronizing
  13. # the replicas. Next chunk will be transferred in next round of gossip.
  14. max-delta-elements = 1000
  15. # The id of the dispatcher to use for Replicator actors. If not specified
  16. # default dispatcher is used.
  17. # If specified you need to define the settings of the actual dispatcher.
  18. use-dispatcher = ""
  19. # How often the Replicator checks for pruning of data associated with
  20. # removed cluster nodes. If this is set to 'off' the pruning feature will
  21. # be completely disabled.
  22. pruning-interval = 120 s
  23. # How long time it takes to spread the data to all other replica nodes.
  24. # This is used when initiating and completing the pruning process of data associated
  25. # with removed cluster nodes. The time measurement is stopped when any replica is
  26. # unreachable, but it's still recommended to configure this with certain margin.
  27. # It should be in the magnitude of minutes even though typical dissemination time
  28. # is shorter (grows logarithmic with number of nodes). There is no advantage of
  29. # setting this too low. Setting it to large value will delay the pruning process.
  30. max-pruning-dissemination = 300 s
  31. # The markers of that pruning has been performed for a removed node are kept for this
  32. # time and thereafter removed. If and old data entry that was never pruned is somehow
  33. # injected and merged with existing data after this time the value will not be correct.
  34. # This would be possible (although unlikely) in the case of a long network partition.
  35. # It should be in the magnitude of hours. For durable data it is configured by
  36. # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
  37. pruning-marker-time-to-live = 6 h
  38. # Serialized Write and Read messages are cached when they are sent to
  39. # several nodes. If no further activity they are removed from the cache
  40. # after this duration.
  41. serializer-cache-time-to-live = 10s
  42. # Settings for delta-CRDT
  43. delta-crdt {
  44. # enable or disable delta-CRDT replication
  45. enabled = on
  46. # Some complex deltas grow in size for each update and above this
  47. # threshold such deltas are discarded and sent as full state instead.
  48. # This is number of elements or similar size hint, not size in bytes.
  49. max-delta-size = 200
  50. }
  51. durable {
  52. # List of keys that are durable. Prefix matching is supported by using * at the
  53. # end of a key.
  54. keys = []
  55. # The markers of that pruning has been performed for a removed node are kept for this
  56. # time and thereafter removed. If and old data entry that was never pruned is
  57. # injected and merged with existing data after this time the value will not be correct.
  58. # This would be possible if replica with durable data didn't participate in the pruning
  59. # (e.g. it was shutdown) and later started after this time. A durable replica should not
  60. # be stopped for longer time than this duration and if it is joining again after this
  61. # duration its data should first be manually removed (from the lmdb directory).
  62. # It should be in the magnitude of days. Note that there is a corresponding setting
  63. # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
  64. pruning-marker-time-to-live = 10 d
  65. # Fully qualified class name of the durable store actor. It must be a subclass
  66. # of akka.actor.Actor and handle the protocol defined in
  67. # akka.cluster.ddata.DurableStore. The class must have a constructor with
  68. # com.typesafe.config.Config parameter.
  69. store-actor-class = akka.cluster.ddata.LmdbDurableStore
  70. use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
  71. pinned-store {
  72. executor = thread-pool-executor
  73. type = PinnedDispatcher
  74. }
  75. # Config for the LmdbDurableStore
  76. lmdb {
  77. # Directory of LMDB file. There are two options:
  78. # 1. A relative or absolute path to a directory that ends with 'ddata'
  79. # the full name of the directory will contain name of the ActorSystem
  80. # and its remote port.
  81. # 2. Otherwise the path is used as is, as a relative or absolute path to
  82. # a directory.
  83. #
  84. # When running in production you may want to configure this to a specific
  85. # path (alt 2), since the default directory contains the remote port of the
  86. # actor system to make the name unique. If using a dynamically assigned
  87. # port (0) it will be different each time and the previously stored data
  88. # will not be loaded.
  89. dir = "ddata"
  90. # Size in bytes of the memory mapped file.
  91. map-size = 100 MiB
  92. # Accumulate changes before storing improves performance with the
  93. # risk of losing the last writes if the JVM crashes.
  94. # The interval is by default set to 'off' to write each update immediately.
  95. # Enabling write behind by specifying a duration, e.g. 200ms, is especially
  96. # efficient when performing many writes to the same key, because it is only
  97. # the last value for each key that will be serialized and stored.
  98. # write-behind-interval = 200 ms
  99. write-behind-interval = off
  100. }
  101. }
  102. }

英文原文链接Distributed Data.