Spark SQL Example

Spark SQL Application

Once you have the Spark shell launched, you can run data analytics queries using the Spark SQL API.
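
If the shell is not running yet, it can be launched from the Spark installation directory; the command below assumes a standard Spark download with the default layout:

./bin/spark-shell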

In the first example, we’ll load the customer data from a text file and create a DataFrame object from the dataset. Then we can use DataFrame functions to run specific queries that select the data.

Let’s look at the contents of the text file called customers.txt shown below.

100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727

The following code snippet shows the Spark SQL commands you can run in the Spark shell console.

// Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

// Create a custom class to represent the Customer
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// Create a DataFrame of Customer objects from the dataset text file.
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1).trim, p(2).trim, p(3).trim, p(4).trim)).toDF()

// Register DataFrame as a table.
dfCustomers.registerTempTable("customers")

// Display the contents of the DataFrame
dfCustomers.show()

// Print the DF schema
dfCustomers.printSchema()

// Select customer name column
dfCustomers.select("name").show()

// Select customer name and city columns
dfCustomers.select("name", "city").show()

// Select a customer by id
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()

// Count the customers by zip code
dfCustomers.groupBy("zip_code").count().show()
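
Because dfCustomers was registered as the temporary table customers, the same data can also be queried with a SQL statement directly from the shell. A minimal sketch (the WHERE condition below is only illustrative and not part of the original example):

// Query the registered temp table with SQL
sqlContext.sql("SELECT name, city FROM customers WHERE zip_code = '78727'").show()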

In the above example, the schema is inferred using reflection. We can also specify the schema of the dataset programmatically. This is useful when the case classes cannot be defined ahead of time, for example when the structure of the records is encoded in a string.

The following code example shows how to specify the schema programmatically using the data type classes StructType, StringType, and StructField.

//
// Programmatically Specifying the Schema
//

// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val rddCustomers = sc.textFile("data/customers.txt")

// The schema is encoded in a string
val schemaString = "customer_id name city state zip_code"

// Import Spark SQL data types and Row.
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (rddCustomers) to Rows.
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim))

// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table.
dfCustomers.registerTempTable("customers")

// SQL statements can be run using the sql method provided by sqlContext.
val custNames = sqlContext.sql("SELECT name FROM customers")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => "Name: " + t(0)).collect().foreach(println)

// Run another SQL query, this time ordering the customers by zip code.
val customersByCity = sqlContext.sql("SELECT name, zip_code FROM customers ORDER BY zip_code")

// Again, the columns of each result row can be accessed by ordinal.
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
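
The columns of a result Row can also be accessed by field name instead of ordinal. A minimal sketch, assuming Spark 1.4 or later where Row.getAs by field name is available:

// Access result columns by field name rather than by position
customersByCity.map(t => t.getAs[String]("name") + "," + t.getAs[String]("zip_code")).collect().foreach(println)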

You can also load data from other data sources such as JSON files, Hive tables, or even relational database tables using the JDBC data source.
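
For example, a JSON file or a JDBC table could be loaded as shown below. This is a minimal sketch assuming the DataFrameReader API introduced in Spark 1.4; the file path, JDBC URL, and table name are hypothetical, and the JDBC driver must be on the classpath:

// Load a JSON file into a DataFrame (path is hypothetical)
val dfOrders = sqlContext.read.json("data/orders.json")
dfOrders.printSchema()

// Load a relational database table over JDBC (connection settings are hypothetical)
val dfAccounts = sqlContext.read.format("jdbc").
  option("url", "jdbc:postgresql://localhost:5432/sales").
  option("dbtable", "accounts").
  load()
dfAccounts.show()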
