依赖

为了使用 Akka 流类型,你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-stream-typed_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-stream-typed_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.23"

简介

Akka Streams」使对类型安全的消息处理管道建模变得容易。对于类型化的 Actors,可以在不丢失类型信息的情况下将流连接到 Actors。

此模块包含现有ActorRef源的类型化替换,以及「ActorMaterializerFactory」的工厂方法,后者采用类型化ActorSystem

从这些工厂方法和源可以与来自原始模块的原始 Akka 流构建块混合和匹配。

  • 注释:此模块已准备好用于生产,但仍标记为「可能更改」。这意味着 API 或语义可以在没有警告的情况下进行更改,但这些更改将在 Akka 2.6.0 中收集并执行,而不是在 2.5.x 补丁版本中执行。

Actor Source

发送到特定 Actor 的消息驱动的流可以使用「ActorSource.actorRef」启动。此源具体化为类型化的ActorRef,它只接受与流类型相同的消息。

  1. import akka.actor.typed.ActorRef;
  2. import akka.japi.JavaPartialFunction;
  3. import akka.stream.ActorMaterializer;
  4. import akka.stream.OverflowStrategy;
  5. import akka.stream.javadsl.Sink;
  6. import akka.stream.javadsl.Source;
  7. import akka.stream.typed.javadsl.ActorSource;
  8. interface Protocol {}
  9. class Message implements Protocol {
  10. private final String msg;
  11. public Message(String msg) {
  12. this.msg = msg;
  13. }
  14. }
  15. class Complete implements Protocol {}
  16. class Fail implements Protocol {
  17. private final Exception ex;
  18. public Fail(Exception ex) {
  19. this.ex = ex;
  20. }
  21. }
  22. final JavaPartialFunction<Protocol, Throwable> failureMatcher =
  23. new JavaPartialFunction<Protocol, Throwable>() {
  24. public Throwable apply(Protocol p, boolean isCheck) {
  25. if (p instanceof Fail) {
  26. return ((Fail) p).ex;
  27. } else {
  28. throw noMatch();
  29. }
  30. }
  31. };
  32. final Source<Protocol, ActorRef<Protocol>> source =
  33. ActorSource.actorRef(
  34. (m) -> m instanceof Complete, failureMatcher, 8, OverflowStrategy.fail());
  35. final ActorRef<Protocol> ref =
  36. source
  37. .collect(
  38. new JavaPartialFunction<Protocol, String>() {
  39. public String apply(Protocol p, boolean isCheck) {
  40. if (p instanceof Message) {
  41. return ((Message) p).msg;
  42. } else {
  43. throw noMatch();
  44. }
  45. }
  46. })
  47. .to(Sink.foreach(System.out::println))
  48. .run(mat);
  49. ref.tell(new Message("msg1"));
  50. // ref.tell("msg2"); Does not compile

Actor Sink

有两个sink可接受类型化的ActorRef。要将流中的所有消息发送给 Actor 而不考虑反压力(backpressure),请使用「ActorSink.actorRef」。

  1. import akka.NotUsed;
  2. import akka.actor.typed.ActorRef;
  3. import akka.stream.ActorMaterializer;
  4. import akka.stream.javadsl.Sink;
  5. import akka.stream.javadsl.Source;
  6. import akka.stream.typed.javadsl.ActorSink;
  7. interface Protocol {}
  8. class Message implements Protocol {
  9. private final String msg;
  10. public Message(String msg) {
  11. this.msg = msg;
  12. }
  13. }
  14. class Complete implements Protocol {}
  15. class Fail implements Protocol {
  16. private final Throwable ex;
  17. public Fail(Throwable ex) {
  18. this.ex = ex;
  19. }
  20. }
  21. final ActorRef<Protocol> actor = null;
  22. final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(actor, new Complete(), Fail::new);
  23. Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);

为了使 Actor 能够对反压力作出反应,需要在 Actor 和流之间引入一个协议。使用「ActorSink.actorRefWithAck」可以在 Actor 准备接收更多元素时发出需求信号。

  1. import akka.NotUsed;
  2. import akka.actor.typed.ActorRef;
  3. import akka.stream.ActorMaterializer;
  4. import akka.stream.javadsl.Sink;
  5. import akka.stream.javadsl.Source;
  6. import akka.stream.typed.javadsl.ActorSink;
  7. class Ack {}
  8. interface Protocol {}
  9. class Init implements Protocol {
  10. private final ActorRef<Ack> ack;
  11. public Init(ActorRef<Ack> ack) {
  12. this.ack = ack;
  13. }
  14. }
  15. class Message implements Protocol {
  16. private final ActorRef<Ack> ackTo;
  17. private final String msg;
  18. public Message(ActorRef<Ack> ackTo, String msg) {
  19. this.ackTo = ackTo;
  20. this.msg = msg;
  21. }
  22. }
  23. class Complete implements Protocol {}
  24. class Fail implements Protocol {
  25. private final Throwable ex;
  26. public Fail(Throwable ex) {
  27. this.ex = ex;
  28. }
  29. }
  30. final ActorRef<Protocol> actor = null;
  31. final Sink<String, NotUsed> sink =
  32. ActorSink.actorRefWithAck(
  33. actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new);
  34. Source.single("msg1").runWith(sink, mat);

英文原文链接Streams.