User behavior log statistics: archived Java MapReduce and Scala Spark code...




For example, given the two input lines below, map produces one array of pairs per line, while flatMap flattens everything into a single collection:

Input lines:
123 456 123
123 456

After map (one Array[(K,V)] per line):
Array[Array[(K,V)]] = { Array[(K,V)] = { (123,1), (456,1), (123,1) }, Array[(K,V)] = { (123,1), (456,1) } }

After flatMap (flattened into one collection):
Array[(K,V)] = { (123,1), (456,1), (123,1), (123,1), (456,1) }
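The same difference in runnable form, as a minimal self-contained Spark sketch. The object name, the local[*] master, and the space-delimited word split are illustrative assumptions, not part of the original job:

import org.apache.spark.{SparkConf, SparkContext}

object MapVsFlatMap {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MapVsFlatMap").setMaster("local[*]"))
    val lines = sc.parallelize(Seq("123 456 123", "123 456"))

    // map: one Array[(String, Int)] per input line -> RDD[Array[(String, Int)]]
    val perLine = lines.map(line => line.split(" ").map(w => (w, 1)))
    perLine.collect().foreach(arr => println(arr.mkString("{ ", ", ", " }")))

    // flatMap: the per-line arrays are flattened into a single RDD[(String, Int)]
    val flattened = lines.flatMap(line => line.split(" ").map(w => (w, 1)))
    println(flattened.collect().mkString("{ ", ", ", " }"))

    sc.stop()
  }
}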

Note: in Spark, do not casually return null from a map function; it can make the job fail at runtime. Return an empty object of the expected type instead.
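A minimal sketch of that advice; the field layout and fallback values are made up for illustration. The point is that the function always returns a well-formed pair of the declared type, never null:

// Illustrative only: on malformed input, fall back to a placeholder pair instead of null,
// so downstream split/reduceByKey calls never hit a NullPointerException.
def safeMapData(line: String): (String, String) = {
  val fields = line.split("\t")
  if (fields.length < 4) ("blank", "0\t0\t0")   // empty placeholder instead of null
  else (fields(0), fields(1) + "\t" + fields(2) + "\t" + fields(3))
}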

Java MapReduce:

package com.news.rec.monitor;

import com.newsRec.model.UserActionLog;
import com.sohu.newsRec.parser.UserLogParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by jacobzhou on 2016/9/1.
 */
public class UserActiveCount extends Configured implements Tool {

    private static final String DELIMA = "\t";

    public static class MRMapper extends Mapper<Object, Text, Text, Text> {

        private UserLogParser userActorParser = new UserLogParser();

        // Extract the numeric readTime field from the raw log line.
        private long getReadTime(String line) {
            String readTime = "";
            int index = line.indexOf("readTime") + 9;
            while (line.charAt(index) >= '0' && line.charAt(index) <= '9') {
                readTime += line.charAt(index);
                index++;
            }
            if (!readTime.equals("")) return Long.parseLong(readTime);
            else return 0;
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            UserActionLog userLog = userActorParser.parseKV(line);
            String act = userLog.getAct();
            long gbCode = userLog.getgbCode();
            long pvNum = 0;
            long expoNum = 0;
            long tmNum = 0;
            long readTime = getReadTime(line);
            if (readTime < 4 || readTime > 3000) readTime = 0;
            if (act.equals("expo")) expoNum = 1;
            else if (act.equals("pv")) pvNum = 1;
            else if (act.equals("tm")) {
                tmNum = 1;
                if (readTime == 0) return;   // drop "tm" records without a valid read time
            }
            String net = userLog.getNet();
            if (net == null || net.trim().equals("")) {
                net = "blank";
            }
            // key:   net \t <net> \t gbCode \t <gbCode>
            // value: expoNum \t pvNum \t tmNum \t readTime
            String wKey = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode;
            String wValue = expoNum + DELIMA + pvNum + DELIMA + tmNum + DELIMA + readTime;
            context.write(new Text(wKey), new Text(wValue));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {}
    }

    public static class MRReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String[] sKey = key.toString().split(DELIMA);
            long expoNum, pvNum, tmNum, readTime;
            expoNum = pvNum = tmNum = readTime = 0;
            for (Text val : values) {
                String[] data = val.toString().split(DELIMA);
                expoNum += Long.parseLong(data[0]);
                pvNum += Long.parseLong(data[1]);
                tmNum += Long.parseLong(data[2]);
                readTime += Long.parseLong(data[3]);
            }
            String result = expoNum + DELIMA + pvNum + DELIMA + tmNum + DELIMA + readTime;
            context.write(key, new Text(result));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.job.queuename", "datacenter");
        conf.set("mapred.max.map.failures.percent", "5");
        int reduceTasksMax = 10;
        Job job = new Job(conf);
        job.setJobName("userActiveStatistic job");
        job.setNumReduceTasks(reduceTasksMax);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);
        job.setJarByClass(UserActiveCount.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        try {
            System.out.println("start run job!");
            int ret = ToolRunner.run(new UserActiveCount(), args);
            System.exit(ret);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Scala Spark, map: turn each line of data into a single (key, value) pair.

package zzy

import com.newsRec.parser.UserLogParser
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by jacobzhou on 2016/10/11.
 */
object newsMonitor {
  private val DELIMA: String = "\t"
  private val userActorParser = new UserLogParser
  var num = 0

  // Turn one log line into a single (key, value) pair:
  // key:   net \t <net> \t gbCode \t <gbCode>
  // value: expoNum \t pvNum \t tmNum
  def mapData(line: String): (String, String) = {
    if (num < 100) {   // print the first lines for debugging
      println(line)
      num = num + 1
    }
    val userLog = UserLogParser.parseKV(line)
    val act: String = userLog.getAct
    val gbCode: Long = userLog.getgbCode
    var pvNum: Long = 0
    var expoNum: Long = 0
    var tmNum: Long = 0
    if (act == "expo") expoNum = 1
    else if (act == "pv") pvNum = 1
    else if (act == "tm") tmNum = 1
    var net: String = userLog.getNet
    if (net == null || net.trim == "") net = "blank"
    val wKey: String = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
    val wValue: String = expoNum + DELIMA + pvNum + DELIMA + tmNum
    (wKey, wValue)
  }

  // Sum the counters of two values that share the same key.
  def reduceData(a: String, b: String): String = {
    val dataA: Array[String] = a.split(DELIMA)
    val dataB: Array[String] = b.split(DELIMA)
    val expoNum: Long = dataA(0).toLong + dataB(0).toLong
    val pvNum: Long = dataA(1).toLong + dataB(1).toLong
    val tmNum: Long = dataA(2).toLong + dataB(2).toLong
    expoNum + DELIMA + pvNum + DELIMA + tmNum
  }

  def main(args: Array[String]): Unit = {
    println("Running")
    val conf = new SparkConf()
    conf.setAppName("SparkTest")
    val input = args(0)
    val output = args(1)
    val sc = new SparkContext(conf)
    val inData = sc.textFile(input)
    val tmp = inData.map(line => mapData(line)).reduceByKey((x, y) => reduceData(x, y)) //.collect().foreach(println)
    tmp.saveAsTextFile(output)
  }
}
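A possible variant, not from the original post: if the value is carried as a numeric (expoNum, pvNum, tmNum) tuple instead of a tab-delimited string, reduceByKey can sum the counters directly without splitting and re-joining strings on every merge. The sketch assumes it replaces the map/reduce calls inside main above, so inData, DELIMA and UserLogParser are in scope:

// Variant sketch: value is (expoNum, pvNum, tmNum) as Longs rather than a "\t"-joined string.
val counted = inData
  .map { line =>
    val userLog = UserLogParser.parseKV(line)
    val counts = userLog.getAct match {
      case "expo" => (1L, 0L, 0L)
      case "pv"   => (0L, 1L, 0L)
      case "tm"   => (0L, 0L, 1L)
      case _      => (0L, 0L, 0L)
    }
    val net = Option(userLog.getNet).map(_.trim).filter(_.nonEmpty).getOrElse("blank")
    ("net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + userLog.getgbCode, counts)
  }
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))

// Join back into the original text layout before saving:
// counted.map { case (k, (e, p, t)) => k + DELIMA + e + DELIMA + p + DELIMA + t }.saveAsTextFile(output)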

Scala Spark, flatMap: this scales better. For example, when one line of data has to be split into multiple (key, value) pairs, you first collect them into a List[(key, value)] and then expand it with flatMap (see the sketch after the code below).

package zzy

import com.newsRec.parser.UserLogParser
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by jacobzhou on 2016/9/18.
 */
object newsMonitor {
  private val DELIMA: String = "\t"
  private val userActorParser = new UserLogParser
  var num = 0

  // Turn one log line into a one-entry Map; flatMap expands the Map into (key, value) records,
  // so a line could just as well produce several entries.
  def mapData(line: String): Map[String, String] = {
    if (num < 100) {   // print the first lines for debugging
      println(line)
      num = num + 1
    }
    val userLog = UserLogParser.parseKV(line)
    val act: String = userLog.getAct
    val gbCode: Long = userLog.getgbCode
    var pvNum: Long = 0
    var expoNum: Long = 0
    var tmNum: Long = 0
    if (act == "expo") expoNum = 1
    else if (act == "pv") pvNum = 1
    else if (act == "tm") tmNum = 1
    var net: String = userLog.getNet
    if (net == null || net.trim == "") net = "blank"
    val wKey: String = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
    val wValue: String = expoNum + DELIMA + pvNum + DELIMA + tmNum
    Map(wKey -> wValue)
  }

  // Sum the counters of two values that share the same key.
  def reduceData(a: String, b: String): String = {
    val dataA: Array[String] = a.split(DELIMA)
    val dataB: Array[String] = b.split(DELIMA)
    val expoNum: Long = dataA(0).toLong + dataB(0).toLong
    val pvNum: Long = dataA(1).toLong + dataB(1).toLong
    val tmNum: Long = dataA(2).toLong + dataB(2).toLong
    expoNum + DELIMA + pvNum + DELIMA + tmNum
  }

  def main(args: Array[String]): Unit = {
    println("Running")
    val conf = new SparkConf()
    conf.setAppName("SparkTest")
    val input = args(0)
    val output = args(1)
    val sc = new SparkContext(conf)
    val inData = sc.textFile(input)
    val tmp = inData.flatMap(line => mapData(line)).reduceByKey((x, y) => reduceData(x, y)) //.collect().foreach(println)
    tmp.saveAsTextFile(output)
  }
}
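The flatMap version above still emits only one pair per line. Below is a hedged sketch of the multi-pair case described before the code: one line counted under two different keys via a List that flatMap expands. It assumes it sits inside the same newsMonitor object (so DELIMA and UserLogParser are in scope); the second key is purely illustrative:

// Illustrative: one log line produces two (key, value) records.
def mapDataMulti(line: String): List[(String, String)] = {
  val userLog = UserLogParser.parseKV(line)
  val net = Option(userLog.getNet).map(_.trim).filter(_.nonEmpty).getOrElse("blank")
  List(
    ("net" + DELIMA + net, "1"),                     // count per network type
    ("gbCode" + DELIMA + userLog.getgbCode, "1")     // count per gbCode
  )
}

// usage: inData.flatMap(mapDataMulti).reduceByKey((x, y) => (x.toLong + y.toLong).toString)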

Example shell script for submitting the Spark job:

output=zeyangzhou/count
input=zeyangzhou/data
hadoop fs -rmr $output
jar=/opt/develop/zeyangzhou/zzy-1.0-SNAPSHOT-jar-with-dependencies.jar
SPARK=/usr/lib/spark/bin/spark-submit
${SPARK} --queue datacenter \
         --class zzy.newsMonitor \
         --executor-memory 15g \
         --master yarn-cluster \
         --driver-memory 20g \
         --num-executors 30 \
         --executor-cores 15 \
         $jar $input $output

