Using the Spark DataFrame API
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or in the Python pandas library. You can construct DataFrames from a wide array of sources, including structured data files, Apache Hive tables, and existing Spark resilient distributed datasets (RDDs). The Spark DataFrame API is available in Scala, Java, Python, and R.
This subsection contains several examples of DataFrame API use.
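As a quick illustration of building a DataFrame from an existing RDD, the following is a minimal sketch rather than part of the documented procedure. It assumes a Spark 1.x shell in which sc and sqlContext are predefined; the Person case class, the sample values, and the names peopleRDD and peopleDF are hypothetical.

```scala
// Assumes a Spark 1.x shell, where `sc` and `sqlContext` are predefined.
import sqlContext.implicits._

// Hypothetical case class and sample values, for illustration only.
case class Person(name: String, age: Int)

// Build an RDD of case-class instances, then convert it to a DataFrame.
// Column names are taken from the case-class fields.
val peopleRDD = sc.parallelize(Seq(Person("Ann", 34), Person("Bob", 27)))
val peopleDF = peopleRDD.toDF()

// Inspect the resulting schema and rows.
peopleDF.printSchema()
peopleDF.show()
```

The toDF() conversion becomes available on RDDs of case classes after the implicits import.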
To list JSON file contents as a DataFrame:
- As user spark, upload the people.txt and people.json sample files to the Hadoop Distributed File System (HDFS):
    cd /usr/hdp/current/spark-client
    su spark
    hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
    hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
- Launch the Spark shell:
    cd /usr/hdp/current/spark-client
    su spark
    ./bin/spark-shell --num-executors 1 --executor-memory 512m --master yarn-client
- At the Spark shell, type the following:
    scala> val df = sqlContext.read.format("json").load("people.json")
- Using df.show, display the contents of the DataFrame:
    scala> df.show
    17/03/12 11:24:10 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
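Before querying the DataFrame, it can help to check the schema that Spark inferred from the JSON file. The following is a minimal sketch, assuming df was loaded from people.json in the same Spark shell session as in the steps above.

```scala
// Assumes `df` was loaded from people.json in the same Spark shell session.
// Print the inferred schema as a tree of column names and types.
df.printSchema()

// List the column names programmatically.
df.columns.foreach(println)

// List each column name together with its data type.
df.dtypes.foreach { case (name, dataType) => println(s"$name: $dataType") }
```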
The following examples use Scala to access the DataFrame df defined in the previous subsection:
// Import the DataFrame functions API
scala> import org.apache.spark.sql.functions._
// Select all rows, but increment age by 1
scala> df.select(df("name"), df("age") + 1).show()
// Select people older than 21
scala> df.filter(df("age") > 21).show()
// Count people by age
scala> df.groupBy("age").count().show()

The following example uses the DataFrame API to specify a schema for people.txt, and then retrieves names from a temporary table associated with the schema:
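// Import Row and the types needed to build the schema
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Load people.txt as an RDD of text lines
val people = sc.textFile("people.txt")
// Describe the schema as space-separated column names
val schemaString = "name age"
// Build the schema: one nullable string column per name
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert each comma-separated line to a Row
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD of Rows
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrame as a temporary table so that it can be queried with SQL
peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
// Print the first column (name) of each result row
results.map(t => "Name: " + t(0)).collect().foreach(println)

This produces output similar to the following: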
17/03/12 14:36:49 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool
17/03/12 14:36:49 INFO scheduler.DAGScheduler: ResultStage 13 (collect at :33) finished in 0.129 s
17/03/12 14:36:49 INFO scheduler.DAGScheduler: Job 10 finished: collect at :33, took 0.162827 s
Name: Michael
Name: Andy
Name: Justin
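The schema in the preceding example declares both columns as strings. If age should be numeric, one option is to declare it as IntegerType and convert the value when building each Row. The following is a minimal sketch under stated assumptions: it runs in the same Spark 1.x shell session (so sc and sqlContext exist), people.txt has been uploaded as described earlier, and every line holds a name and an integer age separated by a comma. The names typedSchema, typedRows, typedPeople, and people_typed are hypothetical.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Hypothetical names; assumes `sc` and `sqlContext` from the Spark shell.
// Declare name as a string column and age as an integer column.
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Split each comma-separated line and convert the age field to Int.
val typedRows = sc.textFile("people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))

// Apply the typed schema and register a temporary table.
val typedPeople = sqlContext.createDataFrame(typedRows, typedSchema)
typedPeople.registerTempTable("people_typed")

// The numeric comparison now operates on integers rather than strings.
sqlContext.sql("SELECT name FROM people_typed WHERE age > 21").show()
```

A line with a missing or non-numeric age would cause toInt to throw an exception, so real input may need validation before conversion.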

