Spark

Spark gives us a comprehensive, unified framework to manage big data processing requirements for data sets that are diverse in nature (text data, graph data, etc.) as well as in source (batch vs. real-time streaming data). Spark enables applications in Hadoop clusters to run up to 100 times faster in memory and up to 10 times faster even when running on disk.

MapReduce is a great solution for one-pass computations, but not very efficient for use cases that require multi-pass computations and algorithms. The job output data of each step has to be stored in the distributed file system before the next step can begin, so this approach tends to be slow due to replication and disk I/O. Also, Hadoop solutions typically include clusters that are hard to set up and manage, and they require integrating several tools for different big data use cases (like Mahout for machine learning and Storm for streaming data processing).

Spark allows programmers to develop complex, multi-step data pipelines using the directed acyclic graph (DAG) pattern. It also supports in-memory data sharing across DAGs, so that different jobs can work with the same data.

Spark runs on top of existing Hadoop Distributed File System (HDFS) infrastructure to provide enhanced and additional functionality. It provides support for deploying Spark applications in an existing Hadoop v1 cluster (with SIMR – Spark-Inside-MapReduce) or Hadoop v2 YARN cluster or even Apache Mesos.

Features

  • Spark takes MapReduce to the next level with less expensive shuffles in the data processing. With capabilities like in-memory data storage and near real-time processing, the performance can be several times faster than other big data technologies.
  • Spark also supports lazy evaluation of big data queries, which helps with optimization of the steps in data processing workflows.
  • Spark holds intermediate results in memory rather than writing them to disk, which is very useful when you need to work on the same dataset multiple times. It’s designed to be an execution engine that works both in-memory and on-disk: Spark operators perform external operations when data does not fit in memory, so Spark can be used to process datasets that are larger than the aggregate memory in a cluster.
  • Spark will attempt to store as much data as possible in memory and then spill to disk. It can store part of a dataset in memory and the remaining data on disk. You have to look at your data and use cases to assess the memory requirements. This in-memory data storage gives Spark a significant performance advantage; a minimal caching sketch follows this list.
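
The snippet below is a minimal sketch of this spill-to-disk behavior, assuming a hypothetical log file on HDFS: the dataset is marked for caching with the MEMORY_AND_DISK storage level, so partitions that do not fit in memory are written to disk instead of being recomputed.

import org.apache.spark.storage.StorageLevel

// Cache the dataset in memory, spilling partitions to disk if they do not fit
val logs = sc.textFile("hdfs:///data/logs")   // hypothetical input path
logs.persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cache; later actions reuse it
val errorCount = logs.filter(_.contains("ERROR")).count()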

Ecosystem

  • Spark Streaming:
    • Spark Streaming can be used for processing real-time streaming data. It is based on a micro-batch style of computing and processing, using the DStream, which is basically a series of RDDs, to process the real-time data.
  • Spark SQL:
    • Spark SQL provides the capability to expose Spark datasets over the JDBC API and allows running SQL-like queries on Spark data using traditional BI and visualization tools. Spark SQL lets users ETL their data from the different formats it’s currently in (like JSON, Parquet, or a database), transform it, and expose it for ad-hoc querying.
  • Spark MLlib:
    • MLlib is Spark’s scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as underlying optimization primitives.
  • Spark GraphX:
    • GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing the Resilient Distributed Property Graph: a directed multi-graph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
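
To make the GraphX item above concrete, here is a minimal sketch that builds a tiny property graph from hypothetical vertex and edge collections and inspects it; the names and relationships are made up for illustration.

import org.apache.spark.graphx.{Edge, Graph}

// Vertices are (id, property) pairs; edges carry a property as well
val vertices = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val graph = Graph(vertices, edges)

println(graph.numEdges)                     // 2
graph.inDegrees.collect().foreach(println)  // (2,1), (3,1)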

 

Architecture

  • Data Storage: Spark can use the HDFS file system for data storage, and it works with any Hadoop-compatible data source, including HDFS, HBase, and Cassandra.
  • API: The API enables application developers to create Spark-based applications using a standard API interface. Spark provides APIs for the Scala, Java, and Python programming languages.
  • Resource Management: Spark can be deployed as a standalone server, or it can run on a distributed computing framework like Mesos or YARN.

Resilient Distributed Datasets

Resilient Distributed Dataset (based on Matei Zaharia’s research paper), or RDD, is the core concept in the Spark framework. Think of an RDD as a table in a database. It can hold any type of data. Spark stores data in an RDD on different partitions. RDDs are also fault tolerant because an RDD knows how to recreate and recompute its dataset. RDDs are immutable: you can modify an RDD with a transformation, but the transformation returns a new RDD while the original RDD remains the same.

RDD supports two types of operations:

  • Transformation: Transformations don’t return a single value; they return a new RDD. Nothing gets evaluated when you call a transformation function; it just takes an RDD and returns a new RDD. Some of the transformation functions are map, filter, flatMap, groupByKey, reduceByKey, aggregateByKey, pipe, and coalesce.
  • Action: An action operation evaluates and returns a value. When an action function is called on an RDD object, all the data processing queries are computed at that time and the result value is returned. Some of the action operations are reduce, collect, count, first, take, countByKey, and foreach. A short example follows this list.
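
The following sketch illustrates the laziness of transformations using the classic word count; the input path is hypothetical. The flatMap, map, and reduceByKey calls only build up the DAG, and nothing executes until the take action at the end.

// Transformations build up the DAG lazily; nothing runs until the action at the end
val lines = sc.textFile("hdfs:///data/input.txt")             // hypothetical path
val words = lines.flatMap(_.split(" "))                       // transformation
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)  // transformations
counts.take(10).foreach(println)                              // action: triggers evaluation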

 

Getting Started

  1. Download and install Apache Spark from https://spark.apache.org/downloads.html . Select the version that matches your Hadoop Version (2.7.3) and Scala version (2.11).
  2. Decompress the Spark file into the /DeZyre directory:
    tar -xvf spark-2.0.1-bin-hadoop2.7.tgz -C /DeZyre

    From the /DeZyre directory, make a softlink to the actual Spark directory (this will be helpful for any future version upgrade):

    ln -s spark-2.0.1-bin-hadoop2.7 spark

    Make an entry for Spark in the .bashrc file:

    export SPARK_HOME=/DeZyre/spark
    export PATH=$SPARK_HOME/bin:$PATH

    Source the changed .bashrc file with the following command:

    source ~/.bashrc

    We have now configured Spark in standalone mode. To verify the setup, let’s launch the Spark shell with the following command:

    spark-shell

    Now, in the launched spark-shell, let’s check the Spark version with the following command:

    sc.version
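
    As an optional sanity check, you can run a tiny job directly in the shell; the numbers below are arbitrary:

    // Distribute a local collection and run a simple job
    val data = sc.parallelize(1 to 100)
    data.filter(_ % 2 == 0).count()   // should return 50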

For Windows follow these instructions:

  1. Follow this guide: http://www.ics.uci.edu/~shantas/Install_Spark_on_Windows10.pdf
  2. Download winutils.exe and save it to a directory of your choice, say c:\hadoop\bin.

    Set HADOOP_HOME to reflect the directory with winutils (without bin).

    set HADOOP_HOME=c:\hadoop

    Set PATH environment variable to include %HADOOP_HOME%\bin as follows:

    set PATH=%HADOOP_HOME%\bin;%PATH%

    Tip: Define the HADOOP_HOME and PATH environment variables in the Control Panel.

    Create c:\tmp\hive folder and execute the following command:

    winutils.exe chmod -R 777 \tmp\hive

    Check the permissions:

    winutils.exe ls \tmp\hive

Use the spark-shell.cmd command to run your Scala programs on Windows.

How to Run Spark

When you install Spark on a local machine or use a cloud-based installation, there are a few different modes in which you can connect to the Spark engine.

The following are the Master URL parameters for the different modes of running Spark:

  • local – run Spark locally with one worker thread
  • local[K] – run Spark locally with K worker threads
  • spark://HOST:PORT – connect to a standalone Spark cluster master
  • mesos://HOST:PORT – connect to a Mesos cluster
  • yarn – connect to a YARN cluster

Spark Web Console

When Spark is running in any mode, you can view the Spark job results and other statistics by accessing the Spark Web Console via the following URL:

http://localhost:4040

Spark Console is shown in Figure 3 below with tabs for Stages, Storage, Environment, and Executors.


Shared Variables

Spark provides two types of shared variables to make it efficient to run the Spark programs in a cluster. These are Broadcast Variables and Accumulators.

Broadcast Variables: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used to give every node in the cluster a copy of a large input dataset more efficiently.

The following code snippet shows how to use broadcast variables:

// Broadcast Variables

val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

Accumulators: Accumulators are variables that are only added to using an associative operation and can therefore be supported efficiently in parallel. They can be used to implement counters (as in MapReduce) or sums. Tasks running on the cluster can add to an accumulator variable using the add method, but they cannot read its value. Only the driver program can read the accumulator’s value.

The code snippet below shows how to use Accumulator shared variable:

// Accumulators

val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

 

Spark SQL

Spark SQL, part of the Apache Spark big data framework, is used for structured data processing and allows running SQL-like queries on Spark data. We can perform ETL on data from different formats (like JSON, Parquet, or a database), and then run ad-hoc queries against the data stored in batch files, JSON data sets, or Hive tables. Spark SQL comes with the following features:

  • DataFrame: A DataFrame is a distributed collection of data organized into named columns. It is based on the data frame concept in the R language and is similar to a table in a relational database. SchemaRDD, from prior versions of the Spark SQL API, has been renamed to DataFrame. A DataFrame can be converted to an RDD by calling the rdd method, which returns the content of the DataFrame as an RDD of Rows. DataFrames can be created from different data sources such as:
    • Existing RDDs
    • Structured data files
    • JSON datasets
    • Hive tables
    • External databases

    Spark SQL and DataFrame API are available in the following programming languages:

    • Scala (https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.package)
    • Java (https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/sql/api/java/package-summary.html)
  • Data Sources: With the addition of the data sources API, Spark SQL now makes it easier to compute over structured data stored in a wide variety of formats, including Parquet, JSON, and Apache Avro.
  • SQLContext: Spark SQL provides SQLContext to encapsulate all relational functionality in Spark. You create the SQLContext from the existing SparkContext that we have seen in the previous examples. The following code snippet shows how to create a SQLContext object (a fuller example follows this list):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    There is also HiveContext, which provides a superset of the functionality provided by SQLContext. It can be used to write queries using the HiveQL parser and to read data from Hive tables. Note that you don’t need an existing Hive environment to use the HiveContext in Spark programs.
  • JDBC Server: The built-in JDBC server makes it easy to connect to structured data stored in relational database tables and perform big data analytics using traditional BI tools. The JDBC data source can be used to read data from relational databases using the JDBC API. This approach is preferred over JdbcRDD because the data source returns the results as a DataFrame, which can be processed in Spark SQL or joined with other data sources.
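
Putting a few of these pieces together, here is a minimal sketch that loads a hypothetical people.json file (assumed to contain name and age fields) into a DataFrame, registers it as a temporary table, and queries it with SQL:

// Create a DataFrame from a (hypothetical) JSON file and query it with SQL
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sqlContext.read.json("people.json")
people.printSchema()

people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 21").show()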

 

Spark Streaming

The Spark Streaming library can be used for processing real-time streaming data to gain real-time insights. This is critical for use cases like fraud detection, online trading systems, and event processing solutions.

Spark Streaming is an extension of core Spark API. Spark Streaming makes it easy to build fault-tolerant processing of real-time data streams.

The way Spark Streaming works is that it divides the live stream of data into batches (called microbatches) of a pre-defined interval (N seconds) and then treats each batch of data as a Resilient Distributed Dataset (RDD). We can then process these RDDs using operations like map, reduce, reduceByKey, join, and window. The results of these RDD operations are returned in batches. We usually store the results in a data store for further analytics, to generate reports and dashboards, or to send event-based alerts.

Streaming data can come from many different sources, including Apache Kafka, Flume, Amazon Kinesis, Twitter, TCP sockets, and files on HDFS or S3.

 

Spark Streaming Use Cases

Spark Streaming is becoming the platform of choice to implement data processing and analytics solutions for real-time data received from Internet of Things (IoT) and sensors. It is used in a variety of use cases and business applications.

Some of the most interesting use cases of Spark Streaming include the following:

  • Uber, the company behind the ride-sharing service, uses Spark Streaming in its continuous streaming ETL pipeline to collect terabytes of event data every day from its mobile users for real-time telemetry analytics.
  • Pinterest, the company behind the visual bookmarking tool, uses Spark Streaming, MemSQL and Apache Kafka technologies to provide insight into how their users are engaging with Pins across the globe in real-time.
  • Netflix uses Kafka and Spark Streaming to build a real-time online movie recommendation and data monitoring solution that processes billions of events received per day from different data sources.

Other real world examples of Spark Streaming include:

  • Supply chain analytics
  • Real-time security intelligence operations to find threats
  • Ad auction platform
  • Real-time video analytics to provide personalized, interactive experiences to viewers

DStream

DStream (short for Discretized Stream) is the basic abstraction in Spark Streaming and represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying operations on other DStreams. Internally, a DStream is represented as a sequence of RDD objects.

Similar to the transformation and action operations on RDDs, DStreams support the following operations:

  • map
  • flatMap
  • filter
  • count
  • reduce
  • countByValue
  • reduceByKey
  • join
  • updateStateByKey

Streaming Context

Similar to SparkContext in Spark, StreamingContext is the main entry point for all streaming functionality.

StreamingContext has built-in methods for receiving streaming data into Spark Streaming program.

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as a hostname and port number. For example, if we are using a tool like netcat to test the Spark Streaming program, we would receive a data stream from the machine where netcat is running (e.g., localhost) on port 9999.

When the code is executed, Spark Streaming only sets up the computation it will perform once it is started; no real processing is done yet. To start the processing after all the transformations have been set up, we finally call the start() method to begin the computation and the awaitTermination() method to wait for the computation to terminate, as the sketch below illustrates.
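
The following is a minimal sketch of the classic network word count inside the Spark shell, assuming netcat is sending text on localhost:9999 and a 10-second batch interval:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a StreamingContext from the existing SparkContext with a 10-second batch interval
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)

// Count words in each batch and print the results
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()              // start the computation
ssc.awaitTermination()   // wait for it to terminate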

Spark Streaming API

Spark Streaming comes with several API methods that are useful for processing data streams. There are RDD-like operations like map, flatMap, filter, count, reduce, groupByKey, reduceByKey, sortByKey, and join. It also provides additional API to process the streaming data based on window and stateful operations. These include window, countByWindow, reduceByWindow, countByValueAndWindow, reduceByKeyAndWindow and updateStateByKey.

Steps in a Spark Streaming program

Before we discuss the sample application, let’s take a look at different steps involved in a typical Spark Streaming program.

  • The StreamingContext is used for processing real-time data streams. So, the first step is to initialize the StreamingContext object using two parameters, the SparkContext and the sliding interval time. The sliding interval sets the update window in which we process the data coming in as streams. Once the context is initialized, no new computations can be defined or added to the existing context. Also, only one StreamingContext object can be active at a time.
  • After the Spark Streaming context is defined, we specify the input data sources by creating input DStreams. In our sample application, the input data source is a log message generator that uses Apache Kafka, a distributed messaging system. The log generator program creates random log messages to simulate a web-server run-time environment where log messages are continuously generated as various web applications serve user traffic.
  • Define the computations by applying Spark Streaming transformations like map and reduce to the DStreams.
  • After the streaming computation logic is defined, we can start receiving and processing the data using the start method of the StreamingContext object created earlier.
  • Finally, we wait for the streaming data processing to be stopped using the awaitTermination method of the StreamingContext object.

 

Spark Machine Learning

Spark’s machine learning library includes several different machine learning algorithms for collaborative filtering, clustering, classification, and other machine learning tasks. The Spark machine learning API includes two packages:

  • The spark.mllib package contains the original Spark machine learning API built on Resilient Distributed Datasets (RDDs). It offers machine learning techniques which include correlation, classification and regression, collaborative filtering, clustering, and dimensionality reduction.
  • The spark.ml package provides a machine learning API built on DataFrames, which are becoming the core part of the Spark SQL library. This package can be used for developing and managing machine learning pipelines. It also provides feature extractors, transformers, selectors, and machine learning techniques like classification, regression, and clustering. A minimal pipeline sketch follows this list.
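
As an illustration of the spark.ml package, the sketch below assembles a small text-classification pipeline; the trainingDF DataFrame (with "text" and "label" columns) is hypothetical:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Three-stage pipeline: tokenize text, hash tokens into feature vectors, fit a classifier
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// trainingDF is a hypothetical DataFrame with "text" and "label" columns
val model = pipeline.fit(trainingDF)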

Machine Learning & Data Science

Machine Learning is about learning from existing data to make predictions about the future. It’s based on creating models from input data sets for data-driven decision making.

Data science is the discipline of extracting the knowledge from large data sets (structured or unstructured) to provide insights to business teams and influence the business strategies and roadmaps. There are different types of machine learning models like:

  • Supervised learning: This technique is used to predict an outcome by training the program using an existing set of training data (labeled data). Then we use the program to predict the label for a new unlabeled data set. There are two sub-models under supervised machine learning, Regression and Classification.
  • Unsupervised learning: This is used to find hidden patterns and correlations within raw data. No training data is used in this model, so the technique is based on unlabeled data. Algorithms like k-means and Principal Component Analysis (PCA) fall into this category.
  • Semi-supervised Learning: This technique uses both supervised and unsupervised learning models for predictive analytics. It uses labeled and unlabeled data sets for training. It typically involves using a small amount of labeled data with a large amount of unlabeled data. It can be used for machine learning methods like classification and regression.
  • Reinforcement learning: The Reinforcement Learning technique is used to learn how to maximize a numerical reward goal by trying different actions and discovering which actions result in the maximum reward.

Use cases for each ML model:

  • Supervised learning: fraud detection
  • Unsupervised learning: social network applications, language prediction
  • Semi-supervised learning: image categorization, voice recognition
  • Reinforcement learning: Artificial Intelligence (AI) applications

Machine Learning Algorithms

There are several algorithms to help with machine learning solutions:

  • Naive Bayes: Naive Bayes is a supervised learning algorithm used for classification. It’s based on applying Bayes’ theorem and a set of conditional independence assumptions.
  • k-means clustering: The k-means algorithm creates k groups from a set of objects so that the members of a group are more similar to each other than to members of other groups (a short k-means sketch follows this list).
  • Support vector machines: A support vector machine (SVM) is a supervised learning algorithm used to find the boundary that separates classes by as wide a margin as possible. Given a set of training examples, each marked as belonging to one of two categories, an SVM training algorithm builds a model that assigns new examples to one category or the other. Applications of SVMs include bioinformatics, text, and image recognition.
  • Decision trees: Decision trees are used in many types of machine learning problems, including multi-class classification. MLlib supports both a basic decision tree algorithm and ensembles of trees. Two ensemble algorithms are available: Gradient-Boosted Trees and Random Forests.
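
The following is a minimal k-means sketch using Spark MLlib’s RDD-based API; the points.csv file of comma-separated numeric features is hypothetical, as are the parameter values:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse a hypothetical file of comma-separated numeric features into vectors
val points = sc.textFile("points.csv")
  .map(line => Vectors.dense(line.split(",").map(_.toDouble)))

// Cluster the points into k = 3 groups with up to 20 iterations
val model = KMeans.train(points, 3, 20)
model.clusterCenters.foreach(println)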

Here is a summary of the machine learning styles, the problems they address, and common algorithms used to solve them:

  • Supervised learning – problems: classification, regression, anomaly detection; algorithms: logistic regression, back-propagation neural network
  • Unsupervised learning – problems: clustering, dimensionality reduction; algorithms: k-means, Apriori algorithm
  • Semi-supervised learning – problems: classification, regression; algorithms: self-training, semi-supervised support vector machines (S3VMs)

Steps in a Machine Learning Program

When working on machine learning projects, tasks like data preparation, cleansing, and exploratory analysis are just as important as the learning models and algorithms used to solve the business problems.

Following are the steps performed in a typical machine learning program.

 

  • Featurization
  • Training
  • Model Evaluation

Use Cases

The business use cases for machine learning span different domains and scenarios, including recommendation engines (such as a food recommendation engine), predictive analytics (such as stock price prediction or predicting flight delays), targeted advertising, fraud detection, image and video recognition, self-driving cars, and various other artificial intelligence applications.

  • Recommendation Engines

    Recommendation engines use the attributes of an item, the attributes of a user, or the behavior of a user or their peers to make predictions. There are different factors that drive an effective recommendation engine model. Some of these factors are listed below:

    • Peer based
    • Customer behavior
    • Corporate deals or offers
    • Item clustering
    • Market/Store factors

    Recommendation engine solutions are implemented by leveraging two algorithms, content-based filtering and collaborative filtering.

    Content-based filtering: This is based on how similar a particular item is to other items, judged by usage and ratings. The model uses the content attributes of items (such as categories, tags, descriptions, and other data) to generate a matrix relating each item to the other items and calculates similarity based on the ratings provided. The most similar items are then listed together with a similarity score; items with the highest score are most similar.

    Movie recommendation is a good example of this model. It recommends that “Users who liked a particular movie liked these other movies as well”.

    These models don’t take into account the overall behavior of other users, so they don’t provide personalized recommendations compared to other models like collaborative filtering.

    Collaborative Filtering: On the other hand, collaborative filtering model is based on making predictions to find a specific item or user based on similarity with other items or users. The filter applies weights based on the “peer user” preferences. The assumption is users who display similar profile or behavior have similar preferences for items.

    An example of this model is the recommendations on ecommerce websites like Amazon. When you search for an item on the website you would see something like “Customers Who Bought This Item Also Bought.”

    Items with the highest recommendation score are the most relevant to the user in context.

    Collaborative filtering based solutions perform better compared to other models. Spark MLlib implements a collaborative filtering algorithm called Alternating Least Squares (ALS); a minimal ALS sketch appears at the end of this list. There are two variations of the input to collaborative filtering, called explicit and implicit feedback. Explicit feedback is based on direct preferences given by the user for an item (like a rating for a movie). Explicit feedback is nice, but it is often skewed because users who strongly like or dislike a product are the ones who review it; we don’t get the opinion of the many people near the center of the bell-shaped curve of data points.

    Examples of implicit feedback are a user’s views, clicks, likes, etc. Implicit feedback data is used a lot in the industry for predictive analytics because this type of data is easy to gather.

    There are also model based methods for recommendation engines. These often incorporate methods from collaborative and content-based filtering. Model-based approach gets the best of both worlds, the power and performance of collaborative filtering and the flexibility and adaptability of content-based filtering. Deep learning techniques are good examples of this model.

    You can also integrate other algorithms like K-Means into the recommendation engine solution to get more refined predictions. K-Means algorithm works by partitioning “n” observations into “k” clusters in which each observation belongs to the cluster with the nearest mean. Using K-Means technique, we can find similar items or users based on their attributes.

    The components of a recommendation engine include user factors, other factors such as market data, and the different algorithms.

  • Fraud Detection

    Fraud detection is another important use case for machine learning techniques because it addresses a critical problem in the financial industry quickly and accurately. Financial services organizations only have a few hundred milliseconds to determine if a particular online transaction is legitimate or fraudulent.

    Neural network techniques are used for point-of-sale (POS) fraud detection use cases. Organizations like PayPal use different types of machine learning algorithms for risk management, such as linear models, neural networks, and deep learning.

    Spark MLlib library provides several algorithms for solving this use case, including linear SVMs, logistic regression, decision trees, and naive Bayes. In addition, ensemble models (which combine the predictions of a set of models) such as random forests or gradient-boosting trees are also available.
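
As referenced above in the recommendation engine discussion, the following is a minimal sketch of training an ALS model with Spark MLlib’s RDD-based API; the ratings.csv file (lines of "userId,itemId,rating") and the parameter values are hypothetical:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// ratings.csv is a hypothetical file of "userId,itemId,rating" lines
val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, item, rating) = line.split(",")
  Rating(user.toInt, item.toInt, rating.toDouble)
}

// Train with rank 10, 10 iterations, and regularization 0.01, then recommend 5 items for user 1
val model = ALS.train(ratings, 10, 10, 0.01)
model.recommendProducts(1, 5).foreach(println)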

 

Spark MLlib

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs.

As we learned earlier, there are two different ways of using the Spark machine learning API: Spark MLlib and Spark ML.

In addition to the various algorithms Spark MLlib supports, the library also provides data processing functions and data analytics utilities and tools; a short summary-statistics sketch follows this list:

  • Frequent itemset mining via FP-growth and association rules
  • Sequential pattern mining via PrefixSpan
  • Summary statistics and hypothesis testing
  • Feature transformations
  • Model evaluation and hyper-parameter tuning
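
To illustrate the summary statistics utility from the list above, here is a minimal sketch over a small, made-up dataset:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// Column-wise summary statistics over a small, made-up dataset
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0),
  Vectors.dense(2.0, 20.0),
  Vectors.dense(3.0, 30.0)
))
val summary = Statistics.colStats(observations)
println(summary.mean)       // [2.0, 20.0]
println(summary.variance)   // [1.0, 100.0]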

Figure 3 below shows Apache Spark framework with Spark MLlib library.

 

Summary

  • Application jar
    • The user program and its dependencies are bundled into the application jar so that all of the dependencies are available to the cluster at run time.
  • Driver program
    • It acts as the entry point of the application. This is the process that starts the complete execution.
  • Cluster Manager
    • This is an external service which manages resources needed for the job execution.
    • It can be the standalone Spark manager, Apache Mesos, YARN, etc.
  • Deploy Mode
    • Cluster – the driver runs inside the cluster
    • Client – the driver is not part of the cluster; it is only responsible for submitting the job
  • Worker Node
    • This is the node that runs the application program, typically on a machine that contains the data.
  • Executor
    • Process launched on the worker node that runs tasks
    • It uses worker node’s resources and memory
  • Task
    • Fundamental unit of the Job that is run by the executor
  • Job
    • It is a combination of multiple tasks, triggered by an action on an RDD.
  • Stage
    • Each job is divided into smaller sets of tasks called stages. Stages are executed sequentially and depend on each other.
  • SparkContext
    • It gets the application program access to the distributed cluster.
    • This acts as a handle to the resources of cluster.
    • We can pass custom configuration using the SparkContext object (a short sketch follows this summary).
    • It can be used to create RDDs, accumulators, and broadcast variables.
  • RDD(Resilient Distributed Dataset)
    • The RDD is the core of Spark’s API.
    • It distributes the source data into multiple chunks over which we can perform operations simultaneously.
    • Various transformation and actions can be applied over the RDD
    • RDD is created through SparkContext
  • Accumulator
    • This is used to carry a shared variable across all partitions.
    • They can be used to implement counters (as in MapReduce) or sums
    • An accumulator’s value can only be read by the driver program.
    • It is created through the SparkContext.
  • Broadcast Variable
    • Again a way of sharing variables across the partitions
    • It is a read only variable
    • It allows the programmer to distribute a read-only variable cached on each machine rather than shipping a copy of it with tasks, thus avoiding the cost of sending it repeatedly over the network.
    • Any common data that is needed by each stage is distributed across all the nodes
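
To tie these pieces together, here is a minimal sketch of creating a SparkContext with custom configuration in a standalone application; the application name, master URL, and memory setting are illustrative (inside spark-shell, sc is already created for you):

import org.apache.spark.{SparkConf, SparkContext}

// Build a SparkContext with custom configuration
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)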
