Chapter 5. Using the Spark DataFrame API
The Spark DataFrame API provides table-like access to data from a variety of sources. Its
    purpose is similar to Python's pandas library and R's data frames: collect and
    organize data into a tabular format with named columns. DataFrames can be constructed from a
    wide array of sources, including structured data files, Hive tables, and existing Spark
    RDDs.
- As user - spark, upload the- people.txtfile to HDFS:- cd /usr/hdp/current/spark-client su spark hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json 
- Launch the Spark shell: - cd /usr/hdp/current/spark-client su spark ./bin/spark-shell --num-executors 1 --executor-memory 512m --master yarn-client 
- At the Spark shell, type the following: - scala> val df = sqlContext.read.format("json").load("people.json")
- Using - df.show, display the contents of the DataFrame:- scala> df.show 16/05/31 11:24:10 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ 
Additional DataFrame API Examples
Here are additional examples of Scala-based DataFrame access, using DataFrame
        df defined in the previous subsection:
// Import the DataFrame functions API
scala> import org.apache.spark.sql.functions._
// Select all rows, but increment age by 1
scala> df.select(df("name"), df("age") + 1).show()
// Select people older than 21
scala> df.filter(df("age") > 21).show()
// Count people by age
df.groupBy("age").count().show()Specify Schema Programmatically
 The following example uses the DataFrame API to specify a schema for
        people.txt, and retrieve names from a temporary table associated with the
      schema:
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("people.txt")
val schemaString = "name age"
import org.apache.spark.sql.types.{StructType,StructField,StringType}
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)This will produce output similar to the following:
16/05/31 14:36:49 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool 16/05/31 14:36:49 INFO scheduler.DAGScheduler: ResultStage 13 (collect at :33) finished in 0.129 s 16/05/31 14:36:49 INFO scheduler.DAGScheduler: Job 10 finished: collect at :33, took 0.162827 s Name: Michael Name: Andy Name: Justin

