Thursday, June 18, 2015

Reading JSON data in Spark DataFrames

Overview

Spark DataFrames make it easy to read from a variety of data formats, including JSON; in fact, the JSON schema is inferred for you automatically. Once the data is loaded, however, figuring out how to access individual fields is not so straightforward. This post walks through reading top-level fields as well as JSON arrays and nested objects. The code provided is for Spark 1.4. Update: please see my updated post on an easier way to work with nested array of struct JSON data.

Load the JSON File

Let's begin by loading a JSON file, where each line is a JSON object:

{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}

The Scala code to read a JSON file:

>> val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame
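
To sanity-check what was inferred, printSchema() prints the schema Spark derived from the JSON. On Spark 1.4 the output should look roughly like the following (the fields come back in alphabetical order, and JSON numbers are inferred as longs):

>> people.printSchema()
root
 |-- cities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sname: string (nullable = true)
 |    |    |-- year: long (nullable = true)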

Read a Top-Level Field

With the above command, all of the data is read into a DataFrame. In the following examples, I will show how to extract individual fields into arrays of primitive types. Let's start with the top-level "name" field:

>> val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])

>> names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)

Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access a column inside each Row.
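
If you prefer a one-liner, the select and the element extraction can be fused. This is just a sketch of the same idea (on Spark 1.4, DataFrame.map goes through the underlying RDD of Rows, so collect() still applies):

>> people.select('name).map(row => row.getString(0)).collect()
res: Array[String] = Array(Michael, Andy, Justin)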

Flatten and Read a JSON Array

Update: please see my updated post on an easier way to work with nested array of struct JSON data.

Next, notice that each person record has an array of "cities". Let's flatten these arrays and read out all their elements.

>> val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame

>> val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]

>> allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)

The explode() method explodes, or flattens, the cities array into a new column named "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.
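
One caveat on the lambda's type annotation: the runtime collection Spark hands to it is version-dependent (a commenter below reports needing scala.collection.mutable.WrappedArray on Spark 1.5, while List[String] works on 1.4). Typing the lambda with the more general Seq is a sketch that should work on either version:

>> val flattened = people.explode("cities", "city"){c: Seq[String] => c}
flattened: org.apache.spark.sql.DataFrame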

Read an Array of Nested JSON Objects, Unflattened

Finally, let's read out the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and year:

>> val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]

>> val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]

>> schoolsArr.foreach(schools => {
>>    schools.foreach(row => print((row.getString(0), row.getLong(1))))
>>    print("\n")
>> })
(stanford,2010)(berkeley,2012) 
(ucsb,2011) 
(berkeley,2014)

Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Each "schools" value is itself a Seq[Row], so we read it out with the getSeq[Row]() method. Finally, we can read the information for each individual school by calling getString() for the school name and getLong() for the school year. Phew!
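
If you would rather have all of the schools as one flat collection of (name, year) pairs instead of one Seq per person, a plain Scala flatMap over the already-collected rows does the trick. A quick sketch:

>> schoolsArr.flatMap(schools => schools.map(row => (row.getString(0), row.getLong(1))))
res: Array[(String, Long)] = Array((stanford,2010), (berkeley,2012), (ucsb,2011), (berkeley,2014))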

Summary

In this blog post, we have walked through accessing top-level fields, arrays, and nested JSON objects from JSON data. The key classes involved were DataFrame, Array, Row, and Seq. We used the select(), collect(), and explode() DataFrame methods, and the getString(), getLong(), and getSeq[T]() Row methods to read data out into arrays of primitive types.

10 comments:

  1. Nice Article. Very useful. Thanks

  2. How can one do this with the Java API?

  3. Thanks for sharing these valuable Spark tips. One request: please add a subscribe-by-email button so readers get notified of new posts.

     Reply: I added the subscribe by email button. Thanks for reading.
  4. Oddly, to make the `flattened` line work I had to change the List[String] from a List to a scala.collection.mutable.WrappedArray.

     Now to figure out how to do an explode on structs like the schools instead... then generalize so it can infer the types from the schema.

     Reply: I think it may have changed to WrappedArray in Spark 1.5. My code was for 1.4.