023_Custom Data Type Exercise: Mobile Traffic Statistics with Hadoop MapReduce
Published: 2019-06-14

1) Analyze the business requirement: when a user accesses the internet from a mobile phone, data traffic is consumed. Traffic has two parts: upstream traffic (data sent) and downstream traffic (data received). During network transmission, each kind of traffic is described in two ways: packet count and traffic volume. Usage is recorded per phone number, which serves as the unique identifier. Each record contains many fields, but only a few of them are needed here.

  Fields actually needed:

      phone number, upstream packet count, downstream packet count, total upstream traffic, total downstream traffic.
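
For concreteness, one input record might look like the following. This is a made-up, tab-separated example; the original post does not show its dataset, so the field layout and the values are assumptions:

13726230503	24	27	2481	24681

(phone number, upstream packets, downstream packets, upstream traffic, downstream traffic)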

2) Define a custom data type for these fields.

DataWritable implements the Writable interface. It is used only as a value, while the phone number travels separately as the key, so it does not need to be comparable (only keys must implement WritableComparable) and it stores just the four traffic counters.

package org.dragon.hadoop.mapreduce.app;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author ZhuXY
 * @time 2016-3-10 3:49:55 PM
 */
public class DataWritable implements Writable {

    // the phone number travels as the map key, so it is not stored here

    // upload
    private int upPackNum;
    private int upPayLoad;

    // download
    private int downPackNum;
    private int downPayLoad;

    public DataWritable() {
    }

    public void set(int upPackNum, int upPayLoad, int downPackNum,
            int downPayLoad) {
        this.upPackNum = upPackNum;
        this.upPayLoad = upPayLoad;
        this.downPackNum = downPackNum;
        this.downPayLoad = downPayLoad;
    }

    public int getUpPackNum() {
        return upPackNum;
    }

    public int getUpPayLoad() {
        return upPayLoad;
    }

    public int getDownPackNum() {
        return downPackNum;
    }

    public int getDownPayLoad() {
        return downPayLoad;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upPackNum);
        out.writeInt(upPayLoad);
        out.writeInt(downPackNum);
        out.writeInt(downPayLoad);
    }

    /**
     * Fields must be read in exactly the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readInt();
        this.upPayLoad = in.readInt();
        this.downPackNum = in.readInt();
        this.downPayLoad = in.readInt();
    }

    @Override
    public String toString() {
        return upPackNum + "\t" + upPayLoad + "\t" + downPackNum + "\t"
                + downPayLoad;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + downPackNum;
        result = prime * result + downPayLoad;
        result = prime * result + upPackNum;
        result = prime * result + upPayLoad;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataWritable other = (DataWritable) obj;
        if (downPackNum != other.downPackNum)
            return false;
        if (downPayLoad != other.downPayLoad)
            return false;
        if (upPackNum != other.upPackNum)
            return false;
        if (upPayLoad != other.upPayLoad)
            return false;
        return true;
    }
}
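As a quick sanity check of the read/write ordering contract, here is a minimal round-trip sketch. It is not from the original post, and it assumes the class lives in the same package as DataWritable:

package org.dragon.hadoop.mapreduce.app;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DataWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        DataWritable original = new DataWritable();
        original.set(24, 2481, 27, 24681);

        // serialize the same way Hadoop does: write() to a DataOutput
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize: readFields() consumes the fields in the same order
        DataWritable copy = new DataWritable();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(original.equals(copy)); // prints: true
    }
}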

3) Work out the MapReduce split: which business logic runs in the Map phase and which in the Reduce phase.

Map phase: read each record from the file, extract the five required fields, and emit the phone number as the output key and a DataWritable object holding the traffic counters as the output value.

Reduce phase: for each phone number, add up the traffic across all of its values to get the totals (both packet counts and traffic volumes), then write the result to the output file, tab-separated. A sketch of this job follows.
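
A minimal sketch of the traffic Mapper and Reducer based on the description above, reusing the DataWritable type from section 2. The class names (DataTotalMapReduce, DataTotalMapper, DataTotalReducer) and the tab-separated field layout are assumptions, not the original author's code:

package org.dragon.hadoop.mapreduce.app;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DataTotalMapReduce {

    // Mapper: line -> (phone number, DataWritable with the four counters)
    public static class DataTotalMapper extends
            Mapper<LongWritable, Text, Text, DataWritable> {

        private final Text phoneKey = new Text();
        private final DataWritable dataValue = new DataWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // assumed layout: phone \t upPackNum \t downPackNum \t upPayLoad \t downPayLoad
            String[] fields = value.toString().split("\t");
            if (fields.length != 5) {
                return; // skip malformed records
            }
            phoneKey.set(fields[0]);
            dataValue.set(Integer.parseInt(fields[1]), // upPackNum
                    Integer.parseInt(fields[3]),       // upPayLoad
                    Integer.parseInt(fields[2]),       // downPackNum
                    Integer.parseInt(fields[4]));      // downPayLoad
            context.write(phoneKey, dataValue);
        }
    }

    // Reducer: sum all counters for one phone number
    public static class DataTotalReducer extends
            Reducer<Text, DataWritable, Text, DataWritable> {

        private final DataWritable totalValue = new DataWritable();

        @Override
        protected void reduce(Text key, Iterable<DataWritable> values,
                Context context) throws IOException, InterruptedException {
            int upPackNum = 0, upPayLoad = 0, downPackNum = 0, downPayLoad = 0;
            for (DataWritable value : values) {
                upPackNum += value.getUpPackNum();
                upPayLoad += value.getUpPayLoad();
                downPackNum += value.getDownPackNum();
                downPayLoad += value.getDownPayLoad();
            }
            totalValue.set(upPackNum, upPayLoad, downPackNum, downPayLoad);
            // DataWritable.toString() renders the four counters tab-separated
            context.write(key, totalValue);
        }
    }
}

A driver for this job would mirror the one in the listing below, with Text and DataWritable as the output key/value classes. The full listing that follows is the post's companion Top-K exercise (aggregating song play counts and keeping the most-played entries), built on the same Mapper/Reducer/driver skeleton.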

package org.dragon.hadoop.mapreduce.app.topk;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author ZhuXY
 * @time 2016-3-13 12:57:26 PM
 */

/**
 * Statistics & Top-K.
 *
 * Input format: language type, song name, favorite count, play count, artist name.
 * Requirement: report the most-played song names and their play counts.
 *
 * Idea: the Mapper emits key = song type + song name, value = play count.
 * The Reducer wraps each aggregated result in a TopKWritable object (with a
 * NullWritable value), collects the objects in a sorted set inside the reduce
 * method, and trims everything beyond the top K.
 */
public class TopKMapReduceV4 {
    private static final int KEY = 4; // K: how many top entries to keep

    // Mapper class
    public static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // the input format is TextInputFormat, so by default each map call
            // receives a <byte offset, line text> pair
            String lineValue = value.toString();

            if (null == lineValue) {
                return;
            }

            // split
            String[] splitValue = lineValue.split("\t");

            if (splitValue != null && splitValue.length == 5) {
                String languageType = splitValue[0];
                String songName = splitValue[1];
                Long playNum = Long.parseLong(splitValue[3]);

                context.write(new Text(languageType + "\t" + songName),
                        new LongWritable(playNum));
            }
        }
    }

    // Reducer class
    public static class TopKReducer extends
            Reducer<Text, LongWritable, TopKWritable, NullWritable> {

        // the set orders its elements by TopKWritable's compareTo
        TreeSet<TopKWritable> treeSet = new TreeSet<TopKWritable>();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {

            Long playNum = (long) 0;
            if (key == null) {
                return;
            }

            // get key
            String[] keyArr = key.toString().split("\t");
            String languageType = keyArr[0];
            String songName = keyArr[1];

            // sum
            for (LongWritable value : values) {
                playNum += value.get();
            }

            // wrap song type, song name and play count in a TopKWritable and
            // add it to the TreeSet, which keeps itself sorted
            treeSet.add(new TopKWritable(languageType, songName, playNum));

            if (treeSet.size() > KEY) {
                treeSet.remove(treeSet.last()); // drop the entry with the smallest play count so far
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable topKWritable : treeSet) {
                context.write(topKWritable, NullWritable.get());
            }
        }
    }

    // Driver code
    public int run(String[] args) throws IOException, ClassNotFoundException,
            InterruptedException {
        // get conf
        Configuration conf = new Configuration();

        // create job
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName()); // job name

        // set job
        job.setJarByClass(TopKMapReduceV4.class);

        // 1) set input path
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 2) set map
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3) set output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 4) set reduce
        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(TopKWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // submit job
        boolean isSuccess = job.waitForCompletion(true);

        // return status
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] {
                "hdfs://hadoop-master.dragon.org:9000/wc/wcinput/",
                "hdfs://hadoop-master.dragon.org:9000/wc/wcoutput"
        };
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}

In real business scenarios, the raw data is stored in files or in a relational database, and it takes several rounds of cleaning and filtering to keep the records that meet our needs and filter out all the unqualified data.

The Sqoop framework imports and exports data between relational databases and HBase, Hive, and HDFS.

  For the concrete business you are responsible for, you generally have to write your own code to clean the data.

 

Reposted from: https://www.cnblogs.com/xiangyangzhu/p/5278871.html
