Hadoop Project in Practice
Published: 2019-06-26


This is a traffic-operation project for a telecom carrier, built as an offline data-processing project on Hadoop.

The carrier analyzes users' browsing behavior from its HTTP logs and uses the results to enrich their behavior trails.

 

HTTP log record format (as consumed by the mappers below: tab-separated fields, with the URL at index 26 and the upstream/downstream flow at indices 30 and 31):

Processing flow:

System architecture:

Technology choices:

Only one of the project's features is described here:

The rule base is populated manually, while the instance base is generated automatically with machine learning; both store entries of the form <url, info>.

(1) Find the URLs that account for the top 80% of total traffic. Only a small number of URLs carry very high traffic; the vast majority carry very little, have no reference value, and should be discarded.

FlowBean.java:

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Deserialization instantiates the bean via reflection, so an explicit no-arg constructor is required
    public FlowBean() {}

    // Convenience constructor for initializing the fields in one go
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public void set(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() { return phoneNB; }
    public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; }
    public long getUp_flow() { return up_flow; }
    public void setUp_flow(long up_flow) { this.up_flow = up_flow; }
    public long getD_flow() { return d_flow; }
    public void setD_flow(long d_flow) { this.d_flow = d_flow; }
    public long getS_flow() { return s_flow; }
    public void setS_flow(long s_flow) { this.s_flow = s_flow; }

    // Serialize the fields to the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the fields from the input stream;
    // the read order must match the write order exactly
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    // Sort by total flow in descending order; never returns 0, so a TreeMap keyed
    // by FlowBean keeps every URL even when two totals are equal
    @Override
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }
}

 

TopkURLMapper.java:

package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private FlowBean bean = new FlowBean();
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        try {
            // Only keep records that carry an HTTP URL in field 26;
            // fields 30 and 31 hold the upstream and downstream flow
            if (fields.length > 32 && StringUtils.isNotEmpty(fields[26])
                    && fields[26].startsWith("http")) {
                String url = fields[26];
                long up_flow = Long.parseLong(fields[30]);
                long d_flow = Long.parseLong(fields[31]);
                k.set(url);
                bean.set("", up_flow, d_flow);
                context.write(k, bean);
            }
        } catch (Exception e) {
            // Skip malformed lines
        }
    }
}

TopkURLReducer.java:

package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLReducer extends Reducer<Text, FlowBean, Text, LongWritable> {

    // FlowBean sorts by total flow in descending order, so iteration starts with the hottest URLs
    private TreeMap<FlowBean, Text> treeMap = new TreeMap<>();
    private double globalCount = 0;

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        Text url = new Text(key.toString());
        long up_sum = 0;
        long d_sum = 0;
        for (FlowBean bean : values) {
            up_sum += bean.getUp_flow();
            d_sum += bean.getD_flow();
        }
        FlowBean bean = new FlowBean("", up_sum, d_sum);
        // Accumulate each URL's total into the global counter; once all records
        // have been processed, globalCount holds the overall traffic total
        globalCount += bean.getS_flow();
        treeMap.put(bean, url);
    }

    // cleanup() is called once, just before the reducer task exits
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
        double tempCount = 0;
        // Walk from the highest-traffic URL downwards and stop once 80% of the total flow is covered
        for (Entry<FlowBean, Text> ent : entrySet) {
            if (tempCount / globalCount < 0.8) {
                context.write(ent.getValue(), new LongWritable(ent.getKey().getS_flow()));
                tempCount += ent.getKey().getS_flow();
            } else {
                return;
            }
        }
    }
}

TopkURLRunner.java:

package cn.itcast.hadoop.mr.llyy.topkurl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopkURLRunner.class);

        job.setMapperClass(TopkURLMapper.class);
        job.setReducerClass(TopkURLReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TopkURLRunner(), args);
        System.exit(res);
    }
}

(2) Import the URLs from the previous step into a database to form the URL rule base. The table has just two fields, url and info; info is a descriptive label that is filled in manually.

The output above is imported into the database with Sqoop; the next MapReduce job then reads the rule base back from the database.
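For illustration only, a manually labeled rule can also be inserted over plain JDBC; this minimal sketch reuses the connection settings and the urlrule(url, info) table from DBLoader below, and the sample url/info values are made up:

package cn.itcast.hadoop.mr.llyy.enhance;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Sketch: insert one manually labeled <url, info> rule into the urlrule table.
// Connection settings mirror DBLoader; the sample values are purely illustrative.
public class RuleInsertExample {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                "insert into urlrule(url, info) values(?, ?)")) {
            ps.setString(1, "http://www.example.com/video/123");   // hypothetical URL
            ps.setString(2, "video|entertainment");                // hypothetical label
            ps.executeUpdate();
        }
    }
}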

DBLoader.java: a utility class for accessing the database.

package cn.itcast.hadoop.mr.llyy.enhance;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;

public class DBLoader {

    // Load all <url, info> rules from the urlrule table into the supplied map
    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,info from urlrule");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}

LogEnhanceOutputFormat.java: the default OutputFormat is TextOutputFormat, which writes everything into the part-r-xxxxx files under a single output directory. Here the two kinds of result need to go to two different files, so a custom OutputFormat is required.

package cn.itcast.hadoop.mr.llyy.enhance;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhanceOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Open two fixed HDFS output files: one for enhanced log lines, one for the to-crawl list
        FileSystem fs = FileSystem.get(job.getConfiguration());
        FSDataOutputStream enhancedOs = fs.create(new Path("/liuliang/output/enhancedLog"));
        FSDataOutputStream tocrawlOs = fs.create(new Path("/liuliang/output/tocrawl"));
        return new LogEnhanceRecordWriter<K, V>(enhancedOs, tocrawlOs);
    }

    public static class LogEnhanceRecordWriter<K, V> extends RecordWriter<K, V> {

        private FSDataOutputStream enhancedOs = null;
        private FSDataOutputStream tocrawlOs = null;

        public LogEnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
            this.enhancedOs = enhancedOs;
            this.tocrawlOs = tocrawlOs;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Route each record by its marker: to-crawl URLs go to one file, enhanced lines to the other
            if (key.toString().contains("tocrawl")) {
                tocrawlOs.write(key.toString().getBytes());
            } else {
                enhancedOs.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (enhancedOs != null) {
                enhancedOs.close();
            }
            if (tocrawlOs != null) {
                tocrawlOs.close();
            }
        }
    }
}

 

Then every raw log record is processed: the URL is extracted and looked up in the rule base. If an info label is found, it is appended to the original log line; otherwise the URL still needs to be crawled, so it is written out with a tocrawl marker. These two cases must go to different output files.

LogEnhanceMapper.java:

package cn.itcast.hadoop.mr.llyy.enhance;

import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Reads the raw log data, extracts the URL, looks it up in the rule base and
 * appends the content-analysis result to the original log line.
 *
 * @author duanhaitao@itcast.cn
 */
// Input: raw records (47 fields): timestamp ... destip srcip ... url ... get 200 ...
// Look the url up in the rule base to obtain the content labels: site category, channel
// category, topic words, keywords, film title, actors, director, ...
// Append the analysis result to the original line:
//   context.write(timestamp ... destip srcip ... url ... get 200 ... <labels>)
// If the url is not found in the rule base, emit it to the to-crawl list:
//   context.write(url tocrawl)
public class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> ruleMap = new HashMap<>();

    // setup() is called once when the mapper task is initialized; load the rule base here
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        DBLoader.dbLoader(ruleMap);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        try {
            if (fields.length > 27 && StringUtils.isNotEmpty(fields[26])
                    && fields[26].startsWith("http")) {
                String url = fields[26];
                String info = ruleMap.get(url);
                String result = "";
                if (info != null) {
                    // Known URL: append its info label to the original log line
                    result = line + "\t" + info + "\n\r";
                    context.write(new Text(result), NullWritable.get());
                } else {
                    // Unknown URL: mark it for crawling
                    result = url + "\t" + "tocrawl" + "\n\r";
                    context.write(new Text(result), NullWritable.get());
                }
            } else {
                return;
            }
        } catch (Exception e) {
            System.out.println("exception occured in mapper.....");
        }
    }
}

LogEnhanceRunner.java:

package cn.itcast.hadoop.mr.llyy.enhance;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogEnhanceRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhanceRunner.class);

        job.setMapperClass(LogEnhanceMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(LogEnhanceOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LogEnhanceRunner(), args);
        System.exit(res);
    }
}

No reducer needs to be written here.
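One note on the runner above (my own observation, not from the original post): since no reducer class is set, Hadoop runs the default identity reducer plus a needless shuffle. To make the job genuinely map-only, the reduce task count can be set to zero before submission, for example:

// In LogEnhanceRunner.run(), before job.waitForCompletion(true):
// with zero reduce tasks, the map output is written directly through the
// configured LogEnhanceOutputFormat and the shuffle/sort phase is skipped.
job.setNumReduceTasks(0);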

 

Implementing Top K with MapReduce:
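The original post stops at this heading, so here is a minimal sketch of the usual pattern (my own illustration, assuming the map side emits <url, total_flow> pairs as <Text, LongWritable>, with an arbitrary K): each reduce task keeps a bounded TreeMap and only writes it out in cleanup(), so with a single reducer the output is the global top K.

package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of the classic Top-K pattern: keep at most K entries in a TreeMap keyed by the
// total flow, evict the smallest whenever the map grows past K, and only emit the
// survivors in cleanup(). With a single reducer this yields the global top K URLs.
public class TopKReducerSketch extends Reducer<Text, LongWritable, Text, LongWritable> {

    private static final int K = 10;                       // illustrative value
    private TreeMap<Long, String> topK = new TreeMap<>();  // sorted ascending by total flow

    @Override
    protected void reduce(Text url, Iterable<LongWritable> flows, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable f : flows) {
            sum += f.get();
        }
        topK.put(sum, url.toString());   // ties on the flow value overwrite each other
        if (topK.size() > K) {
            topK.remove(topK.firstKey()); // evict the currently smallest total
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit from largest to smallest
        for (Entry<Long, String> ent : topK.descendingMap().entrySet()) {
            context.write(new Text(ent.getValue()), new LongWritable(ent.getKey()));
        }
    }
}

The same trick can also run in each mapper's cleanup() to cut shuffle volume. Note that keying the TreeMap on the flow value alone means two URLs with exactly the same total overwrite each other; a composite key (flow plus URL) avoids that.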

Reposted from: http://groul.baihongyu.com/
