Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

网友投稿 665 2023-01-30


Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

一:准备数据源

在项目下新建一个student.txt文件,里面的内容为:

1,zhangsan,20

2,lisi,21

3,wanger,19

4,fangliu,18

二:实现

java版:

1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

package com.cxd.sql;

import java.io.Serializable;

/**
 * JavaBean holding one student record parsed from the comma-separated
 * input file ({@code sid,sname,sage}). Spark's bean-reflection based
 * {@code createDataFrame} requires exactly this getter/setter shape.
 */
@SuppressWarnings("serial")
public class Student implements Serializable {

    String sid;   // student id (kept as text, matches the source file)
    String sname; // student name
    int sage;     // student age in years

    // FIX: original source had a garbled method name "getSidhttp://()"
    // which does not compile; the bean getter must be getSid().
    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getSname() {
        return sname;
    }

    public void setSname(String sname) {
        this.sname = sname;
    }

    public int getSage() {
        return sage;
    }

    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }

}

2.转换,具体代码如下

package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.SaveMode;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructField;

import org.apache.spark.sql.types.StructType;

/**
 * Demonstrates the two standard ways of turning an RDD of text lines
 * into a DataFrame and writing it out as Parquet:
 * 1) bean reflection ({@link #reflectTransform}),
 * 2) a programmatically built schema ({@link #dynamicTransform}).
 *
 * Input file format (one record per line): sid,sname,sage
 */
public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);  // via Java bean reflection
        dynamicTransform(spark);  // via an explicitly built schema

        // Release local Spark resources once both jobs finish.
        spark.stop();
    }

    /**
     * Converts the text file to a DataFrame by mapping each line to a
     * {@link Student} bean and letting Spark derive the schema via
     * reflection, then writes the result as a single Parquet file.
     *
     * @param spark active session used to read the input and build the DataFrame
     */
    private static void reflectTransform(SparkSession spark) {
        // NOTE(review): article's data-prep section creates "student.txt",
        // the code reads "stuInfo.txt" — confirm which file name is intended.
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
        JavaRDD<Student> beanRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(beanRDD, Student.class);
        // coalesce(1) so the output directory contains a single part file.
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Converts the text file to a DataFrame by building {@link Row}s and an
     * explicit {@link StructType} schema, then writes it as Parquet.
     *
     * @param spark active session used to read the input and build the DataFrame
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });

        // Schema must list the fields in the same order as RowFactory.create above.
        ArrayList<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }

}

scala版本:

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.Row

import org.apache.spark.sql.types.IntegerType

/**
 * Demonstrates the two standard ways of converting an RDD to a DataFrame:
 * 1) case-class reflection plus the `toDF()` implicit (`reflectCreate`),
 * 2) a programmatically built `StructType` schema (`dynamicCreate`).
 *
 * Input file format (one record per line): id,name,age
 */
object RDD2Dataset {

  /** One student record; field names become DataFrame column names. */
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    reflectCreate(spark)
    dynamicCreate(spark)
    // Release local Spark resources once both demos finish.
    spark.stop()
  }

  /**
   * Builds a DataFrame via case-class reflection: the schema (column names
   * and types) is derived from [[Student]] by the `toDF()` implicit.
   *
   * @param spark active session providing the SparkContext and SQL support
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._ // enables rdd.toDF()
    // FIX: original source had a garbled "spark.spavhgUqrkContext".
    // NOTE(review): article's data-prep section creates "student.txt";
    // this method reads "student2.txt" — confirm which name is intended.
    val stuRDD = spark.sparkContext.textFile("student2.txt")
    val stuDf = stuRDD
      .map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    //stuDf.select("id","name","age").write.text("result") // write with explicit columns
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

  /**
   * Builds a DataFrame from explicit [[Row]]s and a hand-built schema.
   * All three columns are declared StringType; the `age<20` predicate still
   * works because Spark SQL casts the operands for the comparison.
   *
   * @param spark active session providing the SparkContext and SQL support
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // Row fields must appear in the same order as the schema above.
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    // createOrReplaceTempView returns Unit — no point binding its result.
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

}

注:

1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

2.此代码不适用于spark2.0以前的版本。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:平板电脑怎样连接共享文件夹(怎么连接上共享的文件夹)
下一篇:vue+axios新手实践实现登陆的示例代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~