ITKeyword,专注技术干货聚合推荐

注册 | 登录

走向云计算之MapReduce应用案例详解

xiaokang123456kao 分享于 2017-06-14

推荐:云计算(二十四)- Apache Hadoop NextGen MapReduce (YARN)

前面学习了第一代的MR,今天开始学习一下YARN,MR部分的源码以及常用的几个demo(join,二次排序,自定义inputfomat等)会在以后的博客中介绍,相关的优化也会在

2019阿里云全部产品优惠券(新购或升级都可以使用,强烈推荐)
领取地址https://promotion.aliyun.com/ntms/yunparter/invite.html

一、概述

前面关于MapReduce的wordcount程序已经做了比较详细的分析,这里再给出MapReduce应用的几个小案例,来更加深入的理解MapReduce的设计理念和应用方法。部分内容参考了书籍《hadoop实战》中的内容。

二、MapReduce应用之数据去重

在统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重这个操作。

1、情境要求

假设有如下两个数据样本file1.txt和file2.txt
file1.txt

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c

file2.txt

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

我们需要将两个文件整合为一个文件并且去除其中重复的数据,最终结果应如下:

2012-3-1 a  
2012-3-1 b  
2012-3-2 a  
2012-3-2 b  
2012-3-3 b  
2012-3-3 c  
2012-3-4 d  
2012-3-5 a  
2012-3-6 b  
2012-3-6 c  
2012-3-7 c  
2012-3-7 d  

2、思路解析

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。那么如果我们将同一个数据的所有记录都交给同一台机器做reduce,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求。当reduce接收到一个

3、程序代码

package com.kang;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Duplicate {

    // map将输入中的value复制到输出数据的key上,并直接输出
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static Text line = new Text();// 每行数据
        // 实现map函数

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            line = value;
            context.write(line, new Text(""));
        }
    }

    // reduce将输入中的key复制到输出数据的key上,并直接输出
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // 实现reduce函数
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Duplicate");
        job.setJarByClass(Duplicate.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

map的输入格式是<偏移量,行数据>的形式,类似<0,2012-3-1 a>。map操作我们直接把输入的value值作为输出的key,而输出的value值为空。map的输出格式就是<行数据,null>,例如<2012-3-1 a ,null>。因为我们设置了combine操作,所以map把输出传给reduce的时候进行了“本地reduce”,其作用是把当前map的输出中的重复数据去掉。然后传给reduce端,reduce端的作用就是把来自不同map的数据中重复的数据去掉。

4、结果展示

这里写图片描述

三、MapReduce应用之排序

数据排序是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。

1、情境要求

输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。

  • 输入:

file1.txt

2
32
654
32
15
756
65223

file2.txt

5956
22
650
92

file3.txt

26
54
6

我们需要将三个文件整合为一个文件并且排序,最终结果应如下:

1   2
2   6
3   15
4   22
5   26
6   32
7   32
8   54
9   92
10  650
11  654
12  756
13  5956
14  65223

2、思路解析

对输入数据进行排序,可以利用MapReduce框架中的默认排序,而不需要自己再实现具体的排序。首先需要了解MapReduce默认排序规则。它是按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。
对本实例而言,在map中将读入的数据转化成IntWritable型,然后作为key值输出(value任意)。reduce拿到<key,value-list>之后,将输入的key作为value输出,并根据value-list中元素的个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。需要注意的是这个程序中没有配置Combiner,也就是在MapReduce过程中不使用Combiner。这主要是因为使用map和reduce就已经能够完成任务了。

3、程序代码

package com.kang;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Sort {

    // map将输入中的value化成IntWritable类型,作为输出的key

    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        // 实现map函数
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }

    }

    // reduce将输入中的key复制到输出数据的key上,
    // 然后根据输入的value-list中元素的个数决定key的输出次数
    // 用全局linenum来代表key的位次
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        // 实现reduce函数
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4、结果展示

这里写图片描述

四、MapReduce应用之计算平均数

计算平均数在数据分析中是经常遇到的,这里来分析如何利用MapReduce实现。

1、情境要求

这里利用MapReduce来计算学生的平均成绩。 输入文件中的每行内容均为一个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。要求在输出中每行有两个数据,其中,第一个代表学生的姓名,第二个代表其平均成绩。
math.txt

张三    88
李四    99
王五    66
赵六    77

china.txt

张三    80
李四    82
王五    84
赵六    86

english.txt

张三    78
李四    89
王五    96
赵六    67

输出:

张三  82
李四  90
王五  82
赵六  76

2、思路解析

计算学生平均成绩是一个仿”WordCount”例子,程序包括两部分的内容:Map部分和Reduce部分,分别实现了map和reduce的功能。
Map处理的是一个纯文本文件,文件中存放的数据时每一行表示一个学生的姓名和他相应一科成绩。Mapper处理的数据是由InputFormat分解过的数据集,其中InputFormat的作用是将数据集切割成小数据集InputSplit,每一个InputSlit将由一个Mapper负责处理。此外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成<key,value>对提供给了map函数。InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用LineRecordReader将InputSplit解析成<key,value>对,key是行在文本中的位置,value是文件中的一行。Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputFormat输出。Mapper最终处理的结果对<key,value>,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个Reducer上。Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有Reducer的上下文。Reduce的结果由Reducer.Context的write方法输出到文件中。

3、程序代码

package com.kang;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Average {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // 实现map函数
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 将输入的纯文本文件的数据转化成String
            String line = value.toString();
            StringTokenizer tokenizerLine = new StringTokenizer(line);
            //StringTokenizer对象来对每一行数据进行切分,默认按照空格切分
            String strName = tokenizerLine.nextToken();// 学生姓名部分
            String strScore = tokenizerLine.nextToken();// 成绩部分
            Text name = new Text(strName);
            int scoreInt = Integer.parseInt(strScore);
            // 输出姓名和成绩
            context.write(name, new IntWritable(scoreInt));
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // 实现reduce函数
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next().get();// 计算总分
                count++;// 统计总的科目数
            }
            int average = (int) sum / count;// 计算平均成绩
            context.write(key, new IntWritable(average));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "average");
        job.setJarByClass(Average.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一个RecordWriter的实现,负责数据输出
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4、结果展示

这里写图片描述

五、MapReduce应用之单表关联

“单表关联”要求从给出的数据中寻找所关心的数据,它是对原始数据所包含信息的挖掘。

1、情境要求

实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。
file.txt

child      parent
Tom        Lucy
Tom        Jack
Jone       Lucy
Jone       Jack
Lucy       Mary
Lucy       Ben
Jack       Alice
Jack       Jesse
Terry      Alice
Terry      Jesse
Philip     Terry
Philip     Alma
Mark       Terry
Mark       Alma

其关系图谱如下:
这里写图片描述

输出应为如下:

grandchild  grandparent
Tom          Alice
Tom          Jesse
Jone         Alice
Jone         Jesse
Tom          Ben
Tom          Mary
Jone         Ben
Jone         Mary
Philip       Alice
Philip       Jesse
Mark         Alice
Mark         Jesse

2、思路解析

分析这个实例,显然需要进行单表连接,连接的是左表的parent列和右表的child列,且左表和右表是同一个表。
连接结果中除去连接的两列就是所需要的结果——”grandchild–grandparent”表。要用MapReduce解决这个实例,首先应该考虑如何实现表的自连接;其次就是连接列的设置;最后是结果的整理。
考虑到MapReduce的shuffle过程会将相同的key会连接在一起,所以可以将map结果的key设置成待连接的列,然后列中相同的值就自然会连接在一起了。再与最开始的分析联系起来:
要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在map阶段将读入数据分割成child和parent之后,会将parent设置成key,child设置成value进行输出,并作为左表;再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了”grandchild–grandparent”关系。取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

3、程序代码

package com.kang;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SingleTableJoin {

    public static int time = 0;

    /* * map将输出分割child和parent,然后正序输出一次作为右表, 反序输出一次作为左表,需要注意的是在输出的value中必须 * 加上左右表的区别标识。 */

    public static class Map extends Mapper<Object, Text, Text, Text> {

        // 实现map函数

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String childname = new String();// 孩子名称
            String parentname = new String();// 父母名称
            String relationtype = new String();// 左右表标识
            // 输入的一行预处理文本
            StringTokenizer itr = new StringTokenizer(value.toString());
            String[] values = new String[2];
            int i = 0;
            while (itr.hasMoreTokens()) {
                values[i] = itr.nextToken();
                i++;
            }
            if (values[0].compareTo("child") != 0) {
                childname = values[0];
                parentname = values[1];
                // 输出左表
                relationtype = "1";
                context.write(new Text(values[1]), new Text(relationtype + "+" + childname + "+" + parentname));
                // 输出右表
                relationtype = "2";
                context.write(new Text(values[0]), new Text(relationtype + "+" + childname + "+" + parentname));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        // 实现reduce函数
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 输出表头
            if (0 == time) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grandchildnum = 0;
            String[] grandchild = new String[10];
            int grandparentnum = 0;
            String[] grandparent = new String[10];
            Iterator ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if (0 == len) {
                    continue;
                }
                // 取得左右表标识
                char relationtype = record.charAt(0);
                // 定义孩子和父母变量
                String childname = new String();
                String parentname = new String();
                // 获取value-list中value的child
                while (record.charAt(i) != '+') {
                    childname += record.charAt(i);
                    i++;
                }
                i = i + 1;
                // 获取value-list中value的parent
                while (i < len) {
                    parentname += record.charAt(i);
                    i++;
                }
                // 左表,取出child放入grandchildren
                if ('1' == relationtype) {
                    grandchild[grandchildnum] = childname;
                    grandchildnum++;
                }
                // 右表,取出parent放入grandparent
                if ('2' == relationtype) {
                    grandparent[grandparentnum] = parentname;
                    grandparentnum++;
                }
            }
            // grandchild和grandparent数组求笛卡尔儿积
            if (0 != grandchildnum && 0 != grandparentnum) {
                for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        // 输出结果
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "SingleTableJoin");
        job.setJarByClass(SingleTableJoin.class);
        // 设置Map和Reduce处理类
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

下面来详细分析一下MapReduce的处理流程。
首先来看map的过程:

child      parent -->        忽略此行

Tom        Lucy   -->     <Lucy,1+Tom+Lucy> <Tom,2+Tom+Lucy >

Tom        Jack   -->     <Jack,1+Tom+Jack> <Tom,2+Tom+Jack>

Jone       Lucy   -->     <Lucy,1+Jone+Lucy> <Jone,2+Jone+Lucy>

Jone       Jack   -->     <Jack,1+Jone+Jack> <Jone,2+Jone+Jack>

Lucy       Mary   -->     <Mary,1+Lucy+Mary> <Lucy,2+Lucy+Mary>

Lucy       Ben    -->     <Ben,1+Lucy+Ben> <Lucy,2+Lucy+Ben>

Jack       Alice  -->     <Alice,1+Jack+Alice> <Jack,2+Jack+Alice>

Jack       Jesse  -->     <Jesse,1+Jack+Jesse> <Jack,2+Jack+Jesse>

Terry      Alice  -->     <Alice,1+Terry+Alice> <Terry,2+Terry+Alice>

Terry      Jesse  -->     <Jesse,1+Terry+Jesse> <Terry,2+Terry+Jesse>

Philip     Terry  -->     <Terry,1+Philip+Terry> <Philip,2+Philip+Terry>

Philip     Alma   -->     <Alma,1+Philip+Alma> <Philip,2+Philip+Alma>

Mark       Terry  -->     <Terry,1+Mark+Terry> <Mark,2+Mark+Terry>

Mark       Alma   -->     <Alma,1+Mark+Alma> <Mark,2+Mark+Alma>

map处理每行数据时产生两个输出,一个作为左表,一个作为右表,左右表通过标示1或者2来区分。

因为没有设置combine处理方法,所以map的输出首先进行排序,然后经过shuffle后直接传给reduce处理。这个过程如下:
这里写图片描述
这里写图片描述

注:上图结果有误,在shuffle一列中,

<Mary,1+Lucy+Mary,2+Mark+Terry,2+Mark+Alma>

这行数据有误,应改为:

<Mary,1+Lucy+Mary>
<Mark,2+Mark+Terry,2+Mark+Alma

shuffle的输出传到reduce端,reduce需要处理values这个内容即可。
首先需要输出表头信息,如下:

            // 输出表头
            if (0 == time) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }

使用全局变量time是为了防止存在多个reduce Task来处理数据时时表头重复输出。

推荐:云计算(二十五)- Hadoop MapReduce Next Generation - Writing YARN Applications

概念和流程 客户端提交应用到Resource Manager,首先客户端需要使用ApplicationClientProtocol连接ResourceManager获取一个ApplicationId,通过ApplicationClien

reduce程序的关键步骤是:

            // grandchild和grandparent数组求笛卡尔儿积
            if (0 != grandchildnum && 0 != grandparentnum) {
                for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        // 输出结果
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                    }
                }
            }

首先由语句”0 != grandchildnum && 0 != grandparentnum”得知,只要在”value-list”中没有左表或者右表,则不会做处理,(因为这种情况下不会有祖父-孙子的关系)。可以根据这条规则去除无效的shuffle连接。经过这个处理,剩余的有效数据如下:

<Jack,1+Tom+Jack,1+Jone+Jack,2+Jack+Alice,2+Jack+Jesse >
<Lucy,1+Tom+Lucy,1+Jone+Lucy, 2+Lucy+Mary,2+Lucy+Ben>
<Terry,2+Terry+Alice,2+Terry+Jesse,1+Philip+Terry,1+Mark+Terry>

下面针对第一条数据进行分析:

<Jack,1+Tom+Jack,1+Jone+Jack,2+Jack+Alice, 2+Jack+Jesse >

分析结果:左表用”字符1”表示,右表用”字符2”表示,上面的

1+Tom+Jack,1+Jone+Jack表示Jack有两个孩子Tom和Jone
2+Jack+Alice,2+Jack+Jesse表示Jack的父母是Alice和Jesse

我们只需要分别取出左表的child和右表的parent,就可以得出祖父-孙子的关系了。

// 左表,取出child放入grandchildren
if ('1' == relationtype) {
    grandchild[grandchildnum] = childname;
    grandchildnum++;
}

// 右表,取出parent放入grandparent
if ('2' == relationtype) {
    grandparent[grandparentnum] = parentname;
    grandparentnum++;
}

最后输出即可:

        for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        // 输出结果
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                    }
                }

对于上面一条数据的输出就是:

Tom        Jesse
Tom        Alice
Jone       Jesse
Jone       Alice 

到最后reduce输出到HDFS上时,会对输出结果进行按照key值得排序,于是就得到了最终的结果。

4、结果展示

这里写图片描述

六、MapReduce应用之多表关联

多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。

1、情境要求

我们需要从两个信息表factory和address得出关联信息。
输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出”工厂名——地址名”表。
factory.txt

factoryname                    addressed
Beijing Red Star                        1
Shenzhen Thunder                    3
Guangzhou Honda                     2
Beijing Rising                       1
Guangzhou Development Bank              2
Tencent                         3
Back of Beijing                     1

address.txt

addressID    addressname
1           Beijing
2           Guangzhou
3           Shenzhen
4           Xian

我们需要将两个文件整合为一个文件找出相关关系,最终结果应如下:

factoryname addressname
Back of Beijing            Beijing 
Beijing Rising         Beijing 
Beijing Red Star       Beijing 
Guangzhou Development Bank     Guangzhou 
Guangzhou Honda            Guangzhou 
Tencent                Shenzhen 
Shenzhen Thunder           Shenzhen 

2、思路解析

多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。

3、程序代码

package com.kang;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MultipleTableJoin {
    public static int time = 0;
    /* * * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割, * * 保存连接列在key值,剩余列和左右表标志在value中,最后输出 * */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // 实现map函数
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();// 每行文件
            String relationtype = new String();// 左右表标识
            // 输入文件首行,不处理
            if (line.contains("factoryname") == true
                    || line.contains("addressID") == true) {
                return;
            }
            // 输入的一行预处理文本
            StringTokenizer itr = new StringTokenizer(line);
            String mapkey = new String();
            String mapvalue = new String();
            int i = 0;
            while (itr.hasMoreTokens()) {
                // 先读取一个单词
                String token = itr.nextToken();
                System.out.println(token);
                // 判断该地址ID就把存到"values[0]"
                if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
                    mapkey = token;
                    if (i > 0) {
                        relationtype = "1";
                    } else {
                        relationtype = "2";
                    }
                    continue;
                }
                // 存工厂名
                mapvalue += token + " ";
                i++;
            }
            // 输出左右表
            System.out.println(mapkey+"-----"+relationtype + "+" + mapvalue);
            context.write(new Text(mapkey), new Text(relationtype + "+" + mapvalue));
        }
    }
    /* * * reduce解析map输出,将value中数据按照左右表分别保存, * * 然后求出笛卡尔积,并输出。 * */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // 实现reduce函数
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 输出表头
            if (0 == time) {
                context.write(new Text("factoryname"), new Text("addressname"));
                time++;
            }
            int factorynum = 0;
            String[] factory = new String[10];
            int addressnum = 0;
            String[] address = new String[10];
            Iterator ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if (0 == len) {
                    continue;
                }
                // 取得左右表标识
                char relationtype = record.charAt(0);
                // 左表
                if ('1' == relationtype) {
                    factory[factorynum] = record.substring(i);
                    factorynum++;
                }
                // 右表
                if ('2' == relationtype) {
                    address[addressnum] = record.substring(i);
                    addressnum++;
                }
            }
            // 求笛卡尔积
            if (0 != factorynum && 0 != addressnum) {
                for (int m = 0; m < factorynum; m++) {
                    for (int n = 0; n < addressnum; n++) {
                        // 输出结果
                        context.write(new Text(factory[m]),
                                new Text(address[n]));
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MultipleTableJoin");
        job.setJarByClass(MultipleTableJoin.class);
        // 设置Map和Reduce处理类
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4、结果展示

这里写图片描述

七、MapReduce应用之倒排索引

“倒排索引”是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。它主要是用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引。

1、情境要求

通常情况下,倒排索引由一个单词(或词组)以及相关的文档列表组成,文档列表中的文档或者是标识文档的ID号,或者是指文档所在位置的URL,如图
这里写图片描述
从上图可以看出,单词1出现在{文档1,文档4,文档13,……}中,单词2出现在{文档3,文档5,文档15,……}中,而单词3出现在{文档1,文档8,文档20,……}中。在实际应用中,还需要给每个文档添加一个权值,用来指出每个文档与搜索内容的相关度,如下图:
这里写图片描述
最常用的是使用词频作为权重,即记录单词在文档中出现的次数。以英文为例,如下图所示,索引文件中的”MapReduce”一行表示:”MapReduce”这个单词在文本T0中出现过1次,T1中出现过1次,T2中出现过2次。当搜索条件为”MapReduce”、”is”、”Simple”时,对应的集合为:{T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文档T0和T1包含了所要索引的单词,而且只有T0是连续的。
这里写图片描述
更复杂的权重还可能要记录单词在多少个文档中出现过,以实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考虑单词在文档中的位置信息(单词是否出现在标题中,反映了单词在文档中的重要性)等。
样例输入如下所示。
file1.txt:

MapReduce is simple

file2.txt:

MapReduce is powerful is simple

file3.txt:

Hello MapReduce bye MapReduce

样例输出如下所示。

Hello   file3.txt:1;
MapReduce   file3.txt:2;file1.txt:1;file2.txt:1;
bye file3.txt:1;
is  file1.txt:1;file2.txt:2;
powerful    file2.txt:1;
simple  file2.txt:1;file1.txt:1;

2、思路解析

实现”倒排索引”只要关注的信息为:单词、文档URL及词频,如前图所示。但是在实现过程中,索引文件的格式与图中会略有所不同,以避免重写OutPutFormat类。

  • Map过程

首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、文档URL和词频,如图
这里写图片描述
这里存在两个问题:第一,<key,value>对只能有两个值,在不使用Hadoop自定义数据类型的情况下,需要根据情况将<单词、文档URL、词频>的其中两个值合并成一个值,作为key或value值;第二,通过一个Reduce过程无法同时完成词频统计和生成文档列表,所以必须增加一个Combine过程完成词频统计。
这里将单词和URL组成key值(如”MapReduce:file1.txt”),将词频作为value,这样做的好处是可以利用MapReduce框架自带的Map端排序,将同一文档的相同单词的词频组成列表,传递给Combine过程,实现类似于WordCount的功能。

  • Combine过程

经过map方法处理后,同一文档的相同单词的词频已经组成列表,Combine过程将key值相同的value值累加,得到一个单词在文档在文档中的词频,如下图所示。如果直接将图示的输出作为Reduce过程的输入,在Shuffle过程时将面临一个问题:所有具有相同单词的记录(由单词、URL和词频组成)应该交由同一个Reducer处理,但当前的key值无法保证这一点,所以必须修改key值和value值。这次将单词作为key值,URL和词频组成value值(如”file1.txt:1”)。这样做的好处是可以利用MapReduce框架默认的HashPartitioner类完成Shuffle过程,将相同单词的所有记录发送给同一个Reducer进行处理。
这里写图片描述

  • Reduce过程

经过上述两个过程后,Reduce过程只需将相同key值的value值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给MapReduce框架进行处理了。如下图所示。
这里写图片描述

  • 可能出现的问题

本实例设计的倒排索引在文件数目上没有限制,但是单词文件不宜过大(具体值与默认HDFS块大小及相关配置有关),要保证每个文件对应一个split。否则,由于Reduce过程没有进一步统计词频,最终结果可能会出现词频未统计完全的单词。可以通过重写InputFormat类将每个文件分为一个split,避免上述情况。或者执行两次MapReduce,第一次MapReduce用于统计词频,第二次MapReduce用于生成倒排索引。除此之外,还可以利用复合键值对等实现包含更多信息的倒排索引。

3、程序代码

package com.kang;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndex {
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Text keyInfo = new Text(); // 存储单词和URL组合
        private Text valueInfo = new Text(); // 存储词频
        private FileSplit split; // 存储Split对象
        // 实现map函数
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 获得<key,value>对所属的FileSplit对象
            split = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // key值由单词和URL组成,如"MapReduce:file1.txt"
                // 获取文件的完整路径
                // keyInfo.set(itr.nextToken()+":"+split.getPath().toString());
                // 这里只获取文件的名称。
                int splitIndex = split.getPath().toString().indexOf("file");
                keyInfo.set(itr.nextToken() + ":"
                    + split.getPath().toString().substring(splitIndex));
                // 词频初始化为1
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();
        // 实现reduce函数
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 统计词频
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");
            // 重新设置value值由URL和词频组成
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // 重新设置key值为单词
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        // 实现reduce函数
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 生成文档列表
            String fileList = new String();
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MultipleTableJoin");
        job.setJarByClass(InvertedIndex.class);
         // 设置Map、Combine和Reduce处理类
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        // 设置Map输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 设置Reduce输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4、结果展示

这里写图片描述

八、总结

通过以上几个例子,较为详细的分析的MapReduce的几个应用实例,当我们通过java编程设置MapReduce任务时,需要指定以下几个要素:
1、设置job的基础属性

Job job = new Job();  
job.setJarByClass(***.class);  //main方法所在的类
job.setJobName(“job name”);  //任务名称
job.setNumReduce(2);  //设置Reduce数目(可选)

2、设置Map、Combine和Reduce处理类

 job.setMapperClass(Map.class);
 job.setCombinerClass(Combine.class);
 job.setReducerClass(Reduce.class);

3、设置Job的输入输出格式

void  setInputFormatClass(Class<? extends InputFormat> cls) void setOutputFormatClass(Class<? extends OutputFormat> cls) 

前者默认是TextInputFormat,后者是FileOutputFormat。
示例:

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

4、设置Job的输入输出路径
当输入输出是文件时,需要指定路径。
InputFormat:

static void    addInputPath(JobConf conf, Path path) 

FileOutputFormat:


static void    setOutputPath(Job job, Path outputDir)

例如:

FileInputFormat.addInputPath(job, new Path("hdfs://sparkproject1:9000/root/input/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://sparkproject1:9000/root/output/"));

5、设置map与reduce的输出键值类型
主要有以下4个类:

void    setOutputKeyClass(Class<?> theClass)  

void    setOutputValueClass(Class<?> theClass)  

void    setMapOutputKeyClass(Class<?> theClass)  

void    setMapOutputValueClass(Class<?> theClass)   
  • 前面2个方法设置整个job的输出,即reduce的输出。默认情况下,map的输出类型与reduce一致,若二者不一致,则需要通过后面2个方法来指定map的输出类型。
  • 关于输入类型的说明:reduce的输入类型由output的输出类型决定。map的输入类型由输入格式决定,如输入格式是FileInputFormat,则输入KV类型为LongWriterable与Text。

示例:

// 设置Map输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Reduce输出类型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);

6、启动任务
通过调用如下语句实现

System.exit(job.waitForCompletion(true) ? 0 : 1);  

推荐:近200篇云计算、虚拟化、Hadoop、MapReduce、HDFS等云计算相关资料整理下载

近200篇云计算、虚拟化、Hadoop、MapReduce、HDFS等云计算相关资料整理下载   http://vivianskyer.iteye.com/blog/1604651

一、概述 前面关于MapReduce的wordcount程序已经做了比较详细的分析,这里再给出MapReduce应用的几个小案例,来更加深入的理解MapReduce的设计理念和应用方法。部分内容参考了书籍《hadoop实战

相关阅读排行


用户评论

游客

相关内容推荐

最新文章

×

×

请激活账号

为了能正常使用评论、编辑功能及以后陆续为用户提供的其他产品,请激活账号。

您的注册邮箱: 修改

重新发送激活邮件 进入我的邮箱

如果您没有收到激活邮件,请注意检查垃圾箱。