并发

与大多数现代编程语言一样,Raku 被设计为支持并发(允许多个事件同时发生)和异步编程(有时称为事件驱动或反应式编程 - 即程序某些部分的事件或变化可能会导致程序流异步地改变程序的其它部分)。

Perl 的并发设计的目的是提供一个高层级的,可组合的,一致的接口,而不管如下所述的虚拟机通过工具层怎样为特定操作的系统来实现它。

此外,某些 Perl 的特性可以隐式地以异步的方式操作,所以为了确保这些特性可预测的互通,用户代码应在可能情况下,避免较低层级的并发的 API(即线程调度器),并使用高层级接口。

High-level APIs

Promises

Promise(在其他编程环境中也被称为 future)封装了在获得 promise 时可能尚未完成或甚至未开始的计算结果。PromisePlanned 状态开始, 结果要么是 Kept 状态, 这意味着该 promise 已成功完成, 要么是 Broken 状态, 意味着该 promise 已失败。 通常这就是用户代码需要以并行或异步方式操作的使用最多的功能。

  1. my $p1 = Promise.new;
  2. say $p1.status; # OUTPUT: «Planned
  3. »
  4. $p1.keep('Result');
  5. say $p1.status; # OUTPUT: «Kept
  6. »
  7. say $p1.result; # OUTPUT: «Result
  8. »
  9. # (since it has been kept, a result is available!)
  10. my $p2 = Promise.new;
  11. $p2.break('oh no');
  12. say $p2.status; # Broken
  13. say $p2.result; # dies, because the promise has been broken
  14. CATCH { default { say .^name, ': ', .Str } };
  15. # OUTPUT: «X::AdHoc+{X::Promise::Broken}: oh no
  16. »

Promise 通过组合, 例如通过链接(chaining), 通常通过 then 方法获取更多力量:

  1. my $promise1 = Promise.new();
  2. my $promise2 = $promise1.then(
  3. -> $v { say $v.result; "Second Result"}
  4. );
  5. $promise1.keep("First Result");
  6. say $promise2.result; # First Result \n Second Result

在这里 then 方法安排代码(即圆括号中的闭包)在第一个 Promise 为 kept 或 broken 时执行, 它自身返回一个新的 Promise, 这个新的 Promise 会在执行代码时与结果一块保存。 (如果代码执行失败则 broken ) keep 更改 promise 的状态为 Kept, 并设置结果为位置参数。result 阻塞当前执行的线程直到那个 promise 变为 kept 或 broken, 如果它是 kept, 那么它会返回那个结果(即传递给 keep 的值, ) 否则它会根据传递给 break 的值抛出异常。后者的行为如下所示:

  1. my $promise1 = Promise.new();
  2. my $promise2 = $promise1.then(-> $v { say "Handled but : "; say $v.result});
  3. $promise1.break("First Result");
  4. try $promise2.result;
  5. say $promise2.cause; # Handled but : \n First Result

当它在原来的作为参数传递的 promise 上调用 result 方法时, 这里的 break 会导致 then 代码块抛出异常, 这随后会导致第二个 promise 变为 broken, 在它的结果被接收时反过来引发一个异常。然后能从 cause 中访问那个实际的 Exception 对象。如果那个 promise 还没有变为 broken, 那么 cause 会引发 X::Promise::CauseOnlyValidOnBroken 异常。

Promise 也可以安排在未来自动保存(kept):

  1. my $promise1 = Promise.in(5);
  2. my $promise2 = $promise1.then(-> $v { say $v.status; 'Second Result' });
  3. say $promise2.result; # 5 秒后打印出: Kept\n Second Result

in 方法创建了一个新的 promise 并安排一个新的任务在不早于所提供的秒数内在它身上调用 keep, 返回一个新的 Promise 对象。

promises 的一个非常频繁的用法是运行一段代码, 并且一旦它成功地返回就 keep 那个 promise, 或者当那块代码死掉时中断(break)那个 promise。start 方法为此提供了一种快捷方式:

  1. my $promise = Promise.start(
  2. { my $i = 0; for 1 .. 10 { $i += $_ }; $i}
  3. );
  4. say $promise.status; # Kept
  5. say $promise.result; # 55

这里返回的 promise 的结果(result)是从代码返回的值。类似地, 如果那段代码失败了(那个 promise 也因此被中断), 那么 cause 会成为抛出的那个 Exception 对象:

  1. my $promise = Promise.start({ die "Broken Promise" });
  2. try $promise.result; # Nil
  3. say $promise.cause; # Broken Promise
  4. # in block <unit> at <unknown file> line 1

这个模式太常见了以至于它还提供了子例程形式:

  1. my $promise = start {
  2. my $i = 0;
  3. for 1 .. 10 {
  4. $i += $_
  5. }
  6. $i
  7. }
  8. my $result = await $promise;
  9. say $result;

await 几乎等价于在由 start 返回的 promise 对象身上调用 result 但是它也会接受一组 promises 并返回每个 promise 的结果:

  1. my $p1 = start {
  2. my $i = 0;
  3. for 1 .. 10 {
  4. $i += $_
  5. }
  6. $i
  7. };
  8. my $p2 = start {
  9. my $i = 0;
  10. for 1 .. 10 {
  11. $i -= $_
  12. }
  13. $i
  14. };
  15. my @result = await $p1, $p2;
  16. say @result; # [55 -55]

除了 await 之外, 两个类方法把几个 Promise 对象合并到一个新的 promise 对象中: 当所有原来的 promises 是 kept 或 broken 时, allof 返回一个 kept 状态的 promise:

  1. my $promise = Promise.allof(
  2. Promise.in(2),
  3. Promise.in(3)
  4. );
  5. await $promise;
  6. say "All done"; # Should be not much more than three seconds later

并且当原 promises 中的任何一个的状态变为 kept 或 broken 时, anyof 返回将为 kept 的新 promise:

  1. my $promise = Promise.anyof(
  2. Promise.in(3),
  3. Promise.in(8600)
  4. );
  5. await $promise;
  6. say "All done"; # Should be about 3 seconds later

不同于 await,然而如果不引用原来的 promise, 那么就访问不了原来状态为 kept 的 promise 的结果,因此当任务的完成或其他方面对于消费者来说比实际结果更重要时,或者当通过其它方式收集结果时。 你可能,例如,您可以创建一个依赖的Promise,它会检查每个原始的 promise:

  1. my @promises;
  2. for 1..5 -> $t {
  3. push @promises, start {
  4. sleep $t;
  5. Bool.pick;
  6. };
  7. }
  8. say await Promise.allof(@promises).then({ so all(@promises>>.result) });

如果所有的 promise 都保持为 True, 那么它会打印 True, 否则会打印 False。

如果你正在创建一个 promise,你打算保持或中断自己,那么在你做之前, 你可能不想要任何可能会收到 promise 以无意(或否则)保持或中断该 promise 的代码。 为了这个目的,就有了方法 vow,它返回一个 Vow 对象,它成为 promise 能被保留或中断的唯一机制。 如果试图直接保持或断开这个 Promise ,则会抛出 X::Promise::Vowed 异常,只要 vow 对象保持私有,那么 promise 的状态就是安全的:

  1. sub get_promise {
  2. my $promise = Promise.new;
  3. my $vow = $promise.vow;
  4. Promise.in(10).then({$vow.keep});
  5. $promise;
  6. }
  7. my $promise = get_promise();
  8. # Will throw an exception
  9. # "Access denied to keep/break this Promise; already vowed"
  10. $promise.keep;

返回一个将被自动保存或断开的 promise 的方法,如 instart 将会做到这一点,所以没有必要这样做。

Supplies

Supply 是异步数据流传输机制,其可以以类似于其他编程语言中的”事件”的方式同时由一个或多个消费者消费,并且可以被视为开启”事件驱动”或反应式设计。

最简单的是,Supply 是一个消息流,可以有多个通过方法 tap 创建的订阅者,其数据项可以使用 emit 放置。

Supply 可以是现场的(live)或按需的(on-demand)。 现场(live)供应就像电视广播:那些调入(收听/收看)的人不会得到先前发出的值。 点播(on-demand)广播就像 Netflix:每个开始流式传输电影(点击电源)的人,总是从头开始(获取所有的值),不管有多少人正在观看它。 请注意,没有为`按需`供应保留历史记录,而是为供应的每次点击运行 supply 块。

Netflix: 在线观看电影的网站

实时供应(live Supply)由 Supplier 工厂创建,每个发出的值在添加时传递给所有活动的 tappers:

  1. my $supplier = Supplier.new;
  2. my $supply = $supplier.Supply;
  3. $supply.tap( -> $v { say $v });
  4. for 1 .. 10 {
  5. $supplier.emit($_); # 1\n2\n3\n4\n5\n6\n7\n8\n9\n10
  6. }

请注意,tap 在供应商Supplier创建的 Supply 对象上调用,并且新值在供应商Supplier上发出。

或者作为由 supply 关键字创建的按需供应 Supply

  1. my $supply = supply {
  2. for 1 .. 10 {
  3. emit($_);
  4. }
  5. }
  6. $supply.tap( -> $v { say $v });
  7. # 1\n2\n3\n4\n5\n6\n7\n8\n9\n10

在这种情况下,供应块中的代码在每次供应返回的供应被窃取时执行,如下所示:

  1. my $supply = supply {
  2. for 1 .. 10 {
  3. emit($_);
  4. }
  5. }
  6. $supply.tap( -> $v { say "First : $v" });
  7. $supply.tap( -> $v { say "Second : $v" });

tap 方法返回一个 Tap 对象,它可以用来获取关于 tap 的信息,并且当我们不再对事件感兴趣时关闭它:

  1. my $supplier = Supplier.new;
  2. my $supply = $supplier.Supply;
  3. my $tap = $supply.tap( -> $v { say $v });
  4. $supplier.emit("OK");
  5. $tap.close;
  6. $supplier.emit("Won't trigger the tap");

在供应对象(supply object)上调用 done 调用可以为任何 tap 指定的 done 回调,但不会阻止任何其他事件被发送到流,或者接收它们。

方法 interval 返回一个新的按需供应,它会以指定的间隔定期发出一个新事件。 发出的数据是从0开始的整数,对于每个事件递增。 以下代码输出 0 .. 5:

  1. my $supply = Supply.interval(2);
  2. $supply.tap(-> $v { say $v });
  3. sleep 10;

这也可以使用 react 关键字书写(输出0..4):

  1. react {
  2. whenever Supply.interval(2) -> $v {
  3. say $v;
  4. done() if $v == 4;
  5. }
  6. }

这里,whenever 关键字使用 .act 从提供的块在 Supply 上创建一个 tap。 当在其中一个 tap 中调用 done() 时,退出 react 块。

第二个参数可以提供给 interval,它指定第一个事件触发之前的延迟(以秒为单位)。 通过 interval 创建的 supply 的每个 tap 都有自0开始的自身序列,如下所示:

  1. my $supply = Supply.interval(2);
  2. $supply.tap(-> $v { say "First $v" });
  3. sleep 6;
  4. $supply.tap(-> $v { say "Second $v"});
  5. sleep 10;

也可以从将要依次发出的值的列表中按需创建 Supply(供给),因此第一个按需示例(打印1到10)可以写作:

  1. react {
  2. whenever Supply.from-list(1..10) -> $v {
  3. say $v;
  4. }
  5. }

可以使用方法 grepmap 分别过滤或转换现有的供应对象(supply object),以类似具名列表方法的方式创建新供应(supply):grep 返回这样一个供应(supply),以至于只有在源流上发出的那些事件的 grep 条件为真时才在第二个 supply 上发出:

  1. my $supplier = Supplier.new;
  2. my $supply = $supplier.Supply;
  3. $supply.tap(-> $v { say "Original : $v" });
  4. my $odd_supply = $supply.grep({ $_ % 2 });
  5. $odd_supply.tap(-> $v { say "Odd : $v" });
  6. my $even_supply = $supply.grep({ not $_ % 2 });
  7. $even_supply.tap(-> $v { say "Even : $v" });
  8. for 0 .. 10 {
  9. $supplier.emit($_);
  10. }

map 返回一个新的 supply(供应),使得对于发送到原始供应的每个项目,发出作为传递给 map 表达式的结果的新项目:

  1. my $supplier = Supplier.new;
  2. my $supply = $supplier.Supply;
  3. $supply.tap(-> $v { say "Original : $v" });
  4. my $half_supply = $supply.map({ $_ / 2 });
  5. $half_supply.tap(-> $v { say "Half : $v" });
  6. for 0 .. 10 {
  7. $supplier.emit($_);
  8. }

如果您需要在 supply(供应)完成时运行一个操作,您可以通过在对 tap 的调用中设置 donequit 选项来完成:

  1. $supply.tap: { ... },
  2. done => { say 'Job is done.' },
  3. quit => {
  4. when X::MyApp::Error { say "App Error: ", $_.message }
  5. };

quit 块的工作方式非常类似于 CATCH。 如果异常被标记为由 whendefault 块看到,那么异常会被捕获并处理。 否则,异常继续沿调用树向上(即,与没有设置 quit 时行为相同)。

如果你伴随着 whenever 使用 react 或者 supply block 语法,你可以在你的 whenever 块中添加 phasers 来处理来自 tapped supply 的 donequit 消息:

  1. react {
  2. whenever $supply {
  3. ...; # your usual supply tap code here
  4. LAST { say 'Job is done.' }
  5. QUIT { when X::MyApp::Error { say "App Error: ", $_.message } }
  6. }
  7. }

这里的行为与在 tap 上设置 donequit 相同。

Channels

通道(Channel)是线程安全的队列,可以具有多个读取器和写入器,可以被认为在操作上与“fifo”(先进先出)或命名管道相似,除了它不启用进程间通信之外。 应该注意的是,作为真正的队列,发送到通道的每个值将仅在先读,先服务的基础上对于单个读取器可用:如果想要多个读取器能够接收可能想要发送的每个项目那么请考虑Supply

项目(item)通过方法 send 排队到通道上,方法 receive 从队列中删除一个项目并返回,如果队列为空,则阻塞它直到发送新项目:

  1. my $channel = Channel.new;
  2. $channel.send('Channel One');
  3. say $channel.receive; # 'Channel One'

如果使用 close 方法关闭了通道,那么任何发送(send)都将导致抛出异常 X::Channel::SendOnClosed,并且如果队列中没有更多的项目,接收(receive) 将抛出一个 X::Channel::ReceiveOnClosed 异常。

方法list返回通道上的所有项目,并将阻塞,直到其他项目被排队,除非通道关闭:

  1. my $channel = Channel.new;
  2. await (^10).map: -> $r {
  3. start {
  4. sleep $r;
  5. $channel.send($r);
  6. }
  7. }
  8. $channel.close;
  9. for $channel.list -> $r {
  10. say $r;
  11. }

还有从通道返回可用项目的非阻塞方法poll, 或者, 如果没有项目或通道被关闭则返回 Nil,这当然意味着必须检查通道以确定其是否关闭:

  1. my $c = Channel.new;
  2. # Start three Promises that sleep for 1..3 seconds, and then
  3. # send a value to our Channel
  4. ^3 .map: -> $v {
  5. start {
  6. sleep 3 - $v;
  7. $c.send: "$v from thread {$*THREAD.id}";
  8. }
  9. }
  10. # Wait 3 seconds before closing the channel
  11. Promise.in(3).then: { $c.close }
  12. # Continuously loop and poll the channel, until it's closed
  13. my $is-closed = $c.closed;
  14. loop {
  15. if $c.poll -> $item {
  16. say "$item received after {now - INIT now} seconds";
  17. }
  18. elsif $is-closed {
  19. last;
  20. }
  21. say 'Doing some unrelated things...';
  22. sleep .6;
  23. }
  24. # Doing some unrelated things...
  25. # Doing some unrelated things...
  26. # 2 from thread 5 received after 1.2063182 seconds
  27. # Doing some unrelated things...
  28. # Doing some unrelated things...
  29. # 1 from thread 4 received after 2.41117376 seconds
  30. # Doing some unrelated things...
  31. # 0 from thread 3 received after 3.01364461 seconds
  32. # Doing some unrelated things...

方法 closed 返回一个 Promise,当通道关闭时,它将被保存(kept)(因此在布尔上下文中将被计算为 True)。

.poll 方法可以与 .receive 方法结合使用,作为一种缓存机制,其中 .poll 返回的值不足是需要获取更多值并加载到通道的信号:

  1. sub get-value {
  2. return $c.poll // do { start replenish-cache; $c.receive };
  3. }
  4. sub replenish-cache {
  5. for ^20 {
  6. $c.send: $_ for slowly-fetch-a-thing();
  7. }
  8. }

可以使用通道代替前面描述的 wheneverreact 块中的 Supply

  1. my $channel = Channel.new;
  2. my $p = start {
  3. react {
  4. whenever $channel {
  5. say $_;
  6. }
  7. }
  8. }
  9. await (^10).map: -> $r {
  10. start {
  11. sleep $r;
  12. $channel.send($r);
  13. }
  14. }
  15. $channel.close;
  16. await $p;

还可以使用通道方法Supply获得通道,该通道方法返回通过 Supply 上的 tap 馈送的通道:

  1. my $supplier = Supplier.new;
  2. my $supply = $supplier.Supply;
  3. my $channel = $supply.Channel;
  4. my $p = start {
  5. react {
  6. whenever $channel -> $item {
  7. say "via Channel: $item";
  8. }
  9. }
  10. }
  11. await (^10).map: -> $r {
  12. start {
  13. sleep $r;
  14. $supplier.emit($r);
  15. }
  16. }
  17. $supplier.done;
  18. await $p;

Channel 将返回一个不同的通道,每次调用时都会使用相同的数据。 这可以用于例如将 Supply 输出到一个或多个通道以在程序中提供的不同接口。

Proc::Async

Proc::Async 构建在所描述的设施上以异步方式运行并与外部程序交互:

  1. my $proc = Proc::Async.new('echo', 'foo', 'bar');
  2. $proc.stdout.tap(-> $v { print "Output: $v" });
  3. $proc.stderr.tap(-> $v { print "Error: $v" });
  4. say "Starting...";
  5. my $promise = $proc.start;
  6. await $promise;
  7. say "Done.";
  8. # Output:
  9. # Starting...
  10. # Output: foo bar
  11. # Done.

命令的路径以及命令的任何参数都提供给该构造函数。 该命令将不被执行,直到调用 start,它将返回一个 Promise,当程序退出时该 Promise 变为 kept 状态。 程序的标准输出和标准错误分别从 stdoutstderr 方法中作为 Supply 对象提供,可以根据需要进行分接。

如果要写入程序的标准输入,您可以给构造函数提供 :w 副词,并使用方法 writeprintsay 在程序启动后写入打开的管道:

  1. my $proc = Proc::Async.new(:w, 'grep', 'foo');
  2. $proc.stdout.tap(-> $v { print "Output: $v" });
  3. say "Starting...";
  4. my $promise = $proc.start;
  5. $proc.say("this line has foo");
  6. $proc.say("this one doesn't");
  7. $proc.close-stdin;
  8. await $promise;
  9. say "Done.";
  10. # Output:
  11. # Starting...
  12. # Output: this line has foo
  13. # Done.

一些程序(例如本例中没有文件参数的 grep)在关闭标准输入之前不会退出,因此在完成写入后可以调用 close-stdin,以允许由 start 返回的 Promise 的状态变为 kept。

Low-level APIs

Threads

最低级别的并发接口由 Thread 提供。 线程可以被认为是可以最终在处理器上运行的一段代码,其布置几乎完全由虚拟机和/或操作系统完成。 线程应该被考虑,对于所有意图,很大程度上是不受管理的,应避免在用户代码中直接使用它们。

线程可以被创建,然后随后实际运行:

  1. my $thread = Thread.new(code => { for 1 .. 10 -> $v { say $v }});
  2. # ...
  3. $thread.run;

或者可以在单个调用中创建和运行:

  1. my $thread = Thread.start({ for 1 .. 10 -> $v { say $v }});

在这两种情况下,由 Thread 对象封装的代码的完成可以用 finish 方法来等待,该方法将阻塞直到线程完成:

  1. $thread.finish;

除此之外,没有用于同步或资源共享的其他设施,这主要是为什么应当强调线程不可能直接用于用户代码。

Schedulers

并发 API 的下一级由实现角色Scheduler定义的接口的类提供。 调度程序接口的目的是提供一种机制来确定使用哪些资源来运行特定任务以及何时运行它。 大多数较高级别的并发 API 是基于调度器构建的,并且用户代码根本不需要使用它们,尽管一些方法,例如在 Proc::AsyncPromiseSupply 中找到的方法允许您明确地提供调度器。

当前缺省全局调度程序在变量 $*SCHEDULER 中可用。

调度程序的主接口(确实是Scheduler接口所需的唯一方法)是 cue 方法:

  1. method cue(:&code, Instant :$at, :$in, :$every, :$times = 1; :&catch)

这将按照由副词(如在Scheduler中记录的)所确定的方式使用由调度器实现的执行方案来调度 &code 中的 Callable 以执行。 例如:

  1. my $i = 0;
  2. my $cancellation = $*SCHEDULER.cue({ say $i++}, every => 2 );
  3. sleep 20;

假设 $*SCHEDULER 没有从默认值改变,将以大约每两秒打印数字0到10(即使用操作系统调度容差)。 在这种情况下,代码将被调度运行,直到程序正常结束,但是该方法返回一个 Cancellation 对象,它可以用来在正常完成之前取消调度执行:

  1. my $i = 0;
  2. my $cancellation = $*SCHEDULER.cue({ say $i++}, every => 2 );
  3. sleep 10;
  4. $cancellation.cancel;
  5. sleep 10;

应该只输出 0 到 5,

尽管 Scheduler 接口提供的所有功能明显优于 Thread 提供的,但是通过更高级别的接口可以获得所有的功能,并且不应该有必要直接使用调度器,除非在上述情况下,调度器可以被明确地提供给某些方法。

如果库具有特殊要求,例如 UI 库可能希望所有代码在单个 UI 线程中运行,或者可能需要一些定制的优先级机制,则库可能希望提供备选的调度器实现,然而,被作为标准的实现和下面的描述应该足以满足大多数用户代码。

ThreadPoolScheduler

ThreadPoolScheduler 是默认调度程序,它维护一个根据需要分配的线程池,根据需要创建新的线程,直到创建调度程序对象时作为参数给出的最大数目(默认值为16)。如果超过最大值 那么 cue 可以对代码进行排队,直到线程变得可用为止。

Rakudo 允许在程序启动时由环境变量 RAKUDO_MAX_THREADS 在默认调度程序中设置允许的最大线程数。

CurrentThreadScheduler

CurrentThreadScheduler 是一个非常简单的调度程序,它将始终调度代码在当前线程上立即运行。 暗示这个调度器的 cue 将阻塞,直到代码完成执行,把它的效用限制在某些特殊情况,如测试。

Locks

Lock 在并发环境中提供了保护共享数据的低级机制,并因此是高级 API 中支持线程安全性的关键,这在其他编程语言中有时称为 “Mutex”。 因为较高级别的类(PromiseSupplyChannel)在需要时使用 Lock,所以用户代码不可能直接使用 Lock。

Lock 的主接口是方法 protect,它确保一个代码块(通常称为“临界区”)只能在一个线程中同时执行:

  1. my $lock = Lock.new;
  2. my $a = 0;
  3. await (^10).map: {
  4. start {
  5. $lock.protect({
  6. my $r = rand;
  7. sleep $r;
  8. $a++;
  9. });
  10. }
  11. }
  12. say $a; # 10

protect 返回代码块返回任何东西。

因为 protect 将阻止任何等着要执行临界区的线程,所以代码应该尽可能快。

Safety Concerns

一些共享数据并发问题相比其他问题并不明显。 关于这个问题的好文章请看这个博客

要注意的一个特别的问题是当容器自动更新或发生扩展时。 当数组哈希条目被赋初始值时,底层结构被更改,并且那个操作不是异步安全的。 例如,在这段代码中:

  1. my @array;
  2. my $slot := @array[20];
  3. $slot = 'foo';

第三行是临界区,因为那就是数组被扩展之时。 最简单的解决方法是使用 <Lock> 来保护临界区。 一个可能更好的解决方案是重构代码,以使共享容器不是必需的。