Serializing Java objects to HDFS from Spark




Abstract: A common requirement in Spark applications is to serialize a Java object and store it in HDFS, especially models computed with MLlib, so that they can be reused later. The example below reads data from HBase in a Spark job, trains a word2vec model, and stores it in HDFS.

Without further ado, here is the code. It targets Spark 1.4 + HBase 0.98.
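Before the full listing, here is a minimal, self-contained sketch of the core idea: wrap the streams returned by FileSystem.create and FileSystem.open in Java's ObjectOutputStream/ObjectInputStream. The object, path, and cluster address below are hypothetical placeholders, not part of the original example.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsObjectIO {
  // Serialize any Serializable object into an HDFS file
  def save[T <: Serializable](obj: T, fs: FileSystem, path: Path): Unit = {
    val oos = new ObjectOutputStream(fs.create(path, true)) // create() already returns an FSDataOutputStream
    try oos.writeObject(obj) finally oos.close()
  }

  // Read the object back and cast it to the expected type
  def load[T](fs: FileSystem, path: Path): T = {
    val ois = new ObjectInputStream(fs.open(path))
    try ois.readObject.asInstanceOf[T] finally ois.close()
  }
}

// Usage with a hypothetical cluster address and path:
// val conf = new Configuration()
// conf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
// val fs = FileSystem.get(conf)
// HdfsObjectIO.save(List("a", "b", "c"), fs, new Path("/tmp/demo-object"))
// val restored = HdfsObjectIO.load[List[String]](fs, new Path("/tmp/demo-object"))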

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

  // Serialize a Scan into the Base64 string that TableInputFormat expects in its SCAN property
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")
    val sc = new SparkContext(sparkConf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    // Only read rows whose data:article column contains at least 1500 characters, capped at 100 rows
    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))
    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val articlesRDD = crawledRDD.filter {
      case (_, result) => {
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
      }
    }

    // Tokenize each article with ansj; empty token sequences are filtered out below
    val wordsInDoc = articlesRDD.map {
      case (_, result) => {
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
      }
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // --------------------------------- The key part starts here ---------------------------------
    // Store the model trained above in HDFS
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns an FSDataOutputStream, so it can be passed straight to ObjectOutputStream
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Example of another program reading the serialized object back from HDFS and using the model
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]

    /*
     * // You can also copy the serialized file from HDFS to the local filesystem and use the model in a plain Scala program:
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close
     */
    // ---------------------------------------------------------------------------------------------
  }
}
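As a side note that goes beyond the original code: if your Spark version supports it (Word2VecModel gained built-in persistence around Spark 1.4), you can also let MLlib persist the model itself instead of relying on Java serialization. A hedged sketch, reusing model and sc from the listing above and a hypothetical HDFS path:

// Assumption: this API is available in your Spark build; the path is an example only
model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")
val reloaded = org.apache.spark.mllib.feature.Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")

MLlib writes the model out as a directory of metadata plus data rather than a single serialized blob, which sidesteps the usual compatibility caveats of Java serialization across Spark versions.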

Thanks for reading. I hope this helps, and thank you for supporting this site!

