8.4 Distributed Processing

Sometimes you need more power than your local machine, even with all its cores, can offer. Luckily, GNU Parallel can also leverage the power of remote machines, which really allows us to speed up our pipeline.

What’s great is that GNU Parallel does not have to be installed on the remote machine. All that’s required is that you can connect to the remote machine via SSH, which is also what GNU Parallel uses to distribute our pipeline. (Having GNU Parallel installed is helpful because it can then determine how many cores to employ on each remote machine; more on this later.)

First, we’re going to obtain a list of running AWS EC2 instances. Don’t worry if you don’t have any remote machines, you can replace any occurrence of —slf hostnames, which tells GNU Parallel which remote machines to use, with —sshlogin :. This way, you can still follow along with the examples in this section.

Once we know which remote machines to take over, we’re going to consider three flavors of distributed processing:

  • Simply running ordinary commands on remote machines.
  • Distributing local data directly among remote machines.

    • Sending files to remote machines, process them, and retrieve the results.

8.4.1 Get List of Running AWS EC2 Instances

In this section we’re creating a file named hostnames that will contain one hostname of a remote machine per line. We’re using Amazon Web Services as an example. If you’re using a different cloud computing service, or have your own servers, please make sure that you create a hostnames file yourself.

We can obtain a list of running AWS EC2 instances from the commanding using aws, the command-line interface to the AWS API (Services 2014). If you’re not using the Data Science Toolbox, install awscli using pip (PyPA 2014) as follows:

  1. $ pip install awscli

With aws, you can virtually do everything you can do with the online AWS Management Console. We use this command to obtain a list of running EC2 instances from AWS, but it can do a lot more.

We assume that you know how to launch instances, either through the online Management Console or through the aws command-line tool.

The command aws ec2 describe-instances returns a lot of information about all your EC2 instances in JSON format (see http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html). We extract the relevant fields using jq:

  1. $ aws ec2 describe-instances | jq '.Reservations[].Instances[] | '\
  2. > '{public_dns: .PublicDnsName, state: .State.Name}'
  3. {
  4. "state": "running",
  5. "public_dns": "ec2-54-88-122-140.compute-1.amazonaws.com"
  6. }
  7. {
  8. "state": "stopped",
  9. "public_dns": null
  10. }

The possible states of an EC2 instance are: pending, running, shutting-down, terminated, stopping, and stopped. Since we can only distribute our pipeline to running instances, we filter out the non-running instances:

  1. $ aws ec2 describe-instances | jq -r '.Reservations[].Instances[] | '\
  2. > 'select(.State.Name=="running") | .PublicDnsName' > hostnames
  3. $ cat hostnames
  4. ec2-54-88-122-140.compute-1.amazonaws.com
  5. ec2-54-88-89-208.compute-1.amazonaws.com

(If we would leave out -r, which stands for raw, the hostnames would have been surrounded by double quotes.) We save the output to hostnames, so that we can pass this to parallel later.

As mentioned, parallel employs ssh to connect to the EC2 instances. Add the following to ~/.ssh/config, so that ssh knows how to connect to the EC2 instances:

  1. Host *.amazonaws.com
  2. IdentityFile ~/.ssh/MyKeyFile.pem
  3. User ubuntu

Depending on your which distribution your running, your username may be different than ubuntu.

8.4.2 Running Commands on Remote Machines

The first flavor of distributed processing is to simply run ordinary commands on remote machines. Let’s first double check that parallel is working by running the command-line tools hostname List of hosts:

  1. $ parallel --nonall --slf hostnames hostname
  2. ip-172-31-23-204
  3. ip-172-31-23-205

Here, —slf is short for —sshloginfile and —nonall instructs parallel to execute the same command on every remote machine in the hostnames file without using any parameters. Remember, if you don’t have any remote machines to utilize, you can replace —slf hostnames with —sshlogin : so that the command is run on your local machine:

  1. $ parallel --nonall --sshlogin : hostname
  2. data-science-toolbox

Running the same command on every remote machine once only requires one core per machine. If we wanted to distribute the list of arguments passed in to parallel then it could potentially use more than one core. If the number of cores are not specified explicitly, parallel will try to determine this:

  1. $ seq 2 | parallel --slf hostnames echo 2>&1 | fold
  2. bash: parallel: command not found
  3. parallel: Warning: Could not figure out number of cpus on ec2-54-88-122-140.comp
  4. ute-1.amazonaws.com (). Using 1.
  5. 1
  6. 2

In this case, we have parallel installed on one of the two remote machines. We’re getting a warning message indicating that parallel is not found on one of them. As a result, parallel cannot determine the number of cores and will default to using one core. When you receive this warning message, you can do one of the following four things:

  • Don’t worry, and be happy with using one core per machine.
  • Specify the number of jobs per machine via -j.
  • Specify the number of cores to use per machine by putting, for example, 2/ if you want two cores, in front of each hostname in the hostnames file.
  • Install GNU Parallel using a package manager. For example, on Ubuntu:
  1. $ parallel --nonall --slf hostnames "sudo apt-get install -y parallel"

8.4.3 Distributing Local Data among Remote Machines

The second flavor of distributed processing is to distribute local data directly among remote machines. Imagine you have one very large data set that you want to process it using multiple remote machines. For simplicity, we’re going to sum all integers from 1 to 1000. First, let’s double check that our input is actually being distributed by printing the hostname of the remote machine and the length of the input it received using wc:

  1. $ seq 1000 | parallel -N100 --pipe --slf hosts "(hostname; wc -l) | paste -sd:"
  2. ip-172-31-23-204:100
  3. ip-172-31-23-205:100
  4. ip-172-31-23-205:100
  5. ip-172-31-23-204:100
  6. ip-172-31-23-205:100
  7. ip-172-31-23-204:100
  8. ip-172-31-23-205:100
  9. ip-172-31-23-204:100
  10. ip-172-31-23-205:100
  11. ip-172-31-23-204:100

We can verify that our 1000 numbers get distributed evenly in subsets of 100 (as specified by -N100). Now, we’re ready to sum all those numbers:

  1. seq 1000 | parallel -N100 --pipe --slf hosts "paste -sd+ | bc" | paste -sd+ | bc
  2. 500500

Here, we immediately also sum the ten sums we get back from the remote machines. Let’s double check the answer is correct:

  1. $ seq 1000 | paste -sd+ | bc
  2. 500500

Good, that works. If you have a larger command that you want to execute on the remote machines, you can also put it in a separate script and upload it script with parallel.

Let’s create a very simple command-line tool called sum:

  1. #!/usr/bin/env bash
  2. paste -sd+ | bc

Don’t forget to make it executable as discussed in Chapter 4. The following command first uploads the file sum:

  1. $ seq 1000 | parallel -N100 --basefile sum --pipe --slf hosts './sum' | ./sum
  2. 500500

Of course, summing 1000 numbers is only a toy example. It would have been much faster to do this locally. However, we hope it’s clear from this that GNU Parallel can be incredibly powerful.

8.4.4 Processing Files on Remote Machines

The third flavor of distributed processing is to send files to remote machines, process them, and retrieve the results. Imagine that we want to count for each borough of New York City, how often they receive service calls on 311. We don’t have that data on our local machine yet, so let’s first obtain it from https://data.cityofnewyork.us/ using their great API:

  1. $ seq 0 100 900 | parallel "curl -sL 'http://data.cityofnewyork.us/resource'"\
  2. > "'/erm2-nwe9.json?\$limit=100&\$offset={}' | jq -c '.[]' | gzip > {#}.json.gz"

Note that jq -c '.[]' is used to flatten the array of JSON objects so that there’s one line. We now have 10 files containing compressed JSON data. Let’s see what one line of JSON looks like:

  1. $ zcat 1.json.gz | head -n 1 | fold
  2. {"school_region":"Unspecified","park_facility_name":"Unspecified","x_coordinate_
  3. state_plane":"945974","agency_name":"Department of Health and Mental Hygiene","u
  4. nique_key":"147","facility_type":"N/A","status":"Assigned","school_address":"Uns
  5. pecified","created_date":"2006-08-29T21:25:23","community_board":"01 STATEN ISLA
  6. ND","incident_zip":"10302","school_name":"Unspecified","location":{"latitude":"4
  7. 0.62745427115626","longitude":"-74.13789056665027","needs_recoding":false},"comp
  8. laint_type":"Food Establishment","city":"STATEN ISLAND","park_borough":"STATEN I
  9. SLAND","school_state":"Unspecified","longitude":"-74.13789056665027","intersecti
  10. on_street_1":"DECKER AVENUE","y_coordinate_state_plane":"167905","due_date":"200
  11. 6-10-05T21:25:23","latitude":"40.62745427115626","school_code":"Unspecified","sc
  12. hool_city":"Unspecified","address_type":"INTERSECTION","intersection_street_2":"
  13. BARRETT AVENUE","school_number":"Unspecified","resolution_action_updated_date":"
  14. 2006-10-06T00:00:17","descriptor":"Handwashing","school_zip":"Unspecified","loca
  15. tion_type":"Restaurant/Bar/Deli/Bakery","agency":"DOHMH","borough":"STATEN ISLAN
  16. D","school_phone_number":"Unspecified"}

If we were to get the total number of service calls per borough on our local machine, we would run the following command:

  1. $ zcat *.json.gz |
  2. > ./jq -r '.borough' |
  3. > tr '[A-Z] ' '[a-z]_' |
  4. > sort | uniq -c |
  5. > awk '{print $2","$1}' |
  6. > header -a borough,count |
  7. > csvsort -rc count | csvlook
  8. |----------------+--------|
  9. | borough | count |
  10. |----------------+--------|
  11. | unspecified | 467 |
  12. | manhattan | 274 |
  13. | brooklyn | 103 |
  14. | queens | 77 |
  15. | bronx | 44 |
  16. | staten_island | 35 |
  17. |----------------+--------|

Because this is quite a long pipeline, and because we’re using it again in a moment with parallel, it’s worth to go over it:

  • Expand all compressed files using zcat.
  • For each call, extract the name of the borough using jq.
  • Convert borough names to lowercase and replace spaces with underscores (because awk splits on whitespace by default).
  • Count the occurrences of each borough using sort and uniq.

  • Reverse the count and borough and make it comma delimited using awk.

  • Add a header using header.
  • Sort by count and print table using csvsort (Groskopf 2014j).

Imagine, for a moment, that our own machine is so slow that we simply cannot perform this pipeline locally. We can use GNU Parallel to distribute the local files among the remote machines, let them do the processing, and retrieve the results:

  1. $ ls *.json.gz |
  2. > parallel -v --basefile jq \
  3. > --trc {.}.csv \
  4. > --slf hostnames \
  5. > "zcat {} | ./jq -r '.borough' | tr '[A-Z] ' '[a-z]_' | sort | uniq -c |"\
  6. > " awk '{print \$2\",\"\$1}' > {.}.csv"
  7. zcat 10.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  8. zcat 2.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  9. zcat 1.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  10. zcat 3.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  11. zcat 4.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  12. zcat 5.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  13. zcat 6.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  14. zcat 7.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  15. zcat 8.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
  16. zcat 9.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'

This long command breaks down as follows:

  • Print the list of files and pipe it into parallel.
  • Transmit the jq binary to each remote machine. Lucklily, jq has no dependencies. This file will be removed from the remote machine at the end because we specified —trc (which implies the —cleanup command-line argument).
  • The command-line argument —trc {.}.csv is short for —transfer —return {.}.csv —cleanup. (The replacement string {.} gets replaced with the input filename without the last extension.) Here, this means that the JSON file gets transfered to the remote machine, the CSV file gets returned to the local machine, and both files will be removed after each job from the remote machine.
  • Specify a list of hostnames. Remember, if you want to try this out locally, you can specify —sshlogin : instead of —self hostnames.

  • Note the escaping in the awk expression. Quoting can sometimes be tricky. Here, the dollar signs and the double quotes are escaped. In quoting ever gets too confusing, remember that you put the pipeline into a separate command-line tool just as we did with sum.

If we, at some point during this command, run ls on one of the remote machines, we could see that parallel indeed transfers (and cleans up) the binary jq, the JSON files, and CSV files:

  1. $ ssh $(head -n 1 hostnames) ls
  2. 1.json.csv
  3. 1.json.gz
  4. jq

Each CSV file looks like this:

  1. $ cat 1.json.csv
  2. bronx,3
  3. brooklyn,5
  4. manhattan,24
  5. queens,3
  6. staten_island,2
  7. unspecified,63

We can sum the counts in each CSV file using Rio and the aggregate function in R:

  1. $ cat *.csv | header -a borough,count |
  2. > Rio -e 'aggregate(count ~ borough, df, sum)' |
  3. > csvsort -rc count | csvlook
  4. |----------------+--------|
  5. | borough | count |
  6. |----------------+--------|
  7. | unspecified | 467 |
  8. | manhattan | 274 |
  9. | brooklyn | 103 |
  10. | queens | 77 |
  11. | bronx | 44 |
  12. | staten_island | 35 |
  13. |----------------+--------|

Or, if you prefer to use SQL to aggregate results, you can use csvsql as discussed in Chapter 5:

  1. $ cat *.csv | header -a borough,count |
  2. > csvsql --query 'SELECT borough, SUM(count) AS count FROM stdin '\
  3. > 'GROUP BY borough ORDER BY count DESC' | csvlook
  4. |----------------+--------|
  5. | borough | count |
  6. |----------------+--------|
  7. | unspecified | 467 |
  8. | manhattan | 274 |
  9. | brooklyn | 103 |
  10. | queens | 77 |
  11. | bronx | 44 |
  12. | staten_island | 35 |
  13. |----------------+--------|