Running Spark locally (single machine)
You must have Java to run Spark.
$ java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
Go to the Spark Download page and get the latest version, leaving all options at their defaults.
At the time of writing this is Spark 2.2.1, downloaded as spark-2.2.1-bin-hadoop2.7.tgz:
$ cd ~
$ mv Downloads/spark-2.2.1-bin-hadoop2.7.tgz .
$ tar -xf spark-2.2.1-bin-hadoop2.7.tgz
$ cd spark-2.2.1-bin-hadoop2.7
Launch the interactive console (Scala by default):
$ ./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/19 14:56:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.62.0.37:4040
Spark context available as 'sc' (master = local[*], app id = local-1519052217524).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
The Spark Session
The Spark Application is controlled through a driver process called the SparkSession.
The SparkSession instance ensures appropriate execution across the cluster.
Even with multiple nodes in the cluster, one
SparkSession corresponds to one Spark Application.
This instance is available as the
spark variable in Python and Scala when using the console.
scala> spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

scala>
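Outside the console no spark variable is predefined, so the session has to be created explicitly. The following is a minimal sketch, assuming a standalone Scala program with Spark on the classpath; the object name and application name are hypothetical placeholders.

```scala
import org.apache.spark.sql.SparkSession

object LocalExample {
  def main(args: Array[String]): Unit = {
    // getOrCreate() reuses a running session if there is one,
    // otherwise it starts a new one.
    val spark = SparkSession.builder()
      .appName("local-example") // hypothetical name, shown in the Web UI
      .master("local[*]")       // run locally, using all available cores
      .getOrCreate()

    println(spark.version)
    println(spark.sparkContext.master)

    spark.stop()
  }
}
```

One such program corresponds to one Spark Application, in line with the one-to-one relation described above.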
DataFrames are the most common type of data structure in Spark.
They are essentially like relational tables: a collection of named columns.
Let’s create one!
Here we create a single-column DataFrame, with a column named number holding the numbers from 0 to 999:
scala> val myRange = spark.range(1000).toDF("number")
18/02/19 15:23:26 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
myRange: org.apache.spark.sql.DataFrame = [number: bigint]
Let’s show this now:
scala> myRange.show()
+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows
We can also show its schema (which in this case comes from spark.range, which always produces a single non-nullable long column):
scala> myRange.printSchema()
root
 |-- number: long (nullable = false)
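A DataFrame can also be built from a small local collection, in which case the column types are derived from the Scala types. This is only a sketch: the sample rows and the column names country and flights are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // enables .toDF on local collections

// Hypothetical sample data: (country, number of flights)
val sample = Seq(
  ("Romania", 15),
  ("Croatia", 1),
  ("Ireland", 344)
).toDF("country", "flights")

// The String field becomes a string column, the Int field an integer column.
sample.printSchema()
sample.show()
```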
This range of numbers is a distributed collection: as the name suggests, when Spark runs on a cluster, each executor holds a subset of it.
This allows tasks to run in parallel. Each chunk is called a partition.
Don’t worry about this too much: for the most part you won’t have to care about it, as Spark handles it for you.
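If you are curious, the partitioning can be inspected from the console. A minimal sketch, assuming a local session; the partition count Spark picks on its own depends on the machine, and the value 4 passed to repartition is an arbitrary choice.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val myRange = spark.range(1000).toDF("number")

// Number of partitions Spark chose on its own.
println(myRange.rdd.getNumPartitions)

// repartition returns a new DataFrame split into the requested number of partitions.
val inFour = myRange.repartition(4)
println(inFour.rdd.getNumPartitions)
```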
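In Spark, the core data structures are immutable: they cannot be changed after they have been created. To modify a DataFrame, you describe the change as a transformation, which yields a new DataFrame.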
Transformations are lazy, which means that they are defined but not executed immediately. They are executed only when an action is requested.
Consider the following example, which keeps only the even numbers of our range (those that leave no remainder when divided by 2).
scala> val divisBy2 = myRange.where("number % 2 = 0")
divisBy2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: bigint]

scala>
The actual computation happens only when we request an action, such as show() or count():
scala> divisBy2.show()
+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
|    20|
|    22|
|    24|
|    26|
|    28|
|    30|
|    32|
|    34|
|    36|
|    38|
+------+
only showing top 20 rows

scala> divisBy2.count()
res7: Long = 500
The benefits of not executing transformations immediately are:
- we can chain transformations (pipelines)
- we’re effectively building a plan, which allows Spark to start organising itself for optimal execution
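Both points can be seen in a short sketch: two filters are chained, the plan is printed without running anything, and only the final action triggers execution. The name divisBy6 is introduced here for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val myRange = spark.range(1000).toDF("number")

// Two chained transformations: nothing is computed yet.
val divisBy6 = myRange.where("number % 2 = 0").where("number % 3 = 0")

// explain() prints the plan Spark has built, without running it.
divisBy6.explain()

// count() is an action: only now does Spark execute the plan.
println(divisBy6.count()) // 167 multiples of 6 between 0 and 999
```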
There are 2 types of transformations:
- narrow transformations: each input partition contributes to one and only one output partition (this is the case for our where filter)
- wide transformations: the data required to compute the records in a single partition may reside in many partitions of the parent.
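The difference shows up in the physical plan: a wide transformation introduces an Exchange step (a shuffle), while a narrow one does not. The sketch below assumes a local session; the bucket column and the explicit 4 partitions are arbitrary choices made for the illustration, and the exact plan text varies between Spark versions.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// A range split explicitly into 4 partitions.
val numbers = spark.range(0, 1000, 1, 4).toDF("number")

// Narrow: each partition can be filtered on its own.
val narrow = numbers.where("number % 2 = 0")

// Wide: rows with the same bucket value must be brought together.
val wide = numbers.selectExpr("number % 10 AS bucket").groupBy("bucket").count()

narrow.explain() // no Exchange step expected in this plan
wide.explain()   // an Exchange step is expected in this plan
```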
Open a new terminal and download a CSV example file to /tmp:
$ wget -P /tmp https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/csv/2015-summary.csv
Verify what’s in there:
$ head -n 5 /tmp/2015-summary.csv
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
head returns the first n lines of the specified file (here the CSV header and then 4 lines of data).
Back to Scala:
Tell Spark how to read this file, specifying:
- inferSchema set to true: Spark will make its best guess to deduce the data’s schema
- header set to true: the first line of the CSV is the header, so it can be used for the column names
scala> val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("/tmp/2015-summary.csv")
Note that the read method is a transformation and as such is lazy, so the DataFrame here has an unknown length.
Spark has only scanned the file to infer the schema, but that’s it.
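When the column types are known in advance, the schema can be supplied explicitly instead of being inferred, which avoids the inference step. A sketch under that assumption, using the three columns and types seen in this file; the names flightSchema and flightDataWithSchema are introduced here, and the file is assumed to still be at /tmp/2015-summary.csv.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Explicit schema matching the CSV header.
val flightSchema = StructType(Seq(
  StructField("DEST_COUNTRY_NAME", StringType, nullable = true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, nullable = true),
  StructField("count", IntegerType, nullable = true)
))

// header is still needed so that the first line is not read as data.
val flightDataWithSchema = spark.read.schema(flightSchema).option("header", "true").csv("/tmp/2015-summary.csv")

flightDataWithSchema.printSchema()
```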
We need to apply an action to get some results, such as returning the top 4 records:
scala> flightData2015.take(4)
res1: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15])
These records correspond to the ones we saw with the head command.
Let’s sort these results by
count (last column) and explain what Spark will do (what’s the plan):
scala> flightData2015.sort("count").explain()
== Physical Plan ==
*Sort [count#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#14 ASC NULLS FIRST, 200)
   +- *FileScan csv [DEST_COUNTRY_NAME#12,ORIGIN_COUNTRY_NAME#13,count#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/2015-summary.csv], PartitionFilters: , PushedFilters: , ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
This has to be read from bottom to top, the top being the last operation executed and the bottom the first one:
- FileScan csv: scans the file
- Exchange rangepartitioning has to be executed prior to the sort, as sort is a wide transformation, so data partitions have to be reorganised
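The 200 in the Exchange step is the default number of shuffle partitions, controlled by the spark.sql.shuffle.partitions setting. As a sketch, assuming the CSV file is still at /tmp/2015-summary.csv, lowering it changes the plan accordingly; the value 5 is an arbitrary choice for a small local dataset.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("/tmp/2015-summary.csv")

// Lower the number of partitions produced by shuffles (default: 200).
spark.conf.set("spark.sql.shuffle.partitions", "5")

// The Exchange step of the plan should now mention 5 instead of 200.
flightData2015.sort("count").explain()
```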
We can confirm by retrieving the first 4 as we did before:
scala> flightData2015.sort("count").take(4)
res7: Array[org.apache.spark.sql.Row] = Array([Moldova,United States,1], [United States,Singapore,1], [United States,Croatia,1], [Malta,United States,1])
A more advanced example:
scala> :paste
// Entering paste mode (ctrl-D to finish)

flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .sort(desc("destination_total"))
  .limit(5)
  .show()

// Exiting paste mode, now interpreting.

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

scala>
Note the :paste keyword, which allows us to paste multi-line statements. To use it, just type :paste and press enter, then follow the instructions (ctrl+D to finish).
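The same aggregation can also be written as a SQL query against a temporary view. This is a sketch rather than the only way to do it: the view name flight_data_2015 and the variable topDestinations are made up here, and the CSV file is assumed to still be at /tmp/2015-summary.csv.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("/tmp/2015-summary.csv")

// Register the DataFrame under a (hypothetical) name usable from SQL.
flightData2015.createOrReplaceTempView("flight_data_2015")

// Same pipeline as the DataFrame version: group, sum, sort, keep the top 5.
val topDestinations = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) AS destination_total
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY sum(count) DESC
  LIMIT 5
""")

topDestinations.show()
```

Both versions describe the same work, so whether to use SQL or the DataFrame API is largely a matter of preference.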