`
阿尔萨斯
  • 浏览: 4171345 次
社区版块
存档分类
最新评论

MapReduce 编程 系列二 编写简单mapper

 
阅读更多

MapReduce的基本流程是, 框架会使用FileInputFormat读取文件,默认会根据文件大小的进行记录拆分,这里拆分器叫做InputSplitter。通过InputSplitter将文件拆成若干块,后面也就有若干个mapper与之对应。

InputSplitter里面使用RecordReader对文件块的记录进行读取,生成key/value的pair,调用mapper的map函数去处理。

当然这些流程中有些可以定制,比如InputSplitter的算法可以修改,RecordReader也是可以定制。

而且还有一个非常有效的方法,可以避免mapper将过多的数据传递给reducer。

比如前面的例子都是1, 其实可以先用一个HashMap对key做分组,有则value加1, 无则添加到HashMap中。

最后将分组统计后的key/value数据通过context.write方法发送给reducer,能够大大提高效率。


现在想从日志中提取数据,日志文件如下:

2014-05-10 13:36:40,140307000287,536dbacc4700aab274729cca,login
2014-05-10 13:37:46,140310000378,536dbae74700aab274729ccb,login
2014-05-10 13:39:20,140310000382,536dbb284700aab274729ccd,login
2014-05-10 13:39:31,140331001080,536dbb864700aab274729ccf,login
2014-05-10 13:39:45,140331001105,536dbba04700aab274729cd4,login
2014-05-10 13:39:45,140328000969,536dbba04700aab274729ce4,login
2014-05-10 13:39:45,140408001251,536dbba04700aab274729cd8,login
2014-05-10 13:39:45,140328000991,536dbba04700aab274729ce9,login
2014-05-10 13:39:45,140324000633,536dbba14700aab274729cf5,login
2014-05-10 13:39:45,140331001077,536dbba04700aab274729cdd,login
2014-05-10 13:39:45,140408001242,536dbba04700aab274729cd7,login
2014-05-10 13:39:45,140327000941,536dbba14700aab274729cf1,login
2014-05-10 13:39:45,140408001265,536dbba04700aab274729ce5,login
2014-05-10 13:39:45,140324000673,536dbba04700aab274729cd3,login
2014-05-10 13:39:45,140331001066,536dbba04700aab274729cd5,login
2014-05-10 13:39:45,140408001292,536dbba14700aab274729cee,login
2014-05-10 13:39:45,140328000966,536dbba14700aab274729cec,login
2014-05-10 13:39:45,140312000501,536dbba04700aab274729ce1,login
2014-05-10 13:39:45,140306000216,536dbba14700aab274729d02,login
2014-05-10 13:39:45,140327000856,536dbba04700aab274729ce2,login
2014-05-10 13:39:46,140328000985,536dbba14700aab274729cf7,login
2014-05-10 13:39:46,140306000245,536dbba14700aab274729d0d,login
2014-05-10 13:39:46,140326000797,536dbba14700aab274729cf6,login
2014-05-10 13:39:46,140328000993,536dbba14700aab274729d12,login
2014-05-10 13:39:46,140331001115,536dbba14700aab274729d10,login
2014-05-10 13:39:46,140325000744,536dbba04700aab274729ce0,login
2014-05-10 13:39:46,140328000982,536dbba14700aab274729d0a,login
2014-05-10 13:39:46,140331001063,536dbba04700aab274729ce3,login
2014-05-10 13:39:46,140331001067,536dbba14700aab274729d1c,login
2014-05-10 13:39:46,140401001157,536dbba04700aab274729ce8,login
2014-05-10 13:39:46,140408001216,536dbba14700aab274729cef,login
2014-05-10 13:39:46,140401001174,536dbba14700aab274729d27,login
2014-05-10 13:39:46,140306000215,536dbba04700aab274729cde,login
2014-05-10 13:39:46,140331001064,536dbba04700aab274729cdc,login
2014-05-10 13:39:46,140326000825,536dbba04700aab274729cd9,login
2014-05-10 13:39:46,140408001294,536dbba14700aab274729d0f,login


我希望将login前面的设备ID取出来,进行数量的统计,最后得到结果:

各个设备的累计登录次数

536dbba04700aab274729cdc 5
536dbba04700aab274729ce3 4

好,创建一个LogMapper类,该类负责做数据的Map,前两各模板参数用于KeyIn和ValueIn, 后两个模板参数用于KeyOut和ValueOut,都是代表类型。

假定一个<KeyIn, ValueIn>组成一个pair,输入的很多pair在一个组里面, 这些pair被一定的算法Map之后,会变成很多组pair。

官方文档:http://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/Mapper.html

Maps input key/value pairs to a set of intermediate key/value pairs.
注意,这里的Mapper类用的包是mapreduce,以前有一个老的叫mapred。

这里介绍了两者的区别:

http://stackoverflow.com/questions/7598422/is-it-better-to-use-the-mapred-or-the-mapreduce-package-to-create-a-hadoop-job


LongWritable和IntWritable是两个类,用于帮助创建可以Long和Int类型的变量。它们能够帮助将Long和Int的值序列化成字节流,因此都有两个关键方法读入和写出:

void readFields(DataInputin)
Deserialize the fields of this object fromin.
void write(DataOutputout)
Serialize the fields of this object toout.

这个和Hadoop内部RPC调用时采用的序列化算法有关。

我的Mapper代码:

package org.freebird.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper
    extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable ONE = new IntWritable(1);

    public void map(Object key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
	String[] line = value.toString().split(",");
	if (line.length == 4) {
	    String dId = line[2];
	    context.write(new Text(dId), ONE);
	}
    }    
}



这个Mapper的子类覆盖了map函数,将字符串用,号拆开后,取出第三个元素作为设备ID, 然后作为key写入context对象。

这里value设置为1, 因为后面reduce阶段会简单的求和。

Context类文档参考:http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/mapreduce/Mapper.Context.html

write方法不是一般概念的hasmap添加key,value,而是生成一个新的pair对象,里面包含了key和value。 如果多个key相同,也会产生多个pair对象,交给reduce阶段处理。






分享到:
评论

相关推荐

    MapReduce编程实例:单词计数

    本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...

    基于MapReduce的学生平均成绩统计

    利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。

    第5章 MapReduce分布式计算框架 2 5.1. MapReduce简介 2 5.2. wordcount经典案例介绍 2

    5.4. MapReduce编程规范 3 5.5. wordcount经典案例的实现 5 5.5.1. 分析数据准备 5 5.5.2. 新建maven项目,导入项目所需要的依赖 6 5.5.3. Mapper阶段代码编写 10 5.5.4. Reduce阶段代码编写 12 5.5.5. 定义Driver类...

    Hadoop-Streaming:Hadoop2.6 MapReduce2 Python3.5的一些经典入门程序:词频统计、好友推荐、PageRank

    MapReduce编程规范 1.用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端) 2.Mapper的输入数据是KV对的形式(KV的类型可自定义) 3.Mapper的输出数据是KV对的形式(K

    Hadoop实战中文版

    3.1 HDFS 文件操作 3.1.1 基本文件命令 3.1.2 编程读写HDFS 3.2 剖析MapReduce 程序 3.2.1 Hadoop数据类型 3.2.2 Mapper 3.2.3 Reducer 3.2.4 Partitioner:重定向Mapper输出 3.2.5 Combiner:本地reduce ...

    Hadoop实战(陆嘉恒)译

    Hadoop组件3.1 HDFS 文件操作3.1.1 基本文件命令3.1.2 编程读写HDFS3.2 剖析MapReduce 程序3.2.1 Hadoop数据类型3.2.2 Mapper3.2.3 Reducer3.2.4 Partitioner:重定向Mapper输出3.2.5 Combiner:本地reduce3.2.6 ...

    用python+hadoopstreaming编写分布式程序

    Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统GoogleFileSystem,并发布了相关论文(可在GoogleResearch的网站上获得:GFS、MapReduce)。DougCutting和MikeCafarella在开发搜索引擎Nutch时对这两...

    Hadoop实战

    Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5.1 动手扩展一个简单程序 ...

    Hadoop实战中文版.PDF

    413.2.5 Combiner:本地reduce 433.2.6 预定义mapper和Reducer类的单词计数 433.3 读和写 433.3.1 InputFormat 443.3.2 OutputFormat 493.4 小结 50第二部分 实战第4章 编写MapReduce基础程序 524.1...

    Hadoop streaming详细介绍

    Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作为我们...

    2017最新大数据架构师精英课程

    88_hadoop-mapreduce-切片演示-mapper 89_hadoop-mapreduce-url演示1 B% m, V- Z) ~. B9 |9 m2 u 90_job提交流程剖析 91_job split计算法则-读取切片的法则 92_job seqfile5 v! h+ R9 L1 w, U* T6 J# M 93_job 全...

Global site tag (gtag.js) - Google Analytics