Introducing Spark

What is Apache Spark?

Why Spark?

The 2014 Gray Sort competition, a third-party benchmark measuring how fast a system can sort 100 TB of data, showed that Spark has a performance advantage over plain MapReduce:

Source: Databricks blog, Sort benchmark, Spark SQL on SIGMOD 2015

Spark Stack

Spark Interactive Shell

We will first introduce the API through Spark’s interactive shell, then show you how to write RDD applications in Java, Scala, and Python.


Spark APIs

Spark now provides three APIs (RDDs, DataFrames, and Datasets) that make different types of workloads easier to write and better optimized.

Source: A Tale of Three Apache Spark APIs

Core Spark Concept: Resilient Distributed Dataset (RDD)

RDD is a read-only collection of objects that is partitioned across multiple machines in a cluster.
In a typical Spark program, one or more RDDs are loaded as input and through a series of transformations are turned into a set of target RDDs, which have an action performed on them (such as computing a result or writing them to persistent storage).
The term “resilient” in “Resilient Distributed Dataset” refers to the fact that Spark can automatically reconstruct a lost partition by recomputing it from the RDDs that it was computed from.

Hadoop: The Definitive Guide, 4th Edition, Page 550

RDD Operations

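RDDs support two types of operations:

  1. Transformations, which create a new dataset from an existing one
  2. Actions, which return a value to the driver program after running a computation on the dataset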


RDD of Strings

RDD Fault Tolerance in Spark
If the original RDD that was used to create your current partition also isn’t there (neither in memory nor on disk), then Spark will have to go one step back again and recompute the previous RDD. In the worst-case scenario, Spark will have to go all the way back to the original data.
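
The step-by-step fallback described above can be illustrated with a toy model in plain Python. This is only a sketch of the idea of lineage, not Spark's implementation: the `ToyRDD` class and its `data`, `parent`, and `compute` names are hypothetical, and "losing" an RDD is simulated by setting `data` to `None`.

```python
class ToyRDD:
    """Toy stand-in for an RDD: remembers its parent and how it was derived."""

    def __init__(self, parent=None, fn=None, source=None):
        self.parent = parent    # dataset this one was computed from (None for the input)
        self.fn = fn            # transformation applied to each parent element
        self.source = source    # original data, only set on the root
        self.data = None        # materialized elements; None means lost or never computed

    def map(self, fn):
        return ToyRDD(parent=self, fn=fn)

    def compute(self):
        """Return the elements, walking back through the lineage as far as needed."""
        if self.data is None:
            if self.parent is None:
                # Worst case: go all the way back to the original data
                self.data = list(self.source)
            else:
                self.data = [self.fn(x) for x in self.parent.compute()]
        return self.data


root = ToyRDD(source=[1, 2, 3])
doubled = root.map(lambda x: x * 2)
plus_one = doubled.map(lambda x: x + 1)

first = plus_one.compute()      # materializes root, doubled, and plus_one
plus_one.data = None            # simulate losing the final dataset...
doubled.data = None             # ...and the dataset it was computed from
recovered = plus_one.compute()  # recomputed two steps back, starting from root
print(recovered)
```

Because every dataset only records its parent and a function, nothing has to be replicated up front; the cost of a failure is recomputation along the chain.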


Difference between Hadoop MapReduce and RDDs

Iterative Operations

If the distributed memory (RAM) is not sufficient to store intermediate results (the state of the job), Spark stores those results on disk.

Interactive Operations

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory (for much faster access the next time you query it).


Creating RDDs

There are two ways to create RDDs:

  1. Parallelizing an existing collection in your driver program
  2. Referencing a dataset in an external storage system (shared file system, HDFS, HBase …)

What is the “driver program”?

"The machine where the Spark application process (the one that creates the SparkContext) is running is the “Driver” node, with process being called the Driver process. "

Source: What does the master node and driver program mean in Spark?

1. Parallelizing Collections


// Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.reduce((a, b) => a + b)


# Python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

2. Referencing a dataset

Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc. URI).

HTTP/HTTPS URLs are not supported.


// Scala
val distFile = sc.textFile("data.txt")
// Add up the lengths of all lines
distFile.map(s => s.length).reduce((a, b) => a + b)


# Python
distFile = sc.textFile("data.txt")

# Saving and Loading SequenceFiles
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("path/to/file")
sorted(sc.sequenceFile("path/to/file").collect())
# [(1, u'a'), (2, u'aa'), (3, u'aaa')]

If you are using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

RDD Operations



// Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)


# Python
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

If we also wanted to use lineLengths again later, we could add:

lineLengths.persist()

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.
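
To make the effect of persisting concrete, here is a toy model in plain Python that counts how often the map function runs with and without persistence. It is a sketch under simplifying assumptions, not Spark's machinery: the `LazyDataset` class and its `calls` counter are hypothetical names introduced only for illustration.

```python
import functools
from operator import add


class LazyDataset:
    """Toy model of a lazily evaluated dataset (not Spark's implementation)."""

    def __init__(self, source, fn):
        self.source = source
        self.fn = fn
        self.calls = 0          # how many times fn has been applied so far
        self._persist = False
        self._cache = None

    def persist(self):
        self._persist = True    # only marks the dataset; nothing is computed yet
        return self

    def _elements(self):
        if self._cache is not None:
            return self._cache
        result = []
        for item in self.source:
            self.calls += 1
            result.append(self.fn(item))
        if self._persist:
            self._cache = result
        return result

    def reduce(self, op):
        return functools.reduce(op, self._elements())

    def count(self):
        return len(self._elements())


lines = ["spark", "rdd", "persist"]

recomputed = LazyDataset(lines, len)
total = recomputed.reduce(add)   # first action: applies len to every line
recomputed.count()               # second action: applies len to every line again

persisted = LazyDataset(lines, len).persist()
persisted.reduce(add)            # first action computes and caches the lengths
persisted.count()                # second action reuses the cached lengths

print(total, recomputed.calls, persisted.calls)
```

In this toy run the unpersisted dataset applies the function twice per element across the two actions, while the persisted one applies it once.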

Working with Key-Value Pairs


// Scala
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1)) // each element becomes a (line, 1) tuple
val counts = pairs.reduceByKey((a, b) => a + b)


# Python
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.
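
As a rough illustration of what the reduceByKey, sortByKey, and collect steps compute, the following plain-Python sketch merges values per key first inside each partition and then across partitions. The `reduce_by_key` helper and the two-partition layout are assumptions made for the example; Spark's actual shuffle is considerably more involved.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Merge the values of equal keys with func (hypothetical helper, not a Spark API)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Two partitions of (line, 1) pairs, as produced by the map step
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("b", 1), ("c", 1), ("a", 1)],
]

# Step 1: combine values locally inside each partition
local = [reduce_by_key(part, add) for part in partitions]

# Step 2: merge the partial results of all partitions
counts = reduce_by_key((kv for part in local for kv in part.items()), add)

# Equivalent of sortByKey() followed by collect()
result = sorted(counts.items())
print(result)
```

Combining inside each partition first means only one partial value per key and partition has to be exchanged, which is why the reduce function needs to give the same answer regardless of grouping.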


| Transformation | Meaning |
| --- | --- |
| map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
| filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
| reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. |



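// map: one output element per input element
val newData = data.map(line => line.toUpperCase())


// reduceByKey: sum the values of each key
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)


// flatMap: zero or more output elements per input element
val result = data.flatMap(line => line.split(" "))

The flatMap transformation above splits each line into words; each word becomes an individual element of the newly created RDD.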
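
The difference between map and flatMap can be seen without a cluster. The sketch below uses plain Python lists in place of RDDs (an assumption made so it runs anywhere); the sample `lines` are made up for illustration.

```python
from itertools import chain

lines = ["to be or", "not to be"]

# map-like: exactly one output element (a list of words) per input line
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-line lists are flattened into a single sequence of words
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)  # ['to', 'be', 'or', 'not', 'to', 'be']
```

With map the result has as many elements as there are lines; with flatMap it has as many elements as there are words.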


MapReduce in Java

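import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class LineLengthReducer
    extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    // Sum the counts emitted for this line length
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}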

Spark Scala

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey((a, b) => a + b)

More Information: Shuffle operations in Spark


  1. Apache Spark Map vs FlatMap Operation
  2. Scala - can a lambda parameter match a tuple?
  3. reduceByKey: How does it work internally?
  4. How-to: Translate from MapReduce to Apache Spark
  5. How to use functions as variables (values) in Scala


| Action | Meaning |
| --- | --- |
| reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
| collect() | Return all the elements of the dataset as an array at the driver program. |
| count() | Return the number of elements in the dataset. |
| take(n) | Return an array with the first n elements of the dataset. |
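
Why reduce needs a commutative and associative function can be demonstrated with a small plain-Python simulation. The `distributed_reduce` helper below is hypothetical: it reduces each partition separately and then combines the partial results in partition order, which is a simplification of what a cluster does.

```python
from functools import reduce
from operator import add, sub


def distributed_reduce(partitions, func):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


# The same six numbers, split across partitions in two different ways
layout_a = [[1, 2, 3], [4, 5, 6]]
layout_b = [[1, 2], [3, 4], [5, 6]]

# Addition is commutative and associative: the partitioning does not matter
sum_a = distributed_reduce(layout_a, add)
sum_b = distributed_reduce(layout_b, add)

# Subtraction is neither: the result depends on how the data was split
diff_a = distributed_reduce(layout_a, sub)
diff_b = distributed_reduce(layout_b, sub)

print(sum_a, sum_b, diff_a, diff_b)
```

Since the partition layout is an implementation detail the program does not control, a function like subtraction would make the result of reduce unpredictable.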




Using the Spark Shell

Scala: spark-shell
Python: pyspark



// Scala
val inputFilePath = "hdfs://hadoop-master:8020/user/a000000000/spark-wordcount-input.txt"
val textFile = sc.textFile(inputFilePath)
val counts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect()
counts.saveAsTextFile("hdfs://...")


# Python
inputFilePath = "hdfs://hadoop-master:8020/user/a000000000/spark-wordcount-input.txt"
text_file = sc.textFile(inputFilePath)
counts = text_file.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.collect()
counts.saveAsTextFile("hdfs://...")

Pi Estimation


When the number of points is large enough, $\pi \cdot \frac{1}{4} \approx \frac{\text{Points in a quarter of a circle}}{\text{Points in a square}}$
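
The reasoning behind this estimate can be written out as a short derivation. It assumes the points are drawn uniformly from a square of side $r$ that contains a quarter circle of radius $r$; the symbols $N_{\text{inside}}$ and $N_{\text{total}}$ are introduced here for the two point counts.

```latex
\frac{\text{area of quarter circle}}{\text{area of square}}
  = \frac{\pi r^{2} / 4}{r^{2}}
  = \frac{\pi}{4}
\qquad\Longrightarrow\qquad
\pi \approx 4 \cdot \frac{N_{\text{inside}}}{N_{\text{total}}}
```

For uniformly distributed points, the fraction that lands inside the quarter circle approaches the ratio of the two areas, which is why the programs in this section multiply the observed fraction by 4.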

Source: Estimating Pi with Monte Carlo simulated dart throwing!


import math.random
import math.pow

val NUM_SAMPLES = math.pow(10, 8).toInt

// Count the random points that fall inside the quarter circle
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")


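import random

NUM_SAMPLES = 10**8  # same sample size as the Scala version

def inside(p):
    # p is the sample index and is ignored; draw a random point in the unit square
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))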

Submitting Applications


Linking Spark & Initializing Spark

Python (Recommended)

from pyspark import SparkContext, SparkConf

appName = "caculate-pi"
master = "yarn"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)


// Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val appName = "caculate-pi"
val master = "yarn"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)


// Java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

String appName = "caculate-pi";
String master = "yarn";
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode.

Launching Applications with spark-submit

Python application on a yarn cluster

~$ wget
~$ spark-submit

The content of

from pyspark import SparkContext, SparkConf
import random

appName = "caculate-pi"
master = "yarn"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

def inside(p):
    # p is the sample index and is ignored; draw a random point in the unit square
    x, y = random.random(), random.random()
    return x*x + y*y < 1

NUM_SAMPLES = 10**6
count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Or you can submit an application packaged as a JAR file:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

:::danger
The following method applies to Spark 1.6. Since Spark 2, spark-assembly-*.jar is no longer produced and builds mostly use sbt, so this method no longer works.
:::

**Build Spark Application**

```shell
# Create your project directory (on the local file system)
~$ mkdir ~/lab4/wc -p
# Download WordCount.scala
~$ wget ~/lab4/wc/WordCount.scala
```

Or you can create it manually. The content of WordCount.scala

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD

object RunWordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("wordCount"))
    // args(0) is the input path
    val textFile = sc.textFile(args(0))
    val countsRDD = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    try {
      // Assumed: save the counts to the output path passed as the second argument
      countsRDD.saveAsTextFile(args(1))
    } catch {
      case e: Exception => println("Error!")
    }
  }
}


# Compile `WordCount.scala`
~/lab4/wc$ scalac -classpath /usr/local/spark/lib/spark-assembly-1.6.1-hadoop2.4.0.jar WordCount.scala

# Create a jar file from the compiled classes in wc/
~/lab4$ jar -cvf wc.jar -C wc/ .

# Create input directory for WordCount
~$ hadoop fs -mkdir -p /user/[username]/input

# Put data to your input directory on HDFS
~$ hadoop fs -put ~/lab4/wc/WordCount.scala /user/[username]/input

# Submit the job to YARN
~/lab4$ spark-submit --class RunWordCount --master yarn-cluster wc.jar input/WordCount.scala output


Practice 4-1, 4-2


Practice 4-1


Due date: 2019/05/20 23:59
This is a group assignment.

Write a Spark program in Python/Java/Scala and save the code in ~/lab4/ with filename practice1 (For example,

Please download the input file and upload it to your HDFS home folder:

$ wget -O practice4-1.txt
$ hadoop fs -put practice4-1.txt

Your program must read the file from hdfs://hadoop-master:8020/user/[your user name]/practice4-1.txt, then print each line that contains “shoe” to stdout (which is your screen by default).

The results will look like this:

Cinderella obeyed, and the Fairy, touching it with her wand, turned it into a grand coach. Then she desired Cinderella to go to the trap, and bring her a rat. The girl obeyed, and a touch of the Fairy’s wand turned him into a very smart coachman. Two mice were turned into footmen; four grasshoppers into white horses. Next, the Fairy touched Cinderella’s rags, and they became rich satin robes, trimmed with point lace. Diamonds shone in her hair and on her neck and arms, and her kind godmother thought she had seldom seen so lovely a girl. Her old shoes became a charming pair of glass slippers, which shone like diamonds.
However, the Prince’s search was rewarded by his finding the glass slipper, which he well knew belonged to the unknown Princess. He loved Cinderella so much that he now resolved to marry her; and as he felt sure that no one else could wear such a tiny shoe as hers was, he sent out a herald to proclaim that whichever lady in his kingdom could put on this glass slipper should be his wife.

Practice 4-2


Due date: 2019/05/24 23:59
This is a group assignment.

Write a Spark program in Python/Java/Scala and save the code in ~/lab4/ with filename practice2 (For example,

Please download the input file and upload it to your HDFS home folder:

$ wget -O practice4-2.txt
$ hadoop fs -put practice4-2.txt

Each line in the input file represents a vertex containing x and y coordinates.

Content of the input file:


Your program must read the file from hdfs://hadoop-master:8020/user/[your user name]/practice4-2.txt, then print the minimal distance between two vertices to stdout.

Hint: use cartesian() with reduce() or min().

