SE6023 Lab4 Spark & Scala

tags: hadoop

Introducing Spark

What is Apache Spark?

Why Spark?

In the 2014 Gray Sort competition, a third-party benchmark measuring how fast a system can sort 100 TB of data, Spark showed clear performance advantages over plain MapReduce:

Source: Databricks blog, Sort benchmark, Spark SQL on SIGMOD 2015

Spark Stack

Spark Interactive Shell

We will first introduce the API through Spark’s interactive shell, then show you how to write RDD applications in Java, Scala, and Python.

Source: https://spark.apache.org/docs/latest/quick-start.html

Spark APIs

Spark now provides three APIs to make different types of workloads easier to express and better optimized.

Source: A Tale of Three Apache Spark APIs


Core Spark Concept: Resilient Distributed Dataset (RDD)

RDD is a read-only collection of objects that is partitioned across multiple machines in a cluster.
In a typical Spark program, one or more RDDs are loaded as input and through a series of transformations are turned into a set of target RDDs, which have an action performed on them (such as computing a result or writing them to persistent storage).
The term “resilient” in “Resilient Distributed Dataset” refers to the fact that Spark can automatically reconstruct a lost partition by recomputing it from the RDDs that it was computed from.

Hadoop: The Definitive Guide, 4th Edition, Page 550

RDD Operations


RDDs support two types of operations: transformations, which build a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

Source
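As a quick sketch of the difference in the Scala shell (assuming a placeholder input file `data.txt`): transformations only describe how a new RDD is derived, while actions actually run the computation and return a result.

```scala
// Transformations are lazy: nothing is read or computed yet.
val lines = sc.textFile("data.txt")            // placeholder input file
val lengths = lines.map(line => line.length)   // still only a recipe for a new RDD

// Actions trigger the actual computation and return a result to the driver.
val numLines = lengths.count()
val maxLength = lengths.reduce((a, b) => math.max(a, b))
```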

RDD of Strings

RDD Fault Tolerance in Spark
If the original RDD that was used to create your current partition also isn’t there (neither in memory nor on disk), then Spark will have to go one step back again and recompute the previous RDD. In the worst-case scenario, Spark will have to go all the way back to the original data.

More Information:
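One way to see the lineage Spark keeps for this recomputation is `toDebugString`, sketched below in the Scala shell (the input file name is a placeholder):

```scala
// Each transformation adds one step to the RDD's lineage.
val lines = sc.textFile("data.txt")   // placeholder input file
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

// toDebugString prints the chain of parent RDDs that Spark would
// follow to recompute a lost partition.
println(pairs.toDebugString)
```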

Differences between Hadoop MapReduce and RDD


Iterative Operations

Note
If the distributed memory (RAM) is not sufficient to store intermediate results (the state of the job), Spark will store those results on disk.
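If you want to make that behavior explicit for a particular RDD, you can pick a storage level yourself. A minimal Scala sketch, assuming a placeholder input file and an illustrative transformation:

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical intermediate RDD; "data.txt" is a placeholder input file.
val intermediate = sc.textFile("data.txt").map(line => line.toUpperCase)

// Keep partitions in RAM when they fit, spill the rest to local disk.
intermediate.persist(StorageLevel.MEMORY_AND_DISK)

intermediate.count()   // first action computes and materializes the RDD
intermediate.first()   // later actions reuse the persisted partitions
```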

Interactive Operations

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory, for much faster access the next time you query it.

Source: https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm

Creating RDDs

There are two ways to create RDDs

  1. Parallelizing an existing collection in your driver program
  2. Referencing a dataset in an external storage system (shared file system, HDFS, HBase …)

What is the “driver program”?


"The machine where the Spark application process (the one that creates the SparkContext) is running is the “Driver” node, with process being called the Driver process. "

Source: What does the master node and driver program mean in Spark?

1. Parallelizing Collections

Scala

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.reduce((a, b) => a + b)

Python

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

2. Referencing a dataset

Spark supports

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI).

HTTP/HTTPS URLs are not supported.

Scala

val distFile = sc.textFile("data.txt")
distFile.map(s => s.length).reduce((a, b) => a + b)

Python

distFile = sc.textFile("data.txt")

# Saving and Loading SequenceFiles
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("path/to/file")
sorted(sc.sequenceFile("path/to/file").collect())
# [(1, u'a'), (2, u'aa'), (3, u'aaa')]

If you are using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

RDD Operations

Basics

Scala

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

Python

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

If we also wanted to use lineLengths again later, we could add:

lineLengths.persist()

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.
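Putting the pieces together, a minimal sketch of the reuse pattern (the second action is only illustrative):

```scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)

lineLengths.persist()   // keep the computed lengths in memory

val totalLength = lineLengths.reduce((a, b) => a + b)          // computes and caches lineLengths
val maxLength   = lineLengths.reduce((a, b) => math.max(a, b)) // reuses the cached data
```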

Working with Key-Value Pairs

Scala

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1)) // <- tuple
val counts = pairs.reduceByKey((a, b) => a + b)

Python

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.
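A short sketch of those two calls, continuing the example above (the exact output depends on the contents of data.txt):

```scala
// Continuing the example above: counts is an RDD of (line, count) pairs.
val sorted = counts.sortByKey()     // sort the pairs alphabetically by key
sorted.collect().foreach(println)   // bring the results back to the driver and print them
```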

Transformations

| Transformation | Meaning |
| --- | --- |
| map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
| filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
| reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, return a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. |

Map

Example:

val newData = data.map (line => line.toUpperCase() )

or

val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

FlatMap


Example:

val result = data.flatMap(line => line.split(" ") )

The flatMap transformation above splits each line into words; each word becomes an individual element of the newly created RDD.
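To make the difference from map concrete, here is a small sketch with made-up sample data:

```scala
// Made-up sample data to contrast map and flatMap.
val lines = sc.parallelize(Seq("hello world", "hi"))

// map: one output element per input element -> RDD[Array[String]] with 2 elements
val arrays = lines.map(line => line.split(" "))

// flatMap: results are flattened -> RDD[String] with 3 elements ("hello", "world", "hi")
val words = lines.flatMap(line => line.split(" "))

println(arrays.count())   // 2
println(words.count())    // 3
```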

reduceByKey

MapReduce in Java

public class LineLengthReducer
    extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}

Spark Scala

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey((a, b) => a + b)
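As a side note on the shuffle (a sketch with made-up data): reduceByKey partially aggregates values on each partition before shuffling, much like a combiner in MapReduce, while the equivalent groupByKey version ships every individual value across the network first.

```scala
// Made-up sample data.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// reduceByKey partially sums values on each partition before the shuffle.
val summed = pairs.reduceByKey(_ + _)

// groupByKey shuffles every individual value, then sums on the reduce side.
val summedViaGroup = pairs.groupByKey().mapValues(_.sum)

summed.collect()          // Array((a,2), (b,1))
summedViaGroup.collect()  // Array((a,2), (b,1))
```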

More Information: Shuffle operations in Spark(Link)

Sources

  1. Apache Spark Map vs FlatMap Operation
  2. Scala - can a lambda parameter match a tuple?
  3. reduceByKey: How does it work internally?
  4. How-to: Translate from MapReduce to Apache Spark
  5. How to use functions as variables (values) in Scala

Actions

| Action | Meaning |
| --- | --- |
| reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
| collect() | Return all the elements of the dataset as an array at the driver program. |
| count() | Return the number of elements in the dataset. |
| foreach(func) | Run a function func on each element of the dataset. |
| saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. |
| take(n) | Return an array with the first n elements of the dataset. |
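A short Scala-shell sketch exercising several of these actions (the output directory name is a placeholder):

```scala
val nums = sc.parallelize(1 to 10)

nums.count()                       // 10
nums.reduce((a, b) => a + b)       // 55
nums.take(3)                       // Array(1, 2, 3)
nums.collect()                     // Array(1, 2, ..., 10), returned to the driver

nums.foreach(n => println(n))      // runs on the executors; output goes to executor logs
nums.saveAsTextFile("numbers-out") // placeholder output directory (must not exist yet)
```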

Reference

Examples

Source: https://spark.apache.org/examples.html

Using the Spark Shell

Scala: spark-shell
Python: pyspark

WordCount

Scala

val inputFilePath = "hdfs://hadoop-master:8020/user/a000000000/spark-wordcount-input.txt"
val textFile = sc.textFile(inputFilePath)
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.collect()
counts.saveAsTextFile("hdfs://...")

Python

inputFilePath = "hdfs://hadoop-master:8020/user/a000000000/spark-wordcount-input.txt"
text_file = sc.textFile(inputFilePath)
counts = text_file.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.collect()
counts.saveAsTextFile("hdfs://...")

Pi Estimation

Theorem

When the number of sampled points is large enough,

$$\frac{\pi}{4} \approx \frac{\text{points in a quarter of a circle}}{\text{points in the square}}$$

Since the points are drawn uniformly from the unit square, multiplying the right-hand side by 4 gives an estimate of π.

Source: Estimating Pi with Monte Carlo simulated dart throwing!

Scala

import math.random
import math.pow

val NUM_SAMPLES = math.pow(10, 8).toInt

val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")

Python

import random

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

NUM_SAMPLES=10**8
count = sc.parallelize(xrange(0, NUM_SAMPLES)).filter(inside).count()
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

Submitting Applications

Source: https://spark.apache.org/docs/latest/submitting-applications.html#master-urls

Linking Spark & Initializing Spark

Python (Recommended)

from pyspark import SparkContext, SparkConf

appName = "caculate-pi"
master = "yarn"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val appName = "caculate-pi"
val master = "yarn"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)

Java

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

String appName = "caculate-pi";
String master = "yarn";
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode.
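For quick local testing without a cluster, the same code can be pointed at local mode instead; a minimal Scala sketch (the thread count is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "local[2]" runs Spark inside this JVM with two worker threads; no cluster is needed.
val conf = new SparkConf().setAppName("caculate-pi").setMaster("local[2]")
val sc = new SparkContext(conf)
```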

Launching Applications with spark-submit

Python application on a yarn cluster

~$ wget https://pastebin.com/raw/QB1DSCgn --output-document=caculate-pi.py
~$ spark-submit caculate-pi.py

The content of caculate-pi.py

from pyspark import SparkContext, SparkConf
import random

appName = "caculate-pi"
master = "yarn"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

NUM_SAMPLES = 10**6
count = sc.parallelize(xrange(0, NUM_SAMPLES)).filter(inside).count()
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

Or you can submit an application packaged as a JAR file:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
:::danger
The method below was used with Spark 1.6. Since Spark 2, spark-assembly-*.jar is no longer shipped and applications are usually built with sbt, so this method no longer works.
:::

**Build Spark Application**

```shell
# Create your project directory (done on the local file system)
~$ mkdir ~/lab4/wc -p

# Download WordCount.scala
~$ wget ~/lab4/wc/WordCount.scala
```

Or you can create it manually. The content of WordCount.scala

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD

object RunWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("wordCount"))
    val textFile = sc.textFile(args(0)) 
    val countsRDD = textFile.flatMap(line => line.split(" ")) 
      .map(word => (word, 1))
      .reduceByKey(_ + _) 
    try {
      countsRDD.saveAsTextFile(args(1))
    } catch {
      case e: Exception => println("Error!")
    }
    sc.stop()
  }
}

Then,

# Compile the `WordCount.scala`
~/lab4/wc$ scalac -classpath /usr/local/spark/lib/spark-assembly-1.6.1-hadoop2.4.0.jar WordCount.scala

# Create a jar file
~/lab4$ jar -cvf wc.jar wc/ .

# Create input directory for WordCount
~$ hadoop fs -mkdir -p /user/[username]/input

# Put data to your input directory on HDFS
~$ hadoop fs -put ~/lab4/wc/WordCount.scala  /user/[USERNAME]/input

# Submit job to hadoop
~$ spark-submit --class RunWordCount --master yarn-cluster wc.jar input/WordCount.scala output
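As the warning above notes, Spark 2 and later no longer ship spark-assembly-*.jar and projects are usually built with sbt instead. A minimal build.sbt for the WordCount example might look like the sketch below; the Scala and Spark version numbers are assumptions and should match the versions installed on your cluster.

```scala
// build.sbt (a sketch; the version numbers are assumptions, match them to your cluster)

name := "wordcount"

version := "0.1"

scalaVersion := "2.11.12"

// "provided": the cluster supplies the Spark jars at runtime.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" % "provided"
```

Running `sbt package` in the project directory then produces a jar under `target/scala-2.11/`, which can be submitted with `spark-submit --class RunWordCount --master yarn <jar> <input> <output>` as above.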

More

https://spark.apache.org/docs/latest/rdd-programming-guide.html

>>Hadoop Cluster Information (use UNIX password to log in)<<

Reference
https://www.oreilly.com/library/view/learning-spark/9781449359034/ch04.html
https://spark.apache.org/docs/latest/rdd-programming-guide.html

Practice 4-1, 4-2

4-1

Due date: 2019/05/20 23:59
This is a group assignment.

Write a Spark program in Python/Java/Scala and save the code in ~/lab4/ with filename practice1 (for example, practice1.py).

Please download the input file and upload it to your HDFS home folder:

$ wget https://pastebin.com/raw/P0FXCARK -O practice4-1.txt
$ hadoop fs -put practice4-1.txt

Your program must read the file from hdfs://hadoop-master:8020/user/[your user name]/practice4-1.txt, then print each line that contains “shoe” to stdout (which is your screen by default).

Hint
The results will look like

Cinderella obeyed, and the Fairy, touching it with her wand, turned it into a grand coach. Then she desired Cinderella to go to the trap, and bring her a rat. The girl obeyed, and a touch of the Fairy’s wand turned him into a very smart coachman. Two mice were turned into footmen; four grasshoppers into white horses. Next, the Fairy touched Cinderella’s rags, and they became rich satin robes, trimmed with point lace. Diamonds shone in her hair and on her neck and arms, and her kind godmother thought she had seldom seen so lovely a girl. Her old shoes became a charming pair of glass slippers, which shone like diamonds.
However, the Prince’s search was rewarded by his finding the glass slipper, which he well knew belonged to the unknown Princess. He loved Cinderella so much that he now resolved to marry her; and as he felt sure that no one else could wear such a tiny shoe as hers was, he sent out a herald to proclaim that whichever lady in his kingdom could put on this glass slipper should be his wife.

4-2

Due date: 2019/05/24 23:59
This is a group assignment.

Write a Spark program in Python/Java/Scala and save the code in ~/lab4/ with filename practice2 (for example, practice2.py).

Please download the input file and upload it to your HDFS home folder:

$ wget https://pastebin.com/raw/KZJrgL6C -O practice4-2.txt
$ hadoop fs -put practice4-2.txt

Each line in the input file represents a vertex containing x and y coordinates.

Content of the input file:

3.9854848869196324,71.29551177914277
5.480589112961365,90.61946351121573
47.64530232634527,19.706018538612092
65.02517993302833,65.46846854522971
64.8364383782732,12.342955227442086
...

Your program must read the file from hdfs://hadoop-master:8020/user/[your user name]/practice4-2.txt, then print the minimal distance between two vertices to stdout.

Tip
Use cartesian() and reduce(), or min()
