Apache Spark

Maven

Add the following snippet to your pom.xml for Scala 2.10:

  <dependency>
    <groupId>com.yugabyte.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>2.0.5-yb-2</version>
  </dependency>

For Scala 2.11:

  <dependency>
    <groupId>com.yugabyte.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.5-yb-2</version>
  </dependency>

Sample Application

Running the Spark word-count sample application

You can run our Spark-based sample app with:

  $ java -jar yb-sample-apps.jar --workload CassandraSparkWordCount --nodes 127.0.0.1:9042

The app reads data from a table of sentences (by default, it generates an input table named ybdemo_keyspace.lines), computes the frequency of each word, and writes the results to the output table ybdemo_keyspace.wordcounts.
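
Once the run completes, you can check the results directly. The following is a minimal sketch of how you might query the output table with the Cassandra Java driver; the 127.0.0.1:9042 contact point and the table name follow the defaults above, and the VerifyWordCounts class name is illustrative, not part of the sample app.

  import com.datastax.driver.core.Cluster;
  import com.datastax.driver.core.ResultSet;
  import com.datastax.driver.core.Row;
  import com.datastax.driver.core.Session;

  public class VerifyWordCounts {
    public static void main(String[] args) {
      // Connect to the local node on the CQL-compatible port.
      try (Cluster cluster = Cluster.builder()
                                    .addContactPoint("127.0.0.1")
                                    .withPort(9042)
                                    .build();
           Session session = cluster.connect()) {
        // Read back the word counts written by the sample app.
        ResultSet rs = session.execute("SELECT word, count FROM ybdemo_keyspace.wordcounts;");
        for (Row row : rs) {
          System.out.println(row.getString("word") + " -> " + row.getInt("count"));
        }
      }
    }
  }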

Examining the source code

To look at the source code, you can:

  • browse the source file in our GitHub source repo, or
  • untar the java/yb-sample-apps-sources.jar file in the download bundle

Most of the logic is in the run() method of the CassandraSparkWordCount class (in the file src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java). Some of the key portions of the sample program are explained in the sections below.

Main sections of an Apache Spark program on Yugabyte

Initializing the Spark context

The SparkConf object is configured as follows:

  // Imports used by this snippet (Spark core and the Spark Cassandra Connector).
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import com.datastax.spark.connector.cql.CassandraConnector;
  import com.datastax.driver.core.Session;

  // Set up the local Spark master, with the desired parallelism.
  SparkConf conf = new SparkConf().setAppName("yb.wordcount")
                                  .setMaster("local[1]") // number of Spark threads
                                  .set("spark.cassandra.connection.host", hostname);
  // Create the Java Spark context object.
  JavaSparkContext sc = new JavaSparkContext(conf);
  // Create the Cassandra connector to Spark.
  CassandraConnector connector = CassandraConnector.apply(conf);
  // Create a Cassandra session, and initialize the keyspace.
  Session session = connector.openSession();
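
When the word count finishes, the session and the Spark context opened above would normally be released. A minimal cleanup sketch, reusing the sc and session variables from the snippet (not reproduced verbatim from the sample app):

  // Release the Cassandra session and stop the Spark context once the job is done.
  session.close();
  sc.stop();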

Setting the input source

To set the input data for Spark, you can do one of the following:

  • Reading from a table with a column named line as the input:

    // Read rows from the table and convert them to an RDD.
    // javaFunctions() is statically imported from com.datastax.spark.connector.japi.CassandraJavaUtil.
    JavaRDD<String> rows = javaFunctions(sc).cassandraTable(keyspace, inputTable)
                                            .select("line")
                                            .map(row -> row.getString("line"));

  • Reading from a file as the input:

    // Read the input file and convert it to an RDD.
    JavaRDD<String> rows = sc.textFile(inputFile);

Performing the word count processing

The word count is performed using the following code snippet. Each line is split into words, each word is mapped to a (word, 1) pair, and reduceByKey then sums the counts for each distinct word:

  // Perform the word count. Arrays is java.util.Arrays and Tuple2 is scala.Tuple2.
  JavaPairRDD<String, Integer> counts =
      rows.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
          .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
          .reduceByKey((x, y) -> x + y);

Setting the output table

The output is written to the table referenced by outTable (by default, ybdemo_keyspace.wordcounts).

  // Create the output table.
  session.execute("CREATE TABLE IF NOT EXISTS " + outTable +
                  " (word VARCHAR PRIMARY KEY, count INT);");

  // Save the output to the CQL table.
  // mapTupleToRow() and someColumns() are statically imported from
  // com.datastax.spark.connector.japi.CassandraJavaUtil.
  javaFunctions(counts).writerBuilder(keyspace, outputTable, mapTupleToRow(String.class, Integer.class))
      .withColumnSelector(someColumns("word", "count"))
      .saveToCassandra();
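
To confirm the write from within the program, you could read the output table back into an RDD using the same connector API shown in the input section. A sketch, reusing the sc, keyspace, and outputTable names from the snippets above:

  // Read back the word counts and print them on the driver.
  javaFunctions(sc).cassandraTable(keyspace, outputTable)
      .select("word", "count")
      .collect()
      .forEach(row -> System.out.println(row.getString("word") + " -> " + row.getInt("count")));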

PySpark

For Scala 2.10, start PySpark with:

  $ pyspark --packages com.yugabyte.spark:spark-cassandra-connector_2.10:2.0.5-yb-2

For Scala 2.11:

  $ pyspark --packages com.yugabyte.spark:spark-cassandra-connector_2.11:2.0.5-yb-2

sbt

Add the following library dependency to your project configuration. The %% operator automatically appends your project's Scala binary version (2.10 or 2.11) to the artifact name:

  libraryDependencies += "com.yugabyte.spark" %% "spark-cassandra-connector" % "2.0.5-yb-2"