Spark Streaming Example

Spark Streaming Application

This example illustrates a web server log analytics use case to show how Spark Streaming can help run analytics on continuously generated data streams.

These log messages are considered time series data, which is defined as a sequence of data points consisting of successive measurements captured over a specified time interval.

Time series data examples include sensor data, weather information, and click stream data. Time series analysis is about processing the time series data to extract insights that can be used for business decision making. This data can also be used for predictive analytics to predict future values based on historical data.

With a solution like this, we don’t need an hourly or daily batch job to process the server logs. Spark Streaming receives continuously generated data, processes it, and computes log statistics to provide insights into the data.

To follow a standard example of analyzing server logs, we’ll use the Apache Log Analyzer discussed in the Databricks Spark Streaming Reference Application as the basis of our sample application. That application already has log message parsing code that we’ll reuse here. The reference application is an excellent resource for learning more about the Spark framework in general and Spark Streaming in particular. For more details on the Databricks Spark Reference Application, check out their website.

Use Case

The use case for the sample application is a web server log analysis and statistics generator. In the sample application, we analyze the web server logs to compute the following statistics, which can be used for further data analysis and to create reports and dashboards:

  • Response counts by different HTTP response codes
  • Response content size
  • IP addresses of the clients to assess where the highest web traffic is coming from
  • Top endpoint URLs to identify which services are accessed more than others

Unlike the previous two articles in this series, we will use Java instead of Scala for creating the Spark program in this article. We’ll also run the program as a stand-alone application instead of running the code from the console window. This is how we would deploy Spark programs in test and production environments. The shell console interface (using the Scala, Python, or R languages) is for local developer testing only.

Technologies

We will use the following technologies in the sample application to demonstrate how the Spark Streaming library is used for processing real-time data streams.

Zookeeper

Zookeeper is a centralized service providing reliable distributed coordination for distributed applications. Kafka, the messaging system we use in the sample application, depends on Zookeeper for configuration details across the cluster.

Apache Kafka

Apache Kafka is a fault-tolerant, scalable messaging system for moving data in real time. It’s a good candidate for use cases like capturing user activity on websites, logs, stock ticker data, and instrumentation data.

Kafka works like a distributed database and is based on a partitioned and replicated low latency commit log. When we post a message to Kafka, it’s replicated to different servers in the cluster and at the same time it’s also committed to disk.

Apache Kafka includes a client API as well as a data transfer framework called Kafka Connect.

Kafka Clients: Kafka includes Java clients (for both message producers and consumers). We will use the Java producer client API in our sample application.
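The sample application’s log generator (described later) is built on this producer API. As a rough illustration only, and not the actual code from the reference application, a minimal Kafka 0.9 Java producer that writes a single log-style message to the topic we create later might look like the sketch below. The class name, the broker address localhost:9092, and the sample log line are assumptions made for this sketch.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical illustration of the Kafka Java producer API, not the reference application's generator
public class SimpleLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumes a Kafka broker running locally on the default port
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // A single fake access-log line; the sample application's generator randomizes these fields
        String logLine = "127.0.0.1 - - [01/Jan/2016:00:00:00 +0000] \"GET /api/orders HTTP/1.1\" 200 512";
        producer.send(new ProducerRecord<String, String>("spark-streaming-sample-topic", logLine));
        producer.close();
    }
}

The actual log generator loops over a configurable number of iterations and pauses between sends, as described in the Log Generator Commands section below.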

Kafka Connect: Kafka also includes Kafka Connect, a framework for streaming data between Apache Kafka and external data systems to support an organization’s data pipelines. It includes import and export connectors to move data sets into and out of Kafka. A Kafka Connect program can run as a standalone process or as a distributed service, and connectors can be submitted to a Kafka Connect cluster through its REST interface.

Spark Streaming

We’ll use the Spark Streaming Java API to receive the data streams, calculate the log statistics, and run queries to answer questions such as which IP addresses the most web requests are coming from.

Table 1 below shows the technologies and tools, and their versions, used in the sample application.

Technology        Version         URL
Zookeeper         3.4.6           https://zookeeper.apache.org/doc/r3.4.6/
Kafka             2.10-0.9.0.0    http://kafka.apache.org/downloads.html
Spark Streaming   1.4.1           https://spark.apache.org/releases/spark-release-1-4-1.html
JDK               1.7             http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
Maven             3.3.3           http://archive.apache.org/dist/maven/maven-3/3.3.3/

Table 1. Spark streaming sample application technologies and tools

The different architecture components of the Spark Streaming sample application are illustrated in Figure 3.


Figure 3. Spark Streaming Sample Application Architecture

Spark Streaming Application Run-time

To set up the Java project locally, you can download the Databricks reference application code from GitHub. Once you have the reference application code, you will need two additional Java classes to run our sample application.

  • Log generator (SparkStreamingKafkaLogGenerator.java)
  • Log analyzer (SparkStreamingKafkaLogAnalyzer.java)

These files are provided as a zip file (spark-streaming-kafka-sample-app.zip) on the article website. If you want to run the sample application on your local machine, use the link to download the zip file, extract the Java classes, and add them to the Java project created in the previous step.

The sample application can be executed on different operating systems. I ran the application in both Windows and Linux (CentOS VM) environments.

Let’s look at each component in the application architecture and the steps to execute the Spark Streaming program.

Zookeeper Commands:

I used Zookeeper version 3.4.6 in the sample application. To start the server, set two environment variables, JAVA_HOME and ZOOKEEPER_HOME, to point to the JDK and Zookeeper installation directories respectively. Then navigate to the Zookeeper home directory and run the following command to start the Zookeeper server.

bin\zkServer.cmd

If you are using a Linux environment, the command is:

bin/zkServer.sh start

Kafka Server Commands:

Kafka version 2.10-0.9.0.0 was used in the program, which is built against Scala 2.10. The Scala version you use with Kafka is important, because if the correct version is not used you get run-time errors when executing the Spark Streaming program. Here are the steps to start a Kafka server instance:

  • Open a new command prompt
  • Set JAVA_HOME and KAFKA_HOME variables
  • Navigate to Kafka home directory
  • Run the following command

bin\windows\kafka-server-start.bat config\server.properties

For a Linux environment, the command is as follows:

bin/kafka-server-start.sh config/server.properties

Log Generator Commands:

The next step in our sample application is to run the message log generator.

The log generator creates test log messages with different HTTP response codes (such as 200, 401, and 404) and different endpoint URLs.

Before we run the log generator, we need to create a topic that we can write the messages to.

Similar to the previous step, open a new command prompt, set the JAVA_HOME and KAFKA_HOME variables, and navigate to the Kafka home directory. Then run the following command to view the existing topics available on the Kafka server.

bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --list

or in Linux:

bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --list

We will create a new topic called “spark-streaming-sample-topic” using the following command:

bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic

or in Linux:

bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic

You can run the list topics command again to see if the new topic has been created correctly.

After the topic has been created, we can run the log generator program by executing the Java class called SparkStreamingKafkaLogGenerator. The log generator class takes the following four arguments to specify the configuration parameters.

  • Group Id: spark-streaming-sample-group
  • Topic: spark-streaming-sample-topic
  • Number of iterations: 50
  • Interval: 1000

Open a new command prompt to run the log generator. Set three environment variables (JAVA_HOME, MAVEN_HOME, and KAFKA_HOME) for the JDK, Maven, and Kafka directories respectively. Then navigate to the sample project root directory (e.g. c:\dev\projects\spark-streaming-kafka-sample-app) and run the following command.

mvn exec:java -Dexec.mainClass=com.sparkstreaming.kafka.example.SparkStreamingKafkaLogGenerator -Dexec.args="spark-streaming-sample-groupid spark-streaming-sample-topic 50 1000"

Once the log generator program is running, you should see the test log messages being created, with the debug messages shown on the console. This is only sample code, so the log messages are randomly generated to simulate the continuous flow of data from an event store like a web server.

Figure 4 below shows a screenshot of the log message producer as log messages are being generated.


Figure 4. Spark streaming log generator program output

Spark Streaming Commands:

This is the consumer of log messages using the Spark Streaming API. We use a Java class called SparkStreamingKafkaLogAnalyzer to receive the data streams from the Kafka server and process them to create log statistics.

Spark Streaming processes the server log messages and generates cumulative log statistics like web request content size (minimum, maximum, and average), response code counts, IP addresses, and the top endpoints.

We create the Spark context using the “local[*]” parameter, which detects the number of cores in the local system and uses them to run the program.
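The actual SparkStreamingKafkaLogAnalyzer class is provided in the zip file mentioned above. As a simplified sketch of the same pattern, and not the reference code itself, the skeleton below shows how a receiver-based Kafka stream can be created with the Spark 1.4 Java API and processed with the map, window, and foreachRDD operations that also appear in the DAG visualization in Figure 6. The class name, the 10-second batch interval, and the 30-second window are illustrative choices; the Zookeeper address, topic, and group id match the earlier steps.

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

// Simplified sketch of a Spark Streaming Kafka consumer (not the reference application's analyzer)
public class MinimalKafkaLogConsumer {
    public static void main(String[] args) throws Exception {
        // "local[*]" runs Spark locally, using as many worker threads as there are cores
        SparkConf conf = new SparkConf().setAppName("LogAnalyzerSketch").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Receiver-based Kafka stream: Zookeeper quorum, consumer group, and topic -> thread count
        Map<String, Integer> topics = Collections.singletonMap("spark-streaming-sample-topic", 1);
        JavaPairDStream<String, String> messages = KafkaUtils.createStream(
                jssc, "localhost:2181", "spark-streaming-sample-group", topics);

        // Keep only the message payload (the raw log line)
        JavaDStream<String> logLines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple) { return tuple._2(); }
        });

        // Recompute over a sliding 30-second window every 10 seconds
        JavaDStream<String> windowed = logLines.window(Durations.seconds(30), Durations.seconds(10));

        // foreachRDD is where the real analyzer would update its cumulative log statistics
        windowed.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) {
                System.out.println("Log lines in current window: " + rdd.count());
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}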

To run the Spark Streaming Java class, you will need the following JAR files in the classpath (a Maven-based alternative is sketched after the list):

  • kafka_2.10-0.9.0.0.jar
  • kafka-clients-0.9.0.0.jar
  • metrics-core-2.2.0.jar
  • spark-streaming-kafka_2.10-1.4.0.jar
  • zkclient-0.3.jar
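If you prefer to let Maven manage these JARs instead of adding them by hand, dependency declarations along the following lines should pull in equivalent artifacts. The coordinates below are a best-effort mapping of the JAR names above (the Kafka client and helper JARs are typically brought in transitively), so verify the versions against your local installation.

<!-- Sketch of pom.xml dependencies roughly matching the JARs listed above; verify versions locally -->
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
  </dependency>
</dependencies>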

I ran the program from the Eclipse IDE after adding the above JAR files to the classpath. The output of the log analysis Spark Streaming program is shown in Figure 5.


Figure 5. Spark streaming log analytics program output

Visualization of Spark Streaming Applications

While the Spark Streaming program is running, we can check the Spark console to view the details of the Spark jobs.

Open a new web browser window and navigate to URL http://localhost:4040 to access the Spark console.

Let’s look at some of the graphs showing the Spark Streaming program statistics.

The first visualization is the DAG (Directed Acyclic Graph) of a specific job, showing the dependency graph of the different operations we ran in the program, like map, window, and foreachRDD. Figure 6 below shows a screenshot of this visualization for a Spark Streaming job from our sample program.


Figure 6. DAG visualization graph of spark streaming job

The next graph shows the streaming statistics, which include the input rate (the number of events per second) and the processing time in milliseconds.

Figure 7 shows these statistics during the execution of the Spark Streaming program, both when the streaming data is not being generated (left section) and when the data stream is being sent to Kafka and processed by the Spark Streaming consumer (right section).


Figure 7. Spark visualization showing streaming statistics for the sample program

 
