2022-11-06
Statistics over user behavior logs: an archive of the Java MapReduce and Scala Spark code...
Input lines:
123 456 123
123 456

After map (each line becomes one array of pairs):
Array[Array[(K,V)]] = {
  Array[(K,V)] = { (123,1), (456,1), (123,1) },
  Array[(K,V)] = { (123,1), (456,1) }
}

After flatMap (the per-line arrays are concatenated):
Array[(K,V)] = { (123,1), (456,1), (123,1), (123,1), (456,1) }
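A minimal Spark sketch (not from the original post) of how those two shapes arise, assuming sc is an existing SparkContext:

// Illustration of the structures above.
val lines = sc.parallelize(Seq("123 456 123", "123 456"))

// map keeps one element per input line -> Array[Array[(String, Int)]]
val nested = lines.map(line => line.split(" ").map(w => (w, 1))).collect()

// flatMap concatenates the per-line arrays -> Array[(String, Int)]
val flat = lines.flatMap(line => line.split(" ").map(w => (w, 1))).collect()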
Note: in Spark, don't casually return null from a map function; it can cause the job to fail. Return an empty object of the appropriate type instead.
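A minimal sketch of that advice (the parseLine helper below is hypothetical, not from the original code): return a typed placeholder instead of null, or switch to flatMap with Option so that bad lines are simply dropped.

// Hypothetical helper, for illustration only: never return null from map().
def parseLine(line: String): (String, String) =
  if (line == null || line.trim.isEmpty) ("blank", "0\t0\t0") // placeholder of the right type, not null
  else (line.split("\t")(0), "1\t0\t0")

// Alternative: flatMap + Option drops bad lines instead of emitting placeholders.
// inData.flatMap(l => Option(l).filter(_.trim.nonEmpty).map(parseLine))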
Java MapReduce:

package com.news.rec.monitor;

import com.newsRec.model.UserActionLog;
import com.sohu.newsRec.parser.UserLogParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by jacobzhou on 2016/9/1.
 */
public class UserActiveCount extends Configured implements Tool {
    private static String DELIMA = "\t";

    public static class MRMapper extends Mapper
    // ... the snippet is cut off here in the original post (the Mapper generics,
    // the Mapper/Reducer bodies and the driver code were lost).
Scala Spark with map: each line of data becomes exactly one (key, value) pair.
package zzy

import com.newsRec.parser.UserLogParser
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by jacobzhou on 2016/10/11.
 */
object newsMonitor {
  private val DELIMA: String = "\t"
  private val userActorParser = new UserLogParser
  var num = 0

  // Turn one log line into a (key, value) pair:
  // key = net + gbCode, value = expo/pv/tm counts for this record.
  def mapData(line: String): (String, String) = {
    if (num < 100) { // print a few raw lines per executor for debugging
      println(line)
      num = num + 1
    }
    val userLog = UserLogParser.parseKV(line)
    val act: String = userLog.getAct
    val gbCode: Long = userLog.getgbCode
    var pvNum: Long = 0
    var expoNum: Long = 0
    var tmNum: Long = 0
    if (act == "expo") expoNum = 1
    else if (act == "pv") pvNum = 1
    else if (act == "tm") tmNum = 1
    var net: String = userLog.getNet
    if (net == null || net.trim == "") net = "blank"
    val wKey: String = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
    val wValue: String = expoNum + DELIMA + pvNum + DELIMA + tmNum
    (wKey, wValue)
  }

  // Merge two values for the same key by summing the three counters field by field.
  def reduceData(a: String, b: String): String = {
    val dataA: Array[String] = a.split(DELIMA)
    val dataB: Array[String] = b.split(DELIMA)
    val expoNum: Long = dataA(0).toLong + dataB(0).toLong
    val pvNum: Long = dataA(1).toLong + dataB(1).toLong
    val tmNum: Long = dataA(2).toLong + dataB(2).toLong
    expoNum + DELIMA + pvNum + DELIMA + tmNum
  }

  def main(args: Array[String]): Unit = {
    println("Running")
    val conf = new SparkConf()
    conf.setAppName("SparkTest")
    val input = args(0)
    val output = args(1)
    val sc = new SparkContext(conf)
    val inData = sc.textFile(input)
    val tmp = inData.map(line => mapData(line)).reduceByKey((x, y) => reduceData(x, y)) //.collect().foreach(println)
    tmp.saveAsTextFile(output)
  }
}
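For reference: saveAsTextFile writes each pair's toString, so one aggregated record in the output looks roughly like the sketch below (the concrete field values are invented for illustration):

// Illustrative only; values are made up.
// key   = "net" \t <net> \t "gbCode" \t <gbCode>
// value = <expoNum> \t <pvNum> \t <tmNum>
val exampleRecord = ("net\twifi\tgbCode\t110000", "3\t1\t0")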
Scala Spark with flatMap: this scales better. For example, if one line needs to be split into several (key, value) pairs, first collect them into a List[(key, value)] and then expand it with flatMap (see the sketch after the code below).
package zzy

import com.newsRec.parser.UserLogParser
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by jacobzhou on 2016/9/18.
 */
object newsMonitor {
  private val DELIMA: String = "\t"
  private val userActorParser = new UserLogParser
  var num = 0

  // Same parsing as the map version, but the result is wrapped in a Map
  // so that flatMap can expand it (here still one pair per line).
  def mapData(line: String): Map[String, String] = {
    if (num < 100) { // print a few raw lines per executor for debugging
      println(line)
      num = num + 1
    }
    val userLog = UserLogParser.parseKV(line)
    val act: String = userLog.getAct
    val gbCode: Long = userLog.getgbCode
    var pvNum: Long = 0
    var expoNum: Long = 0
    var tmNum: Long = 0
    if (act == "expo") expoNum = 1
    else if (act == "pv") pvNum = 1
    else if (act == "tm") tmNum = 1
    var net: String = userLog.getNet
    if (net == null || net.trim == "") net = "blank"
    val wKey: String = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
    val wValue: String = expoNum + DELIMA + pvNum + DELIMA + tmNum
    Map(wKey -> wValue)
  }

  // Merge two values for the same key by summing the three counters field by field.
  def reduceData(a: String, b: String): String = {
    val dataA: Array[String] = a.split(DELIMA)
    val dataB: Array[String] = b.split(DELIMA)
    val expoNum: Long = dataA(0).toLong + dataB(0).toLong
    val pvNum: Long = dataA(1).toLong + dataB(1).toLong
    val tmNum: Long = dataA(2).toLong + dataB(2).toLong
    expoNum + DELIMA + pvNum + DELIMA + tmNum
  }

  def main(args: Array[String]): Unit = {
    println("Running")
    val conf = new SparkConf()
    conf.setAppName("SparkTest")
    val input = args(0)
    val output = args(1)
    val sc = new SparkContext(conf)
    val inData = sc.textFile(input)
    val tmp = inData.flatMap(line => mapData(line)).reduceByKey((x, y) => reduceData(x, y)) //.collect().foreach(println)
    tmp.saveAsTextFile(output)
  }
}
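The version above still emits a single pair per line, just wrapped in a Map. A sketch of the genuinely multi-pair case described before the code might look like the following (the null check on parseKV and the two key families are assumptions made for illustration, and the def is meant to live inside the same object so that DELIMA is in scope):

// Hypothetical extension of mapData: emit several (key, count) pairs per line
// and let flatMap expand the List; an empty List simply drops the line.
def mapDataMulti(line: String): List[(String, Long)] = {
  val userLog = UserLogParser.parseKV(line)
  if (userLog == null) List.empty                  // assumed failure mode: drop the line
  else {
    val net = Option(userLog.getNet).filter(_.trim.nonEmpty).getOrElse("blank")
    List(
      ("net" + DELIMA + net, 1L),                  // per-network count
      ("gbCode" + DELIMA + userLog.getgbCode, 1L)  // per-region count
    )
  }
}

// inData.flatMap(mapDataMulti).reduceByKey(_ + _).saveAsTextFile(output)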
Example script for launching the Spark job:
output=zeyangzhou/count
input=zeyangzhou/data
hadoop fs -rmr $output
jar=/opt/develop/zeyangzhou/zzy-1.0-SNAPSHOT-jar-with-dependencies.jar
SPARK=/usr/lib/spark/bin/spark-submit
${SPARK} --queue datacenter \
    --class zzy.newsMonitor \
    --executor-memory 15g \
    --master yarn-cluster \
    --driver-memory 20g \
    --num-executors 30 \
    --executor-cores 15 \
    $jar $input $output
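Two caveats when running this on newer versions of the stack (not part of the original script): hadoop fs -rmr is deprecated in favour of hadoop fs -rm -r, and recent spark-submit versions express cluster mode as --master yarn --deploy-mode cluster rather than --master yarn-cluster.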