Conversions between RDD, DataFrame, and DataSet

RDD -> DataFrame

* Direct manual conversion (calling toDF on an RDD of tuples)
scala> val people =
spark.read.json("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> val
people1 =
sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt")
people1: org.apache.spark.rdd.RDD[String] =
/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt
MapPartitionsRDD[18] at textFile at <console>:24 scala> val peopleSplit =
people1.map{x => val strs = x.split(",");(strs(0),strs(1).trim.toInt)}
peopleSplit: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[19] at
map at <console>:26 scala> peopleSplit.collect res6: Array[(String, Int)] =
Array((Michael,29), (Andy,30), (Justin,19)) scala> peopleSplit.to toDF toDS
toDebugString toJavaRDD toLocalIterator toString top scala> peopleSplit.toDF
res7: org.apache.spark.sql.DataFrame = [_1: string, _2: int] scala>
peopleSplit.toDF("name","age") res8: org.apache.spark.sql.DataFrame = [name:
string, age: int] scala> res8.show +-------+---+ | name|age| +-------+---+
|Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
* Programmatic implementation in Scala ## Define the schema scala> val schema =
StructType(StructField("name",StringType)::StructField("age",IntegerType)::Nil)
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(name,StringType,true),
StructField(age,IntegerType,true)) ## Load the RDD data scala> val rdd =
sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt")
rdd: org.apache.spark.rdd.RDD[String] =
/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt
MapPartitionsRDD[1] at textFile at <console>:30 ## Build Row objects scala> val data =
rdd.map{x => val strs = x.split(",");Row(strs(0),strs(1).trim.toInt)} data:
org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[2] at map
at <console>:32 ## Generate the DataFrame scala> spark.createDataFrame(data,schema) 18/09/06
09:45:00 WARN ObjectStore: Version information not found in metastore.
hive.metastore.schema.verification is not enabled so recording the schema
version 1.2.0 18/09/06 09:45:00 WARN ObjectStore: Failed to get database
default, returning NoSuchObjectException 18/09/06 09:45:02 WARN ObjectStore:
Failed to get database global_temp, returning NoSuchObjectException res0:
org.apache.spark.sql.DataFrame = [name: string, age: int]
* Reflection-based conversion scala> case class People(name:String,age:Int) defined class People
scala> rdd.map{x => val
strs=x.split(",");People(strs(0),strs(1).trim.toInt)}.toDF res2:
org.apache.spark.sql.DataFrame = [name: string, age: int]
DataFrame -> RDD
scala> res8.rdd res10: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
MapPartitionsRDD[26] at rdd at <console>:31
RDD -> DataSet
scala> peopleSplit.toDS res11: org.apache.spark.sql.Dataset[(String, Int)] =
[_1: string, _2: int] scala> case class People(name:String,age:Int) defined
class People scala> val peopleDSSplit = people1.map{x => val strs =
x.split(","); People(strs(0),strs(1).trim.toInt)} peopleDSSplit:
org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[27] at map at <console>:28
scala> peopleDSSplit.toDS res12: org.apache.spark.sql.Dataset[People] = [name:
string, age: int] scala> res12.show +-------+---+ | name|age| +-------+---+
|Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
DataSet -> RDD
scala> res12.rdd res14: org.apache.spark.rdd.RDD[People] =
MapPartitionsRDD[32] at rdd at <console>:33 scala> res14.map(_.name).collect
res15: Array[String] = Array(Michael, Andy, Justin)
DataSet -> DataFrame
scala> res12.toDF res16: org.apache.spark.sql.DataFrame = [name: string, age:
int]
DataFrame -> DataSet
scala> res16.as[People] res17: org.apache.spark.sql.Dataset[People] = [name:
string, age: int]
 

 

 

Technology
©2020 ioDraw All rights reserved
2020 Nobel Prize in physiology or medicine announced Implementation and challenge of metadata service in data Lake Enterprises face SEM Bidding and SEO How to choose ? Or both ?spark.sql.shuffle.partitions and spark.default.parallelism The difference between JavaScript Do a simple guess number games What are the types of variables ? Trump's "VIP therapy ": Is receiving a drug treatment that has not yet been approved ( Essence )2020 year 6 month 26 day C# Class library DataTable( Extension method ) program ( process ) How is it stored in the operating system , Space allocation Understanding neural network machine translation in three minutes