import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
Comment out the following lines to install Spark locally in the same folder as this notebook:
# !curl -O https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
# !tar -xvf spark-2.2.0-bin-hadoop2.7.tgz
The following Python Library will configure your python environment
# !pip install findspark
If you would like to try using Spark on a cluster for free without any setup checkout Databricks Community Edition
Setup the PySpark environment.
import os
import findspark
os.environ["PYSPARK_PYTHON"] = "python3"
findspark.init("spark-2.2.0-bin-hadoop2.7",)
Import a few common components of Spark SQL
from pyspark.sql import SparkSession, Column, Row, functions as F
Initialize the SparkSQL session which contains a basic Spark Context. This may take a few moments to launch the cluster of (typically 4 to 8 python jobs in the background). Note in a real Spark deployment you would simply change the .master("local[*]")
to instead point to the YARN resource manager. To learn more about deploying Spark on a cluster of machines read this tutorial.
spark = (
SparkSession.builder
.master("local[*]")
.appName("LectureExample")
.getOrCreate()
)
sc = spark.sparkContext
Spark exposes two interfaces to data:
Today Spark users are encouraged to try to use the dataframe interface which provides additional system performance optimizations. However, in the homework and in this notebook we will use a bit of both to get some exposure the the low-level side of distributed data processing.
The sc
variable contains the SparkContext initialized above and it is used to created new RDDs. In this example we use python to create 32 numbers locally and then sc.parallelize
to distribute those 32 numbers to our cluster of machines. Each machine gets a subset of the numbers. If we had more than 32 machines then some will not be given a number.
numbers = sc.parallelize(range(32))
There are a wide range of basic operations on RDDs. Each of these operations runs across all the workers and results in either a value or another RDD.
numbers.sum()
numbers.count()
print(numbers.take(2))
print(numbers.collect())
numbers.filter(lambda x: x % 2 == 0)
numbers.filter(lambda x: x % 2 == 0).count()
( # What does this do?
numbers
.map(lambda x: x + 1)
.filter(lambda x: x % 2 == 0)
.map(lambda x: str(x) + " is even.")
.collect()
)
The above operations resulted in jobs being executed on a local Spark workers (separate python instances). We can learn more about the Spark configuration here. Click on the SparkUI link to view your local cluster.
spark
Are these really running on remote processes:
os.getpid()
import os
numbers.map(lambda x: os.getpid()).distinct().collect()
Let's use Spark to parallelize a simulation.
import numpy as np
import seaborn as sns
np.random.seed(42)
weird_data = np.random.randn(100000)**2 - 10 *np.abs( np.random.randn(100000) )
sns.distplot(weird_data)
def resample(data, seed):
import numpy as np
np.random.seed(seed)
return data[np.random.randint(0, len(data),len(data))]
def boot_strap_mean(data, seed):
return resample(data, seed).mean()
boot_strap_mean(weird_data, 43)
seeds = np.arange(10000) + 42
samples = (
sc.parallelize(seeds)
.map(lambda seed: boot_strap_mean(weird_data, seed))
)
samples.count()
samples.mean()
samples.mean()
sns.distplot(samples.collect())
One of the key innovations in Spark is the ability to cache computation.
%%timeit
samples.mean()
samples.persist()
This time when we run it the result will get saved
samples.mean()
If we run it again things will be much faster
samples.mean()
Timing after warming the cache:
%%timeit
samples.mean()
Orders of magnitude faster ... not surprising. How long did this take if I just use basic Python
%%timeit
samples_local = np.array([
boot_strap_mean(weird_data, seed)
for seed in seeds])
Here we will work through the standard process of preparing a text dataset. Note that while we will be loading from the local filesystem here. In a distributed computing environment the arguments to the text file command would be very similar (see below).
Uncomment the following lines to download some text data to process.
# !curl -O https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
# !unzip smsspamcollection.zip
The following line will load the text file into an RDD. If this file were running in a distributed filesystem the path would have been:
raw_lines = sc.textFile("hdfs://silly_data/SMSSPamCollection")
raw_lines = sc.textFile("./SMSSpamCollection")
We can get a look at a few lines:
raw_lines.take(3)
In the following RDD code we can transform the data file into records:
records = (
raw_lines
.map(lambda x: x.lower()) # Make it lower case
.map(lambda x: x.split("\t")) # Split records
.map(lambda x: Row(label=x[0], text=x[1])) # Make Row objects (from SparkSQL)
)
records.take(3)
How many Spam
records.filter(lambda x: x['label'] == 'spam').count()
How many Ham
records.filter(lambda x: x['label'] == 'ham').count()
If we wanted to determine if a post is spam we might first check to see if it has words that occur more often in spam messages. Before we proceed we need to introduce another (odd) Spark function.
sc.range(1,5).collect()
sc.range(1,5).map(lambda x: [i for i in range(x) ]).collect()
sc.range(1,5).flatMap(lambda x: [i for i in range(x) ]).collect()
Let's count how often word occur in general:
word_counts = (
records
.map(lambda x: x['text']) # extract the text
.flatMap(lambda x: x.split()) # split on whitespace and
# convert each word into a separate record.
.filter(lambda x: len(x) > 2) # keep words that have 3 or more letters
.map(lambda x: (x, 1)) # Count each word
.reduceByKey(lambda a, b: a + b) # Sum the counts
)
word_counts.map(lambda x: (x[1], x[0])).top(10)
word_counts_by_label = (
records
.flatMap(lambda x: ((x['label'], w) for w in x['text'].split()))
.filter(lambda x: len(x[1]) > 2) # keep words that have 3 or more letters
.map(lambda x: # Count each word
(x[1], np.array([1.0, 0.0]) if x[0] == 'spam' else np.array([0.0, 1.0]) ))
.reduceByKey(lambda a, b: a + b) # Sum the counts
)
word_counts_by_label.take(3)
Computing the words with highest count in Spam relative to Ham:
spam_ratio = (
word_counts_by_label.
map(lambda x: (x[0], (x[1][0] + 1.0) / (x[1][1] + 1.0)))
)
spam_ratio.take(2)
Taking the top spam ratio words we get:
spam_ratio.map(lambda x: (x[1], x[0])).top(10)
Today people are encourage to use the SparkSQL and Dataframe interfaces for basic data manipulation. These functions are heavily optimized and can often result in easier to read code.
df = records.toDF()
df
Instead of calling collect we could call toPandas
to construct the Pandas DataFrame from our "big data."
pandas_df = df.toPandas()
pandas_df.head()
We can apply filter operations using .where(...)
. Note here that the df['lable']
refers to the label column of the dataframe.
df.where(df['label'] == "spam")
We can use the select
command to select columns and even apply computation. The F.length
describes the length
function SparkSQL functions F
and the .alias(...)
command is like as
in SQL (renames the column).
(
df
.where(df['label'] == "spam")
.select(df['label'], F.length(df['text']).alias("textlen"))
).take(5)
Are Spam shorter on average?
(
df.groupBy(df['label']).agg(
F.avg(F.length(df['text'])).alias("avg_len"),
F.stddev(F.length(df['text'])).alias("stdev_len")
)
).toPandas()
You can also register dataframes as tables that can be addressed in SQL strings:
df.registerTempTable("spam")
spark.sql("""
SELECT label, avg(length(text))
FROM spam
GROUP BY label
""").toPandas()