Testcase: map-reduce

Rust makes it very easy to parallelise data processing, without many of the headaches traditionally associated with such an attempt.

The standard library provides great threading primitives out of the box.
These, combined with Rust’s concept of Ownership and aliasing rules, automatically prevent
data races.
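
As a minimal warm-up (separate from the testcase below), spawning a thread and collecting its return value only takes a handle and a call to join(); the sketch below assumes nothing beyond the standard library:

```rust
use std::thread;

fn main() {
    // Spawn a thread that computes a value and keep its handle.
    let handle = thread::spawn(|| 2 + 2);

    // join() blocks until the thread finishes and yields its return value.
    let result = handle.join().unwrap();
    println!("thread returned {}", result);
}
```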

The aliasing rules (one writable reference XOR many readable references) automatically prevent
you from manipulating state that is visible to other threads. Where synchronisation is needed,
primitives such as Mutexes or Channels are available.
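
For instance, when several threads really do need to mutate the same value, it can be wrapped in a Mutex and shared via an Arc. This is a small sketch of that synchronised case, not part of the testcase itself:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Shared counter: Arc provides shared ownership across threads,
    // Mutex provides exclusive access for each increment.
    let counter = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                *counter.lock().unwrap() += 1;
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    println!("counter = {}", *counter.lock().unwrap());
}
```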

In this example, we will calculate the sum of all digits in a block of numbers.
We will do this by parcelling out chunks of the block into different threads. Each thread will sum
its tiny block of digits, and subsequently we will sum the intermediate sums produced by each
thread.

Note that, although we’re passing references across thread boundaries, Rust understands that we’re
only passing read-only references, and thus no unsafety or data races can occur. Because
we’re move-ing the data segments into the threads, Rust will also ensure the data is kept alive
until the threads exit, so no dangling pointers occur.

```rust
use std::thread;

// This is the `main` thread
fn main() {
    // This is our data to process.
    // We will calculate the sum of all digits via a threaded map-reduce algorithm.
    // Each whitespace separated chunk will be handled in a different thread.
    //
    // TODO: see what happens to the output if you insert spaces!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // Make a vector to hold the child-threads which we will spawn.
    let mut children = vec![];

    /*************************************************************************
     * "Map" phase
     *
     * Divide our data into segments, and apply initial processing
     ************************************************************************/

    // split our data into segments for individual calculation
    // each chunk will be a reference (&str) into the actual data
    let chunked_data = data.split_whitespace();

    // Iterate over the data segments.
    // .enumerate() adds the current loop index to whatever is iterated
    // the resulting tuple "(index, element)" is then immediately
    // "destructured" into two variables, "i" and "data_segment" with a
    // "destructuring assignment"
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // Process each data segment in a separate thread
        //
        // spawn() returns a handle to the new thread,
        // which we MUST keep to access the returned value
        //
        // 'move || -> u32' is syntax for a closure that:
        // * takes no arguments ('||')
        // * takes ownership of its captured variables ('move') and
        // * returns an unsigned 32-bit integer ('-> u32')
        //
        // Rust is smart enough to infer the '-> u32' from
        // the closure itself so we could have left that out.
        //
        // TODO: try removing the 'move' and see what happens
        children.push(thread::spawn(move || -> u32 {
            // Calculate the intermediate sum of this segment:
            let result = data_segment
                // iterate over the characters of our segment..
                .chars()
                // .. convert text-characters to their number value..
                .map(|c| c.to_digit(10).expect("should be a digit"))
                // .. and sum the resulting iterator of numbers
                .sum();

            // println! locks stdout, so no text-interleaving occurs
            println!("processed segment {}, result={}", i, result);

            // "return" not needed, because Rust is an "expression language", the
            // last evaluated expression in each block is automatically its value.
            result
        }));
    }

    /*************************************************************************
     * "Reduce" phase
     *
     * Collect our intermediate results, and combine them into a final result
     ************************************************************************/

    // collect each thread's intermediate results into a new Vec
    let mut intermediate_sums = vec![];
    for child in children {
        // collect each child thread's return-value
        let intermediate_sum = child.join().unwrap();
        intermediate_sums.push(intermediate_sum);
    }

    // combine all intermediate sums into a single final sum.
    //
    // we use the "turbofish" ::<> to provide sum() with a type hint.
    //
    // TODO: try without the turbofish, by instead explicitly
    // specifying the type of final_result
    let final_result = intermediate_sums.iter().sum::<u32>();

    println!("Final sum result: {}", final_result);
}
```
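
This only compiles because data is a string literal: every &str segment therefore has a 'static lifetime, which is what thread::spawn demands of everything the closure captures. If the data were owned by a local variable instead, scoped threads (std::thread::scope, stable since Rust 1.63) are one way to lend non-'static references to worker threads. A minimal sketch, not part of the original testcase:

```rust
use std::thread;

fn main() {
    // Owned, non-'static data living on main's stack.
    let data = String::from("1234 5678 91011");

    let mut sums = vec![];
    thread::scope(|s| {
        let mut handles = vec![];
        for segment in data.split_whitespace() {
            // Borrowing `segment` is fine: the scope guarantees the
            // spawned threads finish before `data` goes out of scope.
            handles.push(s.spawn(move || -> u32 {
                segment.chars().map(|c| c.to_digit(10).unwrap()).sum()
            }));
        }
        for handle in handles {
            sums.push(handle.join().unwrap());
        }
    });

    let total: u32 = sums.iter().sum();
    println!("total = {}", total);
}
```

The scope joins every spawned thread before it returns, which is why the borrow checker accepts references into data here.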

Assignments

It is not wise to let our number of threads depend on user-inputted data.
What if the user decides to insert a lot of spaces? Do we really want to spawn 2,000 threads?
Modify the program so that the data is always chunked into a limited number of chunks,
defined by a static constant at the beginning of the program.
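
One possible approach, sketched below (the constant name NUM_CHUNKS and its value are illustrative, not prescribed): strip the whitespace first, then cut the remaining digit string into at most NUM_CHUNKS roughly equal slices and hand each slice to its own thread.

```rust
use std::thread;

// Illustrative upper bound on the number of worker threads.
const NUM_CHUNKS: usize = 8;

fn main() {
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850";

    // Remove whitespace first so the chunk count no longer depends on it.
    let digits: String = data.chars().filter(|c| !c.is_whitespace()).collect();

    // Ceiling division so all digits are covered even when the length
    // is not a multiple of NUM_CHUNKS.
    let chunk_len = (digits.len() + NUM_CHUNKS - 1) / NUM_CHUNKS;

    let mut children = vec![];
    for chunk in digits.as_bytes().chunks(chunk_len) {
        // Each chunk is a &[u8] of ASCII digits; convert it to an owned
        // String so the thread can take ownership of it.
        let segment = String::from_utf8(chunk.to_vec()).unwrap();
        children.push(thread::spawn(move || -> u32 {
            segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum()
        }));
    }

    let final_result: u32 = children.into_iter().map(|c| c.join().unwrap()).sum();
    println!("Final sum result: {}", final_result);
}
```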

See also: