1. Creating a DataFrame from an RDD
(1) Creating from tuples
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object _01_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[(String, Int, String)] = rdd1.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    // Import the implicit conversions
    import spark.implicits._
    // val dataFrame: DataFrame = rdd2.toDF()
    val dataFrame = rdd2.toDF("name", "age", "gender")
    // Print the schema
    dataFrame.printSchema()
    // Show the data
    dataFrame.show()
  }
}
(2) Creating with a case class
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object _02_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[User] = rdd1.map(line => {
      val arr = line.split(",")
      // Wrap the data in the case class
      User(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    // Import the implicit conversions
    import spark.implicits._
    val dataFrame: DataFrame = rdd2.toDF()
    // Print the schema
    dataFrame.printSchema()
    // Show the data
    dataFrame.show()
  }
}

// A case class already provides getters, and its field names become the column names
case class User(id: String, age: Int, gender: String)
(3) Using spark.createDataFrame(rowRDD, schema) (commonly used)
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object _03_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    // Wrap each record in a Row
    val rowRDD: RDD[Row] = rdd1.map(line => {
      val arr = line.split(",")
      Row(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Attach the schema to the rowRDD
    val schema = StructType(List(
      StructField("id", DataTypes.StringType),
      StructField("age", DataTypes.IntegerType),
      StructField("gender", DataTypes.StringType)
    ))
    val dataFrame = spark.createDataFrame(rowRDD, schema)
    // Print the schema
    dataFrame.printSchema()
    // Show the data
    dataFrame.show()
  }
}
(4) Creating with a plain Scala class
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.beans.BeanProperty

object _04_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[User_01] = rdd1.map(line => {
      val arr = line.split(",")
      new User_01(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    val dataFrame = spark.createDataFrame(rdd2, classOf[User_01])
    // Print the schema
    dataFrame.printSchema()
    // Show the data
    dataFrame.show()
  }
}

// A plain class needs the @BeanProperty annotation so Spark can find the getters
class User_01(@BeanProperty val id: String,
              @BeanProperty val age: Int,
              @BeanProperty val gender: String)
(5) Creating with a Java class
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object _05_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[User_03] = rdd1.map(line => {
      val arr = line.split(",")
      new User_03(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    val dataFrame = spark.createDataFrame(rdd2, classOf[User_03])
    // Print the schema
    dataFrame.printSchema()
    // Show the data
    dataFrame.show()
  }
}

// User_03 is a Java bean (defined in its own .java file); its getters let Spark derive the schema
public class User_03 {
    private String id;
    private int age;
    private String gender;

    public User_03(String id, int age, String gender) {
        this.id = id;
        this.age = age;
        this.gender = gender;
    }

    public String getId() { return id; }
    public int getAge() { return age; }
    public String getGender() { return gender; }
}
2. Creating a DataFrame from structured files
(1) Creating from a JSON file
A JSON file carries its own schema information (field names and types), but every JSON line has to be parsed before the DataFrame can be created, so it is relatively slow.
JSON can represent fairly rich data structures, but it also stores redundant data (the field names are repeated on every record), so it takes up more space.
import org.apache.spark.sql.SparkSession

object _09_ReadJson {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // Create a DataFrame by reading the JSON file
    val frame = spark.read.json("data/sparksql/person.json")
    frame.printSchema()
    frame.show()
    spark.stop()
  }
}
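Note that, by default, spark.read.json expects JSON Lines input, i.e. one complete JSON object per line. The actual contents of data/sparksql/person.json are not shown here; a hypothetical file in that format, reusing the fields from the earlier examples, could look like:

{"id": "tom", "age": 18, "gender": "man"}
{"id": "jack", "age": 28, "gender": "woman"}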
(2) Creating from a CSV file
CSV files are compact and take up little space.
However, a header has to be supplied, and if the column types need to be inferred, an extra action is triggered before the DataFrame is created that reads the whole file, so it is relatively slow.
import org.apache.spark.sql.{DataFrame, SparkSession}

object _08_Read_CSV {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val frame: DataFrame = spark.read
      .option("header", "true")    // treat the first line as the header
      .option("inferSchema", true) // read all the data to infer the column types
      .option("sep", ",")          // specify the delimiter
      .csv("data/sparksql/shop.txt")
    frame.printSchema()
    spark.stop()
  }
}
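To avoid the extra pass caused by inferSchema, the schema can also be supplied explicitly via DataFrameReader.schema. A minimal sketch, with hypothetical column names since the layout of shop.txt is not shown above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object Read_CSV_WithSchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Hypothetical columns; adjust to the real layout of shop.txt
    val schema = StructType(List(
      StructField("id", DataTypes.StringType),
      StructField("name", DataTypes.StringType),
      StructField("price", DataTypes.DoubleType)
    ))
    val frame = spark.read
      .option("header", "true")
      .option("sep", ",")
      .schema(schema) // no extra pass over the data is needed
      .csv("data/sparksql/shop.txt")
    frame.printSchema()
    spark.stop()
  }
}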
(3) Creating from a Parquet file
A Parquet file carries embedded metadata describing the data, so only that metadata needs to be read before the DataFrame is created.
The data is stored compactly and supports compression.
It is a columnar format, which makes queries more efficient, making it the better storage format.
import org.apache.spark.sql.SparkSession

object _11_ReadParquet {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    val frame = spark.read.parquet("data/sparksql/par")
    frame.show()
    spark.stop()
  }
}
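For reference, a Parquet directory such as data/sparksql/par can be produced by writing any DataFrame back out with DataFrameWriter.parquet. A minimal sketch; the source data here is made up purely to illustrate the call:

import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteParquetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    // Hypothetical source data; any DataFrame is written the same way
    val df = List(("tom", 18, "man"), ("jack", 28, "woman")).toDF("id", "age", "gender")
    // Spark creates the directory and the part files, including the Parquet metadata
    df.write.mode(SaveMode.Overwrite).parquet("data/sparksql/par")
    spark.stop()
  }
}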
Besides the approaches above, a DataFrame can also be created by reading data from external servers, for example over a JDBC connection to a database, from a Hive warehouse, from HBase, or from Elasticsearch. Those will be covered in a follow-up post; as a preview, a minimal JDBC sketch is shown below.
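The sketch assumes a local MySQL instance with a person table; the URL, table name, credentials, and driver class are all hypothetical and must be replaced with real connection details (the MySQL JDBC driver also needs to be on the classpath):

import java.util.Properties

import org.apache.spark.sql.SparkSession

object ReadJdbcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Hypothetical connection details; replace with your own database
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    val frame = spark.read.jdbc("jdbc:mysql://localhost:3306/test", "person", props)
    frame.printSchema()
    frame.show()
    spark.stop()
  }
}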